Getting return values from Python asyncio coroutines: RuntimeError, verifying result order, and better implementations

Hey there! Let's break down your problems one by one and fix them up:

1. Fixing the RuntimeError: Event loop is closed issue

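The error means something tried to use the event loop after the loop had already been closed. When you manage the loop by hand (for example, run_until_complete() followed by loop.close()), it's easy to make one of two mistakes. You can close the loop while aiohttp is still cleaning up its connections, or you can touch the loop again after close(). A much cleaner and safer approach is asyncio.run() (added in Python 3.7). It creates a fresh event loop, runs your coroutine to completion, and then closes the loop for you.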
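Here is a minimal reproduction of the error with no aiohttp involved. It assumes the root cause is reusing a closed loop, and answer() and manual_loop() are hypothetical names for illustration. Once loop.close() has run, any further run_until_complete() on that loop raises RuntimeError('Event loop is closed'). asyncio.run() avoids the problem by giving each call its own loop:

```python
import asyncio


async def answer():
    # Hypothetical stand-in coroutine; no network involved
    return 42


def manual_loop():
    """Run answer() on a hand-managed loop, then try to reuse the closed loop."""
    loop = asyncio.new_event_loop()
    first = loop.run_until_complete(answer())
    loop.close()

    coro = answer()
    error = None
    try:
        loop.run_until_complete(coro)  # The loop is already closed, so this raises
    except RuntimeError as e:
        error = str(e)
    finally:
        coro.close()  # The coroutine never ran; close it to avoid a "never awaited" warning
    return first, error


if __name__ == "__main__":
    print(manual_loop())          # (42, 'Event loop is closed')
    print(asyncio.run(answer()))  # 42: asyncio.run() creates and closes its own loop
```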

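Another small optimization: your original fetch function creates a new ClientSession for every request. Reusing a single session is better because the session owns a connection pool. Requests can then reuse already-open connections instead of paying for a new TCP (and TLS) handshake each time. The aiohttp documentation likewise recommends one session per application rather than one per request.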

Here's the revised code:

import asyncio
import aiohttp

async def fetch(session, url):  # Reuse the same session across requests
    async with session.get(url) as resp:
        assert resp.status == 200
        return await resp.text()

async def main():
    urls = ['https://python.org', 'https://google.com', 'https://amazon.com']
    async with aiohttp.ClientSession() as session:  # Single session for all requests
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    # gather() has already returned a plain list of strings, in the same order as urls
    for url, content in zip(urls, results):
        print(f"Content from {url} (first 50 characters):\n{content[:50]}...\n")

if __name__ == "__main__":
    asyncio.run(main())  # Auto-manages the event loop

2. Does asyncio.gather() preserve the order of results?

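Absolutely! This is a key feature of asyncio.gather(): it returns results in the same order as the awaitables you pass in, regardless of which one finishes first. So if you pass tasks for urls[0], urls[1], urls[2], results[0] will always be the response from urls[0], even if the request for urls[2] completes sooner. You don't have to worry about mismatched results here.

One caveat: by default, if any awaitable raises, gather() propagates that first exception and you never get the results list. The remaining awaitables are not cancelled and keep running. Passing return_exceptions=True puts each exception in its awaitable's slot instead, so the order still lines up with urls.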
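You can check the ordering yourself without any network access. The sketch below uses a hypothetical fake_fetch() that sleeps for a different time per URL, so the coroutines finish in reverse order. It records the completion order next to what gather() returns:

```python
import asyncio


async def fake_fetch(url, delay):
    # Hypothetical stand-in for a real request: just sleeps for `delay` seconds
    await asyncio.sleep(delay)
    return f"response from {url}"


async def main():
    urls = ["a", "b", "c"]
    delays = [0.3, 0.2, 0.1]  # "c" finishes first, "a" finishes last
    finished = []

    async def tracked(url, delay):
        result = await fake_fetch(url, delay)
        finished.append(url)  # Record the order in which requests complete
        return result

    results = await asyncio.gather(*(tracked(u, d) for u, d in zip(urls, delays)))
    return finished, results


if __name__ == "__main__":
    finished, results = asyncio.run(main())
    print(finished)  # ['c', 'b', 'a']
    print(results)   # ['response from a', 'response from b', 'response from c']
```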

3. Better implementations and when to use asyncio.Queue

Improved implementation tips

Beyond fixing the loop issue, you can make your code more robust with these tweaks:

  • Timeout handling: Prevent requests from hanging indefinitely by adding a timeout.
  • Error handling: Replace assert with proper error catching so one failed request doesn't crash the entire batch.

Example with these improvements:

import asyncio
import aiohttp
from aiohttp import ClientTimeout

async def fetch(session, url):
    """Return (url, ok, payload): the page text on success, an error description on failure."""
    try:
        async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
            resp.raise_for_status()  # Raises aiohttp.ClientResponseError for 4xx/5xx
            return url, True, await resp.text()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # Return the failure instead of raising, so gather() still yields every result
        return url, False, repr(e)  # repr(), because a TimeoutError's str() is empty

async def main():
    urls = ['https://python.org', 'https://google.com', 'https://amazon.com', 'https://invalid-url.example']
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    # Use the explicit ok flag; guessing from the text (e.g. startswith) misclassifies errors
    for url, ok, payload in results:
        if ok:
            print(f"Successfully fetched {url} (first 50 chars):\n{payload[:50]}...\n")
        else:
            print(f"Failed to fetch {url}: {payload}\n")

if __name__ == "__main__":
    asyncio.run(main())
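Instead of catching errors inside fetch(), you can let gather() collect them with return_exceptions=True. Each failed awaitable's exception object takes that awaitable's slot in the results list, so the results still line up with urls. This is a minimal sketch in which a hypothetical flaky_fetch() stands in for the aiohttp request so it runs offline:

```python
import asyncio


async def flaky_fetch(url):
    # Hypothetical stand-in for a request: URLs containing "invalid" fail
    await asyncio.sleep(0.01)
    if "invalid" in url:
        raise ConnectionError(f"cannot connect to {url}")
    return f"<html>{url}</html>"


async def main():
    urls = ["https://python.org", "https://invalid-url.example", "https://github.com"]
    # Exceptions are returned in their slot instead of being raised
    results = await asyncio.gather(*(flaky_fetch(u) for u in urls), return_exceptions=True)

    report = {}
    for url, outcome in zip(urls, results):
        if isinstance(outcome, BaseException):
            report[url] = f"failed: {outcome!r}"
        else:
            report[url] = f"ok: {len(outcome)} chars"
    return report


if __name__ == "__main__":
    for url, line in asyncio.run(main()).items():
        print(url, "->", line)
```

The trade-off is that return_exceptions=True also hands back exceptions you did not anticipate, including programming errors. Check each result's type rather than assuming it is a string.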

When to use asyncio.Queue?

Use a queue when:

  • You have a very large number of tasks (creating tens of thousands of tasks at once, and holding all their results, can use a lot of memory).
  • You need to control concurrency (limit how many requests are sent at once to avoid getting blocked by the target website).
  • Tasks are dynamically generated (e.g., scraping a site and adding new URLs to the queue as you go).
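If the only goal is the concurrency limit from the second point, you can swap in a different technique: wrap each request in an asyncio.Semaphore. This is lighter than a queue, and because the coroutines still go through gather(), results keep their input order. It does still create every task up front, so it does not help with the memory concern in the first point. In this sketch, fake_fetch(), the example.com URLs, and the in_flight/peak counters are hypothetical instrumentation so the limit can be verified offline:

```python
import asyncio


async def main(limit=3):
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_fetch(url):
        nonlocal in_flight, peak
        async with semaphore:  # At most `limit` coroutines are inside this block at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.05)  # Stand-in for session.get(url)
            in_flight -= 1
            return f"response from {url}"

    urls = [f"https://example.com/page/{i}" for i in range(10)]
    results = await asyncio.gather(*(fake_fetch(u) for u in urls))
    return peak, results


if __name__ == "__main__":
    peak, results = asyncio.run(main())
    print(peak)        # 3
    print(results[0])  # response from https://example.com/page/0
```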

Here's an example using asyncio.Queue to limit concurrency to 3 requests at a time:

import asyncio
import aiohttp
from aiohttp import ClientTimeout

async def worker(session, queue):
    while True:
        url = await queue.get()
        try:
            async with session.get(url, timeout=ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                content = await resp.text()
                print(f"Successfully fetched {url} (first 50 chars):\n{content[:50]}...\n")
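        except Exception as e:
            # Deliberately broad: an exception escaping here would kill this worker,
            # and if every worker died, queue.join() below would never return
            print(f"Failed to fetch {url}: {e!r}\n")
        finally:
            queue.task_done()  # Mark this item as processed so queue.join() can finish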

async def main():
    urls = ['https://python.org', 'https://google.com', 'https://amazon.com', 
            'https://github.com', 'https://stackoverflow.com']
    queue = asyncio.Queue()
    
    # Populate the queue with URLs
    for url in urls:
        await queue.put(url)
    
    concurrency_limit = 3
    async with aiohttp.ClientSession() as session:
        # Create worker tasks
        workers = [asyncio.create_task(worker(session, queue)) for _ in range(concurrency_limit)]
        # Wait until all tasks in the queue are done
        await queue.join()
        # Every URL has been processed; the idle workers are blocked in queue.get(), so cancel them
        for worker_task in workers:
            worker_task.cancel()
        # Wait for all workers to finish canceling
        await asyncio.gather(*workers, return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(main())
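The example above prints each page but doesn't hand any return values back to main(). Here is a minimal sketch of one way to do that, assuming URLs are unique. Each worker stores its result (or the exception) in a shared dict keyed by URL. A bounded queue (maxsize) makes the producer wait instead of enqueuing everything at once, which addresses the large-number-of-tasks case. fake_fetch() and producer() are hypothetical names. fake_fetch() replaces the aiohttp call so the sketch runs offline; in real code you would reuse the session.get() logic from worker() above:

```python
import asyncio


async def fake_fetch(url):
    # Hypothetical stand-in for the aiohttp request in worker() above
    await asyncio.sleep(0.01)
    if url.endswith("/bad"):
        raise ValueError(f"bad url: {url}")
    return f"content of {url}"


async def worker(queue, results):
    while True:
        url = await queue.get()
        try:
            results[url] = await fake_fetch(url)
        except Exception as e:
            results[url] = e  # Keep the error so the caller can inspect it
        finally:
            queue.task_done()


async def producer(queue, urls):
    for url in urls:
        await queue.put(url)  # Waits whenever the queue already holds maxsize items


async def main(urls, concurrency_limit=3):
    queue = asyncio.Queue(maxsize=concurrency_limit * 2)
    results = {}
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(concurrency_limit)]
    await producer(queue, urls)  # Returns once every URL has been enqueued
    await queue.join()           # Returns once every enqueued URL has been processed
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    # The dict was filled in completion order; rebuild the original URL order
    return [results[url] for url in urls]


if __name__ == "__main__":
    urls = [f"https://example.com/{i}" for i in range(5)] + ["https://example.com/bad"]
    for url, outcome in zip(urls, asyncio.run(main(urls))):
        print(url, "->", outcome)
```

The dict is filled in completion order. The final list comprehension restores the original URL order, matching what gather() would have given you.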

The question was sourced from Stack Exchange; asked by Spark.
