Python多进程读取串口数据遇Serial Exception及运行问题求助
解决多进程串口数据写入CSV的问题
看起来你遇到了两个核心问题:跨进程传递串口对象引发的Serial Exception,以及IDLE运行失败、终端窗口闪退。我来一步步帮你解决:
一、Serial Exception的根源与修复
你遇到的Serial Exception本质是:serial.Serial对象无法被Python的pickle模块序列化,而multiprocessing的队列(Queue)依赖序列化来跨进程传递对象。直接把主进程创建的串口对象传到子进程,必然会报错。
正确的做法是:在负责读取串口的子进程内部创建串口连接,而不是在主进程创建后传递。这样每个进程拥有独立的串口句柄,避免序列化问题。
二、IDLE无法运行+终端窗口闪退的解决
- IDLE兼容性问题:IDLE的运行环境和标准终端有差异,尤其是Windows系统下,它的事件循环和multiprocessing的
spawn启动方式(Windows默认)会冲突,导致多进程代码无法正常运行。建议直接用系统终端(cmd/PowerShell)运行脚本。 - 终端窗口闪退:如果是双击脚本运行,报错后窗口会立刻关闭。解决方法有两个:
- 在代码末尾添加
input("Press Enter to exit..."),让窗口在报错后停留; - 手动打开终端,cd到脚本所在目录,用
python your_script.py命令运行,这样报错信息会保留在终端里。
- 在代码末尾添加
三、优化后的完整代码示例
下面是调整后的代码,采用“串口读取进程+CSV写入进程”的架构,用队列传递数据,同时解决上述问题:
import multiprocessing import time import datetime import serial import csv def serial_reader(queue, port='COM3', baudrate=9600): """子进程:读取串口数据,放入队列""" try: # 在子进程内部创建串口连接 fio2_ser = serial.Serial(port, baudrate, timeout=1) print(f"串口已连接:{port}") while True: # 读取串口数据(根据你的设备输出格式调整) data = fio2_ser.readline().decode('utf-8').strip() if data: queue.put(data) time.sleep(0.1) # 避免过度占用CPU except serial.SerialException as e: print(f"串口错误:{e}") finally: if 'fio2_ser' in locals() and fio2_ser.is_open: fio2_ser.close() print("串口已关闭") def csv_writer(queue, interval=2): """子进程:按固定间隔从队列取数据,写入CSV""" # 初始化CSV文件,写入表头 with open('device_data.csv', 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f) # 检查文件是否为空,为空则写表头 f.seek(0, 2) if f.tell() == 0: writer.writerow(['时间', '数据']) last_write_time = time.time() while True: current_time = time.time() # 每interval秒写入一次(取队列中最新的数据,或者累计数据,按需调整) if current_time - last_write_time >= interval: # 读取队列中所有数据(这里取最后一条,你可以改成累计所有数据) latest_data = None while not queue.empty(): latest_data = queue.get() if latest_data: timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') writer.writerow([timestamp, latest_data]) f.flush() # 立即写入磁盘,避免缓存 print(f"已写入数据:{timestamp} - {latest_data}") last_write_time = current_time time.sleep(0.1) if __name__ == '__main__': # 创建队列,用于进程间通信 data_queue = multiprocessing.Queue() # 创建并启动子进程 serial_process = multiprocessing.Process(target=serial_reader, args=(data_queue,)) csv_process = multiprocessing.Process(target=csv_writer, args=(data_queue,)) serial_process.start() csv_process.start() try: # 主进程等待子进程结束(按Ctrl+C终止) serial_process.join() csv_process.join() except KeyboardInterrupt: print("程序正在终止...") serial_process.terminate() csv_process.terminate() # 等待进程退出 serial_process.join() csv_process.join() finally: print("程序已退出") # 让终端窗口停留(仅双击运行时有效) input("Press Enter to exit...")
代码说明:
- 把串口初始化放在
serial_reader子进程里,避免跨进程传递串口对象; csv_writer子进程按2秒间隔从队列取数据,这里取的是队列中最新的一条数据(如果你的设备输出频繁,这样可以保证每2秒写入最新值;如果需要累计所有数据,可以去掉循环取队列的逻辑,直接取一条);- 添加了异常处理和资源释放(关闭串口、终止进程);
- 末尾的
input()解决终端窗口闪退问题; - 必须放在
if __name__ == '__main__':块中,这是Windows下多进程代码的强制要求。
额外提示
- 运行前请修改串口参数(
port和baudrate)为你的设备实际参数; - 如果需要调试,可以在关键位置添加
print()输出,或者用logging模块记录日志; - 如果CSV写入有延迟,确保调用了
f.flush()强制写入磁盘。
内容的提问来源于stack exchange,提问作者Shawn




