基于Python连接WebSocket及订阅主题的示例查询
Python WebSocket 客户端:连接、订阅主题及发送消息示例
嗨,我明白你从Java转Python时找不到适配WebSocket主题订阅+消息收发示例的困扰,下面给你整理了两个最常用Python库的完整示例,直接就能适配你的中间件场景:
1. 用 websockets 库(异步推荐,性能更优)
这个是Python官方生态里推荐的异步WebSocket库,适合高并发场景,代码结构清晰:
首先安装库:
pip install websockets
完整示例代码(包含连接、订阅主题、发送消息、接收推送):
import asyncio import websockets import json async def websocket_client(): # 替换成你的中间件WebSocket地址 ws_url = "ws://your-middleware-host:port/ws" # 要订阅的目标主题 target_topic = "your-specific-topic" try: # 建立WebSocket连接 async with websockets.connect(ws_url) as websocket: print(f"成功连接到WebSocket服务器: {ws_url}") # 发送订阅主题请求(注意:这里的格式要和你的中间件要求匹配,通常是JSON) subscribe_msg = json.dumps({ "action": "subscribe", "topic": target_topic }) await websocket.send(subscribe_msg) print(f"已发送订阅请求,主题: {target_topic}") # 模拟发送业务数据(同样格式要匹配中间件要求) send_data = json.dumps({ "action": "publish", "topic": target_topic, "data": { "key1": "value1", "key2": 12345 } }) await websocket.send(send_data) print(f"已发送消息到主题: {target_topic}") # 持续接收服务器推送的消息(如果不需要可以去掉这个循环) async for message in websocket: print(f"收到主题推送消息: {message}") except websockets.exceptions.ConnectionClosed: print("WebSocket连接意外关闭") except Exception as e: print(f"发生错误: {str(e)}") # 运行异步客户端 asyncio.run(websocket_client())
2. 用 websocket-client 库(同步场景适用)
如果你的客户端是简单的同步逻辑,这个库更易上手:
首先安装库:
pip install websocket-client
完整示例代码:
import websocket import json def on_message(ws, message): # 处理收到的推送消息 print(f"收到主题推送消息: {message}") def on_error(ws, error): print(f"发生错误: {str(error)}") def on_close(ws, close_status_code, close_msg): print(f"WebSocket连接关闭,状态码: {close_status_code}, 消息: {close_msg}") def on_open(ws): print("成功连接到WebSocket服务器") # 订阅主题 target_topic = "your-specific-topic" subscribe_msg = json.dumps({ "action": "subscribe", "topic": target_topic }) ws.send(subscribe_msg) print(f"已订阅主题: {target_topic}") # 发送业务数据 send_data = json.dumps({ "action": "publish", "topic": target_topic, "data": {"content": "来自Python客户端的测试数据"} }) ws.send(send_data) print(f"已发送消息到主题: {target_topic}") if __name__ == "__main__": # 替换成你的中间件地址 ws_url = "ws://your-middleware-host:port/ws" ws = websocket.WebSocketApp( ws_url, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close ) # 持续运行客户端 ws.run_forever()
关键注意事项
- 主题订阅格式: 上面示例里的
subscribe消息格式是通用模板,你需要根据你的中间件实际要求调整(比如有的中间件用特定的指令字符串,而非JSON) - 异常处理: 生产环境建议加上重连逻辑,避免连接断开后无法恢复
- 心跳机制: 如果中间件有心跳要求,定时发送ping消息保持连接(
websockets库有自动ping功能,websocket-client可以手动实现)
内容的提问来源于stack exchange,提问作者Agustn Ernesto Cardeilhac Bans




