"""Bridge a serial weighing scale to a Modbus TCP server.

Two background tasks share one serial port:

* ``measurement_weight`` polls the scale once per second and publishes the
  scaled reading through Modbus function code 3 (holding registers).
* ``set_zero`` polls a coil once per second and, when it is set, sends the
  zero command to the scale and clears the coil.

The Modbus TCP listen address and port are read from ``./config.json``
(keys ``modbus-server.address`` and ``modbus-server.port``).
"""
import argparse
import asyncio
import json
import os
import signal
import sys
import threading
import time

import serial
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
from pymodbus.server.async_io import ModbusTcpServer

# Serial commands, kept as the original hex strings. They decode to the
# ASCII frames "D01KW\r\n" and "D01KZ\r\n" respectively.
MEASURE_COMMAND = bytes.fromhex('4430314B570D0A')
ZERO_COMMAND = bytes.fromhex('4430314B5A0D0A')

# Path of the JSON configuration file, relative to the working directory.
CONFIG_PATH = './config.json'

# Serial transactions run in worker threads so they do not stall the event
# loop; this lock keeps the two polling tasks from interleaving traffic on
# the shared port.
_PORT_LOCK = threading.Lock()


def _parse_weight(raw):
    """Convert a raw reply line from the scale into the register value.

    The reply is decoded as UTF-8, spaces and the trailing CR/LF are removed,
    and the number is multiplied by 1000 and offset by 1000 before being
    truncated to an int.

    Raises:
        ValueError: if the reply is empty (e.g. read timeout) or not numeric.
        UnicodeDecodeError: if the reply is not valid UTF-8.
    """
    text = str(raw, 'utf-8').replace(' ', '').replace('\r\n', '')
    # NOTE(review): the +1000 offset presumably keeps small negative readings
    # non-negative for an unsigned register -- confirm with the Modbus client.
    # No range clamping is applied here.
    return int(float(text) * 1000 + 1000)


def _read_weight(port):
    """Send the measure command and return the raw reply line (blocking)."""
    with _PORT_LOCK:
        port.write(MEASURE_COMMAND)
        time.sleep(0.02)  # short pause between the request and the read
        return port.readline()


def _send_zero(port):
    """Send the zero command and consume the reply line (blocking)."""
    with _PORT_LOCK:
        port.write(ZERO_COMMAND)
        return port.readline()


def _reopen_port(port):
    """Close the port, wait half a second, and open it again (blocking)."""
    with _PORT_LOCK:
        port.close()
        time.sleep(0.5)
        port.open()


async def measurement_weight(port, data):
    """Poll the scale once per second and publish the reading.

    Args:
        port: open serial port object providing ``write``, ``readline``,
            ``close`` and ``open``.
        data: Modbus slave context; the reading is stored with
            ``data.setValues(3, 1, [value])``.

    Any failure (serial error, timeout, unparsable reply) is printed and the
    port is closed and reopened before the next cycle. The coroutine runs
    until it is cancelled.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            raw = await loop.run_in_executor(None, _read_weight, port)
            data.setValues(3, 1, [_parse_weight(raw)])
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an Exception on Python 3.7).
            raise
        except Exception as e:
            print(f'measurement_weight: \n{e}')
            try:
                await loop.run_in_executor(None, _reopen_port, port)
            except asyncio.CancelledError:
                raise
            except Exception as reopen_error:
                # Keep the task alive; the next cycle retries the reopen.
                print(f'measurement_weight: \n{reopen_error}')
        await asyncio.sleep(1)


async def set_zero(port, data):
    """Poll the zero-request coil once per second and zero the scale.

    Args:
        port: open serial port object providing ``write`` and ``readline``.
        data: Modbus slave context; the request flag is read with
            ``data.getValues(1, 1, count=1)`` and cleared with
            ``data.setValues(1, 1, [0])`` after the command was sent.

    Errors are printed and the loop continues. The coroutine runs until it
    is cancelled.
    """
    loop = asyncio.get_running_loop()
    while True:
        try:
            requested = data.getValues(1, 1, count=1)
            if requested[0] == 1:
                await loop.run_in_executor(None, _send_zero, port)
                data.setValues(1, 1, [0])
        except asyncio.CancelledError:
            raise
        except Exception as e:
            print(f'set_zero: \n{e}')
        await asyncio.sleep(1)


def exit_handler(signum, frame):
    """Signal handler: print a message and exit the process with status 0."""
    print('Stop')
    sys.exit(0)


async def run_server(port, data, context=None):
    """Start the Modbus TCP server together with the two polling tasks.

    Args:
        port: serial port object shared by the polling tasks.
        data: Modbus slave context the polling tasks read from and write to.
        context: optional Modbus server context to serve. When omitted, a
            single-slave server context wrapping ``data`` is created.

    The listen address and port are read from ``CONFIG_PATH``. When the
    server stops (or fails to start), the polling tasks are cancelled.
    """
    if context is None:
        context = ModbusServerContext(slaves=data, single=True)

    # Start the TCP server
    with open(CONFIG_PATH, encoding='UTF-8') as f:
        server_config = json.load(f)['modbus-server']

    server = ModbusTcpServer(
        context,
        address=(server_config['address'], server_config['port']),
    )

    weight_task = asyncio.create_task(measurement_weight(port=port, data=data))
    zero_task = asyncio.create_task(set_zero(port=port, data=data))
    try:
        await server.serve_forever()
    finally:
        # The polling loops never finish on their own; stop them explicitly.
        for task in (weight_task, zero_task):
            task.cancel()
        await asyncio.gather(weight_task, zero_task, return_exceptions=True)


if __name__ == '__main__':
    # Define serial port
    serial_port = serial.Serial(
        '/dev/ttyMAX0',
        115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1,
    )

    # For detect Ctrl+C
    signal.signal(signal.SIGINT, exit_handler)

    # Create ModbusTCP server
    store = ModbusSlaveContext(
        co=ModbusSequentialDataBlock(0, [0] * 100),  # Coils
        hr=ModbusSequentialDataBlock(0, [0] * 100),  # Holding Registers
    )
    context = ModbusServerContext(slaves=store, single=True)

    try:
        asyncio.run(run_server(port=serial_port, data=store, context=context))
    finally:
        # Release the device even when exiting through the SIGINT handler.
        serial_port.close()