Python中如何从线程内执行的异步函数返回值?
嘿,作为异步和线程的初学者,你遇到的这个问题其实挺典型的——既要让异步代码持续跑着拿WebSocket数据,又要把数据传给同步线程,还不能让异步循环中途挂掉。先帮你理清楚核心问题,再给你一个可运行的解决方案。
先说说你代码里的几个小坑
- 函数名拼写错误:
SuscripcionIndividual应该是你定义的IndividualSubscription,Suscripciones对应Subscriptions,这会直接导致代码跑不起来; Subscriptions里的while True循环每次都重新创建任务,但gather之后没处理返回值,而且如果协程return了,循环虽然会再创建任务,但这样其实是反复断开重连,不是持续接收;main里调用task.result()会阻塞主线程,因为你的Subscriptions是while True永远不会返回,主线程会一直卡在这里,根本到不了启动function线程的步骤。
核心思路:用线程安全队列做数据桥梁
要让异步线程和同步线程安全地传递数据,最靠谱的方式就是用queue.Queue——它自带线程锁,能避免多线程下的数据竞争问题。异步代码持续接收WebSocket数据,把数据丢进队列;同步线程从队列里拿数据处理,完美解耦。
修正后的完整代码
直接给你改好的可运行代码,后面再一步步解释关键改动:
import asyncio from threading import Thread import websockets import json import time from queue import Queue # 线程安全队列:异步线程存数据,同步线程取数据 data_queue = Queue() URLS = [ "wss://stream.binance.com:9443/ws/xrpusdt@kline_1m", "wss://stream.binance.com:9443/ws/btcusdt@kline_1m", ] def start_background_loop(loop: asyncio.AbstractEventLoop): asyncio.set_event_loop(loop) loop.run_forever() async def IndividualSubscription(url: str): """单个WebSocket的持续订阅,收到数据就丢进队列""" while True: try: async with websockets.connect(url) as websocket: print(f"已连接到 {url}") # 持续接收数据,直到连接断开 while True: raw_data = await websocket.recv() parsed_data = json.loads(raw_data) # 把URL和数据一起丢进队列,方便同步线程区分来源 data_queue.put((url, parsed_data)) print(f"\n[异步线程] 收到 {url} 数据:\n{parsed_data['k']['c']}") except websockets.exceptions.ConnectionClosed: print(f"{url} 连接断开,5秒后重试...") await asyncio.sleep(5) async def Subscriptions(urls: list): """并发启动所有WebSocket订阅任务""" # 创建所有订阅协程任务 tasks = [asyncio.create_task(IndividualSubscription(url)) for url in urls] # 等待所有任务持续运行(除非被手动取消) await asyncio.gather(*tasks) def main(): # 创建异步事件循环并在后台线程运行 loop = asyncio.new_event_loop() async_thread = Thread(target=start_background_loop, args=(loop,), daemon=True) async_thread.start() # 启动订阅协程,这里不用result(),因为协程会一直跑 asyncio.run_coroutine_threadsafe(Subscriptions(URLS), loop) def sync_processing_thread(): """同步线程的处理逻辑:从队列拿数据并做你想做的操作""" print("同步线程启动,等待数据中...") while True: if not data_queue.empty(): url, data = data_queue.get() # 这里替换成你的同步处理逻辑 print(f"\n[同步线程] 处理 {url} 数据:") print(f"当前K线收盘价: {data['k']['c']}") # 标记任务完成(可选,用于队列的任务跟踪) data_queue.task_done() # 加个小休眠,避免同步线程空转占CPU time.sleep(0.1) if __name__ == "__main__": main() # 启动同步处理线程 sync_thread = Thread(target=sync_processing_thread, daemon=True) sync_thread.start() # 让主线程保持运行(守护线程会随主线程退出) try: while True: time.sleep(1) except KeyboardInterrupt: print("\n程序已终止")
关键改动详解
线程安全队列
data_queue
这是异步和同步线程之间的“数据管道”,异步代码用put()存数据,同步代码用get()取数据,队列内部已经处理了锁的问题,完全不用担心并发冲突。持续接收WebSocket数据
把IndividualSubscription改成双层while True:外层处理重连(如果连接断开自动重试),内层持续接收数据。这样每个WebSocket连接会一直保持,不断接收新的K线数据,不会因为一次return就终止。避免主线程阻塞
之前用task.result()会让主线程卡死,现在直接启动协程就行——因为Subscriptions里的gather会一直等待所有订阅任务运行,不会返回,所以主线程可以继续做其他事(比如启动同步线程)。同步线程的优雅处理
同步线程通过检查队列是否为空来获取数据,加了time.sleep(0.1)避免一直占用CPU,处理完数据后调用task_done()是好习惯,方便后续如果需要等待队列所有任务完成的场景。
额外小技巧:处理WebSocket重连
代码里已经加了连接断开后的重试逻辑,这样如果网络波动导致WebSocket断开,会自动重新连接,不用手动重启程序。
这样调整后,你的异步WebSocket会持续获取数据,同步线程也能实时拿到这些数据进行处理,完全满足你的需求啦!
内容的提问来源于stack exchange,提问作者Abner




