You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何基于任务完成时间重排aiostream异步序列

解决aiostream异步序列按任务完成时间重排的问题

刚好,aiostream.stream.map本身就提供了参数来实现这个需求——只需要添加ordered=False,就能让结果按照异步任务完成的先后顺序返回,而不是原始输入的顺序。

修正后的完整代码

import aiostream
import asyncio

async def async_func(x):
    await asyncio.sleep(1 - 0.2 * x)
    return x

async def main():
    # 若期望输出为[5,4,3,2,1],需将迭代范围改为range(1,6)
    s = aiostream.stream.iterate(range(1, 6))
    # 核心:开启ordered=False参数
    s_reordered = aiostream.stream.map(s, async_func, ordered=False)
    result = await aiostream.stream.list(s_reordered)
    print(result)  # 输出: [5,4,3,2,1]

await main()

为什么你之前的尝试没达到预期?

  • aiostream.stream.merge:这个工具是用来合并多个独立的异步流的,并不支持直接处理单个aiostream异步序列的元素重排,而且它本身默认按流的顺序合并,不符合你的需求。
  • aiostream.stream.flatten:它的作用是展开嵌套的异步序列(比如包含多个子流的流),对于普通的元素流来说,它不会改变元素的返回顺序,无法实现按任务完成时间排序。
  • stream.just+flattenstream.map默认是ordered=True,也就是串行执行任务,等待前一个任务完成后再处理下一个,所以即使你用just把每个元素包装成单独的流,最终展开后还是会保持原始输入的顺序,自然得不到你想要的结果。

内容的提问来源于stack exchange,提问作者Alexey Tochin

火山引擎 最新活动