You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

基于Python连接WebSocket及订阅主题的示例查询

Python WebSocket 客户端:连接、订阅主题及发送消息示例

嗨,我明白你从Java转Python时找不到适配WebSocket主题订阅+消息收发示例的困扰,下面给你整理了两个最常用Python库的完整示例,直接就能适配你的中间件场景:


1. 用 websockets 库(异步推荐,性能更优)

这个是Python官方生态里推荐的异步WebSocket库,适合高并发场景,代码结构清晰:

首先安装库:

pip install websockets

完整示例代码(包含连接、订阅主题、发送消息、接收推送):

import asyncio
import websockets
import json

async def websocket_client():
    # 替换成你的中间件WebSocket地址
    ws_url = "ws://your-middleware-host:port/ws"
    # 要订阅的目标主题
    target_topic = "your-specific-topic"

    try:
        # 建立WebSocket连接
        async with websockets.connect(ws_url) as websocket:
            print(f"成功连接到WebSocket服务器: {ws_url}")

            # 发送订阅主题请求(注意:这里的格式要和你的中间件要求匹配,通常是JSON)
            subscribe_msg = json.dumps({
                "action": "subscribe",
                "topic": target_topic
            })
            await websocket.send(subscribe_msg)
            print(f"已发送订阅请求,主题: {target_topic}")

            # 模拟发送业务数据(同样格式要匹配中间件要求)
            send_data = json.dumps({
                "action": "publish",
                "topic": target_topic,
                "data": {
                    "key1": "value1",
                    "key2": 12345
                }
            })
            await websocket.send(send_data)
            print(f"已发送消息到主题: {target_topic}")

            # 持续接收服务器推送的消息(如果不需要可以去掉这个循环)
            async for message in websocket:
                print(f"收到主题推送消息: {message}")

    except websockets.exceptions.ConnectionClosed:
        print("WebSocket连接意外关闭")
    except Exception as e:
        print(f"发生错误: {str(e)}")

# 运行异步客户端
asyncio.run(websocket_client())

2. 用 websocket-client 库(同步场景适用)

如果你的客户端是简单的同步逻辑,这个库更易上手:

首先安装库:

pip install websocket-client

完整示例代码:

import websocket
import json

def on_message(ws, message):
    # 处理收到的推送消息
    print(f"收到主题推送消息: {message}")

def on_error(ws, error):
    print(f"发生错误: {str(error)}")

def on_close(ws, close_status_code, close_msg):
    print(f"WebSocket连接关闭,状态码: {close_status_code}, 消息: {close_msg}")

def on_open(ws):
    print("成功连接到WebSocket服务器")
    # 订阅主题
    target_topic = "your-specific-topic"
    subscribe_msg = json.dumps({
        "action": "subscribe",
        "topic": target_topic
    })
    ws.send(subscribe_msg)
    print(f"已订阅主题: {target_topic}")

    # 发送业务数据
    send_data = json.dumps({
        "action": "publish",
        "topic": target_topic,
        "data": {"content": "来自Python客户端的测试数据"}
    })
    ws.send(send_data)
    print(f"已发送消息到主题: {target_topic}")

if __name__ == "__main__":
    # 替换成你的中间件地址
    ws_url = "ws://your-middleware-host:port/ws"
    ws = websocket.WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    # 持续运行客户端
    ws.run_forever()

关键注意事项

  • 主题订阅格式: 上面示例里的subscribe消息格式是通用模板,你需要根据你的中间件实际要求调整(比如有的中间件用特定的指令字符串,而非JSON)
  • 异常处理: 生产环境建议加上重连逻辑,避免连接断开后无法恢复
  • 心跳机制: 如果中间件有心跳要求,定时发送ping消息保持连接(websockets库有自动ping功能,websocket-client可以手动实现)

内容的提问来源于stack exchange,提问作者Agustn Ernesto Cardeilhac Bans

火山引擎 最新活动