You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

基于pymodbus:如何更新运行中ModbusRTU Server的Context

解决Pymodbus RTU Server运行中Context无法更新的问题

看起来你踩了pymodbus Server运行时Context更新的两个核心坑:主线程阻塞导致后续代码无法执行,以及跨进程/线程的Context访问没有线程安全保障。我来帮你拆解问题并给出可落地的解决方案。

为什么你之前的方法无效?

先理清楚根源:

  1. StartSerialServer()默认是阻塞调用,会完全占用主线程——这意味着你在它之后写的任何更新代码都没机会执行,除非把Server放到后台线程运行。
  2. 外部脚本是独立进程,全局变量是进程隔离的——你修改的是外部脚本自己进程里的Context副本,和Server进程里的Context完全没关系。
  3. 直接操作Context没有同步机制——Server在处理客户端请求时会读写Context,并发修改会导致数据不一致,甚至更新被覆盖。

解决方案:分两种场景处理

场景1:同一进程内用线程更新Context

如果你的更新逻辑是和Server在同一个进程里(比如内部定时任务),可以用以下方式实现:

from pymodbus.server import StartSerialServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext
from pymodbus.transaction import ModbusRtuFramer
import threading
import time

# 全局共享:Context和线程锁(必须加锁保证线程安全)
server_context = None
context_lock = threading.Lock()

def create_initial_context():
    """创建初始寄存器上下文"""
    slave_ctx = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0]*100),
        co=ModbusSequentialDataBlock(0, [0]*100),
        hr=ModbusSequentialDataBlock(0, [100]*100),  # 初始保持寄存器值为100
        ir=ModbusSequentialDataBlock(0, [0]*100)
    )
    return ModbusServerContext(slaves=slave_ctx, single=True)

def thread_safe_update_registers(func_code, addr, value):
    """线程安全的寄存器更新函数"""
    global server_context, context_lock
    with context_lock:
        # 根据功能码更新对应寄存器组
        if func_code == 3:  # 保持寄存器
            server_context[0].setValues(3, addr, [value])
        elif func_code == 1:  # 线圈寄存器
            server_context[0].setValues(1, addr, [value])
        print(f"已更新寄存器:功能码{func_code},地址{addr},值{value}")

def run_modbus_server():
    """在后台线程启动Modbus RTU Server"""
    global server_context
    server_context = create_initial_context()
    StartSerialServer(
        server_context,
        port='/dev/ttyUSB0',  # 替换成你的串口设备
        baudrate=9600,
        framer=ModbusRtuFramer,
        timeout=1
    )

def example_internal_updater():
    """示例:内部线程定时更新寄存器"""
    while True:
        # 每隔5秒更新保持寄存器地址0的值+1
        current_value = server_context[0].getValues(3, 0, 1)[0]
        thread_safe_update_registers(3, 0, current_value + 1)
        time.sleep(5)

if __name__ == "__main__":
    # 启动Modbus Server线程
    server_thread = threading.Thread(target=run_modbus_server)
    server_thread.daemon = True
    server_thread.start()

    # 启动内部更新线程
    updater_thread = threading.Thread(target=example_internal_updater)
    updater_thread.daemon = True
    updater_thread.start()

    # 主线程保持运行
    input("按回车键退出服务器...\n")

场景2:外部独立脚本更新Context

如果需要从完全独立的脚本更新,必须通过**进程间通信(IPC)**让外部脚本把更新指令传给Server进程,再由Server内部线程安全地修改Context。这里用TCP套接字实现最简单:

第一步:修改后的Server代码(带TCP更新监听)

from pymodbus.server import StartSerialServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext
from pymodbus.transaction import ModbusRtuFramer
import threading
import socket
import json

server_context = None
context_lock = threading.Lock()

def create_initial_context():
    slave_ctx = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0]*100),
        co=ModbusSequentialDataBlock(0, [0]*100),
        hr=ModbusSequentialDataBlock(0, [100]*100),
        ir=ModbusSequentialDataBlock(0, [0]*100)
    )
    return ModbusServerContext(slaves=slave_ctx, single=True)

def thread_safe_update_registers(func_code, addr, value):
    global server_context, context_lock
    with context_lock:
        if func_code == 3:
            server_context[0].setValues(3, addr, [value])
        elif func_code == 1:
            server_context[0].setValues(1, addr, [value])
        print(f"已更新寄存器:FC={func_code}, Addr={addr}, Value={value}")

def run_modbus_server():
    global server_context
    server_context = create_initial_context()
    StartSerialServer(
        server_context,
        port='/dev/ttyUSB0',
        baudrate=9600,
        framer=ModbusRtuFramer,
        timeout=1
    )

def run_update_listener():
    """启动TCP监听,接收外部脚本的更新请求"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('localhost', 5000))
    sock.listen(1)
    print("等待外部更新请求,监听端口5000...")
    while True:
        conn, addr = sock.accept()
        data = conn.recv(1024).decode('utf-8')
        if not data:
            conn.close()
            continue
        try:
            update_cmd = json.loads(data)
            thread_safe_update_registers(
                update_cmd['fc'],
                update_cmd['addr'],
                update_cmd['value']
            )
            conn.send(b"更新成功")
        except Exception as e:
            conn.send(f"错误:{str(e)}".encode('utf-8'))
        conn.close()

if __name__ == "__main__":
    server_thread = threading.Thread(target=run_modbus_server)
    server_thread.daemon = True
    server_thread.start()

    listener_thread = threading.Thread(target=run_update_listener)
    listener_thread.daemon = True
    listener_thread.start()

    input("按回车键退出服务器...\n")

第二步:外部更新脚本代码

import socket
import json

def send_modbus_update(fc, addr, value):
    """向Modbus Server发送更新请求"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', 5000))
    update_data = json.dumps({
        'fc': fc,
        'addr': addr,
        'value': value
    })
    sock.send(update_data.encode('utf-8'))
    response = sock.recv(1024).decode('utf-8')
    print(f"服务器响应:{response}")
    sock.close()

if __name__ == "__main__":
    # 示例:更新保持寄存器地址0的值为200
    send_modbus_update(fc=3, addr=0, value=200)
    # 可以继续发送其他更新请求

关键注意事项

  1. 必须加线程锁:pymodbus的Context不是线程安全的,读写操作必须用锁同步,否则会出现数据错乱。
  2. Server后台运行:一定要把StartSerialServer放到线程里,否则主线程被阻塞,任何后续逻辑都无法执行。
  3. 跨进程只能用IPC:外部脚本无法直接访问Server进程的内存,必须通过套接字、管道或消息队列等方式传递更新指令。

内容的提问来源于stack exchange,提问作者user_cr

火山引擎 最新活动