You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python中如何从线程内执行的异步函数返回值?

搞定异步WebSocket订阅与同步线程的数据传递问题

嘿,作为异步和线程的初学者,你遇到的这个问题其实挺典型的——既要让异步代码持续跑着拿WebSocket数据,又要把数据传给同步线程,还不能让异步循环中途挂掉。先帮你理清楚核心问题,再给你一个可运行的解决方案。

先说说你代码里的几个小坑

  1. 函数名拼写错误:SuscripcionIndividual应该是你定义的IndividualSubscriptionSuscripciones对应Subscriptions,这会直接导致代码跑不起来;
  2. Subscriptions里的while True循环每次都重新创建任务,但gather之后没处理返回值,而且如果协程return了,循环虽然会再创建任务,但这样其实是反复断开重连,不是持续接收;
  3. main里调用task.result()会阻塞主线程,因为你的Subscriptionswhile True永远不会返回,主线程会一直卡在这里,根本到不了启动function线程的步骤。

核心思路:用线程安全队列做数据桥梁

要让异步线程和同步线程安全地传递数据,最靠谱的方式就是用queue.Queue——它自带线程锁,能避免多线程下的数据竞争问题。异步代码持续接收WebSocket数据,把数据丢进队列;同步线程从队列里拿数据处理,完美解耦。

修正后的完整代码

直接给你改好的可运行代码,后面再一步步解释关键改动:

import asyncio
from threading import Thread
import websockets
import json
import time
from queue import Queue

# 线程安全队列:异步线程存数据,同步线程取数据
data_queue = Queue()

URLS = [
    "wss://stream.binance.com:9443/ws/xrpusdt@kline_1m",
    "wss://stream.binance.com:9443/ws/btcusdt@kline_1m",
]

def start_background_loop(loop: asyncio.AbstractEventLoop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def IndividualSubscription(url: str):
    """单个WebSocket的持续订阅,收到数据就丢进队列"""
    while True:
        try:
            async with websockets.connect(url) as websocket:
                print(f"已连接到 {url}")
                # 持续接收数据,直到连接断开
                while True:
                    raw_data = await websocket.recv()
                    parsed_data = json.loads(raw_data)
                    # 把URL和数据一起丢进队列,方便同步线程区分来源
                    data_queue.put((url, parsed_data))
                    print(f"\n[异步线程] 收到 {url} 数据:\n{parsed_data['k']['c']}")
        except websockets.exceptions.ConnectionClosed:
            print(f"{url} 连接断开,5秒后重试...")
            await asyncio.sleep(5)

async def Subscriptions(urls: list):
    """并发启动所有WebSocket订阅任务"""
    # 创建所有订阅协程任务
    tasks = [asyncio.create_task(IndividualSubscription(url)) for url in urls]
    # 等待所有任务持续运行(除非被手动取消)
    await asyncio.gather(*tasks)

def main():
    # 创建异步事件循环并在后台线程运行
    loop = asyncio.new_event_loop()
    async_thread = Thread(target=start_background_loop, args=(loop,), daemon=True)
    async_thread.start()
    # 启动订阅协程,这里不用result(),因为协程会一直跑
    asyncio.run_coroutine_threadsafe(Subscriptions(URLS), loop)

def sync_processing_thread():
    """同步线程的处理逻辑:从队列拿数据并做你想做的操作"""
    print("同步线程启动,等待数据中...")
    while True:
        if not data_queue.empty():
            url, data = data_queue.get()
            # 这里替换成你的同步处理逻辑
            print(f"\n[同步线程] 处理 {url} 数据:")
            print(f"当前K线收盘价: {data['k']['c']}")
            # 标记任务完成(可选,用于队列的任务跟踪)
            data_queue.task_done()
        # 加个小休眠,避免同步线程空转占CPU
        time.sleep(0.1)

if __name__ == "__main__":
    main()
    # 启动同步处理线程
    sync_thread = Thread(target=sync_processing_thread, daemon=True)
    sync_thread.start()
    
    # 让主线程保持运行(守护线程会随主线程退出)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n程序已终止")

关键改动详解

  1. 线程安全队列data_queue
    这是异步和同步线程之间的“数据管道”,异步代码用put()存数据,同步代码用get()取数据,队列内部已经处理了锁的问题,完全不用担心并发冲突。

  2. 持续接收WebSocket数据
    IndividualSubscription改成双层while True:外层处理重连(如果连接断开自动重试),内层持续接收数据。这样每个WebSocket连接会一直保持,不断接收新的K线数据,不会因为一次return就终止。

  3. 避免主线程阻塞
    之前用task.result()会让主线程卡死,现在直接启动协程就行——因为Subscriptions里的gather会一直等待所有订阅任务运行,不会返回,所以主线程可以继续做其他事(比如启动同步线程)。

  4. 同步线程的优雅处理
    同步线程通过检查队列是否为空来获取数据,加了time.sleep(0.1)避免一直占用CPU,处理完数据后调用task_done()是好习惯,方便后续如果需要等待队列所有任务完成的场景。

额外小技巧:处理WebSocket重连

代码里已经加了连接断开后的重试逻辑,这样如果网络波动导致WebSocket断开,会自动重新连接,不用手动重启程序。

这样调整后,你的异步WebSocket会持续获取数据,同步线程也能实时拿到这些数据进行处理,完全满足你的需求啦!

内容的提问来源于stack exchange,提问作者Abner

火山引擎 最新活动