
Help with RuntimeError and AttributeError when using AsyncIO in multiple processes

Hey there, let's work through your AsyncIO + multiprocessing issues step by step. This kind of mix-up is very common, especially with the Python 3.7 asyncio APIs that expect an event loop to already be running!

First, Fix the "no running event loop" Error

A child process does not get a running event loop from its parent, and Python 3.7 functions such as asyncio.create_task() and asyncio.get_running_loop() raise "RuntimeError: no running event loop" when no loop is running in the current thread. When you switch from threads to AsyncIO in a multiprocessing worker, you need to explicitly create and run the event loop inside that process.
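To see where the message comes from, here is a minimal sketch that reproduces it without any process machinery. The names fetch_value, sync_entry_without_loop, and sync_entry_with_loop are made up for illustration; the first function stands in for a run() method that tries to schedule a coroutine while no loop is running.

```python
import asyncio


async def fetch_value():
    # Hypothetical coroutine standing in for your real async work.
    await asyncio.sleep(0)
    return 42


def sync_entry_without_loop():
    # No loop is running in this synchronous function, so create_task()
    # has nowhere to schedule the coroutine and raises RuntimeError.
    coro = fetch_value()
    try:
        asyncio.create_task(coro)
    except RuntimeError as exc:
        coro.close()  # discard the unused coroutine cleanly
        return str(exc)
    return None


def sync_entry_with_loop():
    # asyncio.run() creates a loop, runs the coroutine, and closes the loop.
    return asyncio.run(fetch_value())


error_message = sync_entry_without_loop()
result = sync_entry_with_loop()
print(error_message)
print(result)
```

The same coroutine fails or succeeds depending only on whether a loop is running when it is scheduled.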

Instead of trying to call async functions directly in your process's run() method, do this:

import asyncio
from multiprocessing import Process

class YourWorkerProcess(Process):
    # ... your __init__ and other methods ...

    def run(self):
        # Create a fresh event loop for this child process (critical: don't share loops between processes!)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

        try:
            # Run your async task until it completes
            loop.run_until_complete(self._get_filter_collateral())
        finally:
            # Clean up the loop when done
            loop.close()

If you prefer asyncio.run() (added in Python 3.7), which creates a new loop, runs the coroutine, and closes the loop for you, you can replace the whole body of run() with:

asyncio.run(self._get_filter_collateral())

Just make sure this is called inside the child process's run() method. Never rely on a loop created in the parent process; event loops cannot be shared between processes.
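As a quick check of how asyncio.run() manages loops, the sketch below calls it twice from a plain synchronous function that stands in for run() (worker_entry and current_loop are illustrative names, not part of your code). Each call gets its own loop, and that loop is already closed by the time asyncio.run() returns.

```python
import asyncio


async def current_loop():
    # Return the loop this coroutine is running on.
    return asyncio.get_running_loop()


def worker_entry():
    # Stand-in for Process.run(): a synchronous entry point driving a coroutine.
    return asyncio.run(current_loop())


first = worker_entry()
second = worker_entry()

print(first is second)     # each asyncio.run() call builds a new loop
print(first.is_closed())   # and closes it before returning
```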

Next, Fix the "coroutine was not awaited" Warning

This happens when you call an async function without either awaiting it or handing it to the event loop. For example:

  • Bad: self._some_async_method() (returns a coroutine object but doesn't execute it)
  • Good: await self._some_async_method() (inside another async function)
  • Good: asyncio.create_task(self._some_async_method()) (schedules it on the running loop; this must be called from code already running inside the loop, and you still need to keep the loop running long enough for the task to finish)

Double-check every async function call in _get_filter_collateral(). Each one should be either awaited or scheduled as a task that the event loop will actually get to process.
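The difference between the three cases can be sketched as follows. The coroutine double is a made-up placeholder for any of your async methods.

```python
import asyncio


async def double(x):
    # Hypothetical async helper.
    await asyncio.sleep(0)
    return x * 2


async def main():
    # Calling an async function only builds a coroutine object; nothing runs yet.
    pending = double(1)
    is_coroutine = asyncio.iscoroutine(pending)

    # Awaiting the coroutine object is what actually executes it.
    awaited = await pending

    # create_task() schedules the coroutine on the running loop;
    # awaiting the task keeps the loop alive until it finishes.
    task = asyncio.create_task(double(2))
    scheduled = await task

    return is_coroutine, awaited, scheduled


outcome = asyncio.run(main())
print(outcome)
```

If pending were never awaited, Python would emit the "coroutine ... was never awaited" warning when the object is garbage collected.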

Finally, Fix the "Namespace has no attribute 'dfs'" Error

This is likely a timing or initialization issue with your multiprocessing Namespace object:

  1. Ensure the ns object is properly passed to the child process: In your process's __init__, accept the ns as a parameter and store it as an instance variable (like self.ns = ns).
  2. Wait for ns.dfs to be initialized before accessing it: If the parent process sets ns.dfs after starting the child, your async task might try to access it too early. Either initialize ns.dfs before starting the process (the simplest fix), or poll in your async method, for example by awaiting asyncio.sleep() in a loop until hasattr(self.ns, 'dfs') is True, ideally with a timeout so a missing attribute cannot hang the worker forever.
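If you go the polling route, one possible shape is a small helper wrapped in asyncio.wait_for(). This is only a sketch: wait_for_attr is a hypothetical helper, and types.SimpleNamespace is used as a local stand-in for the Manager's Namespace so the example runs in a single process.

```python
import asyncio
from types import SimpleNamespace


async def wait_for_attr(ns, name, poll_interval=0.05):
    # Poll until the attribute exists, yielding to the loop between checks.
    while not hasattr(ns, name):
        await asyncio.sleep(poll_interval)
    return getattr(ns, name)


async def demo():
    loop = asyncio.get_running_loop()

    # Stand-in for manager.Namespace(); "dfs" is set a little later,
    # as if the parent process had written it after the child started.
    ns = SimpleNamespace()
    loop.call_later(0.1, setattr, ns, "dfs", [1, 2, 3])
    found = await asyncio.wait_for(wait_for_attr(ns, "dfs"), timeout=2.0)

    # If nobody ever sets the attribute, the timeout turns a hang into an error.
    empty = SimpleNamespace()
    try:
        await asyncio.wait_for(wait_for_attr(empty, "dfs"), timeout=0.2)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    return found, timed_out


found, timed_out = asyncio.run(demo())
print(found, timed_out)
```

The timeout values here are arbitrary; pick ones that match how long the parent can reasonably take to publish the data.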

Example Integrated Code Snippet

Here's how your class might look with all fixes applied:

from multiprocessing import Process, Manager
import asyncio

class CollateralFilterProcess(Process):
    def __init__(self, ns):
        super().__init__()
        self.ns = ns  # Pass the shared Namespace from parent

    async def _get_filter_collateral(self):
        # Wait for ns.dfs to be initialized if needed
        while not hasattr(self.ns, 'dfs'):
            await asyncio.sleep(0.1)
        
        # Your async IO logic here (use aiohttp/aiofiles for async IO operations!)
        filtered_data = await self._async_process_data(self.ns.dfs)
        self.ns.filtered_collateral = filtered_data

    async def _async_process_data(self, raw_data):
        # Example async task (replace with your actual IO work)
        await asyncio.sleep(1)  # Simulate IO delay
        return [item for item in raw_data if item['value'] > 100]

    def run(self):
        asyncio.run(self._get_filter_collateral())

# In app.py entry point
if __name__ == "__main__":
    with Manager() as manager:
        ns = manager.Namespace()
        # Initialize ns.dfs BEFORE starting the process
        ns.dfs = [{'id': 1, 'value': 50}, {'id': 2, 'value': 150}, {'id': 3, 'value': 200}]
        
        proc = CollateralFilterProcess(ns)
        proc.start()
        proc.join()
        
        print("Filtered data:", ns.filtered_collateral)

One last tip: For IO-bound tasks, make sure you're using async-compatible libraries (like aiohttp instead of requests, aiofiles instead of standard open()). Using blocking IO calls inside async functions will freeze the event loop and negate any performance gains.
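When no async-compatible library exists for a call, a different technique from the one above is to push the blocking call onto a thread with loop.run_in_executor(), so the loop stays responsive. The sketch below assumes a made-up blocking_read function in place of something like requests.get(), and a heartbeat coroutine that only exists to show the loop keeps running.

```python
import asyncio
import time


def blocking_read():
    # Stand-in for a blocking call; returns its result and finish time.
    time.sleep(0.3)
    return "payload", time.monotonic()


async def heartbeat(ticks):
    # Records timestamps to show the loop is still being serviced.
    for _ in range(3):
        await asyncio.sleep(0.02)
        ticks.append(time.monotonic())


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    # Passing None selects the loop's default thread pool executor.
    (data, finished_at), _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_read),
        heartbeat(ticks),
    )
    ticks_during_call = sum(1 for t in ticks if t < finished_at)
    return data, len(ticks), ticks_during_call


data, tick_count, ticks_during_call = asyncio.run(main())
print(data, tick_count, ticks_during_call)
```

Calling blocking_read() directly inside main() would instead stall the heartbeat until the sleep finished.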

This question comes from Stack Exchange; asked by Francisco.
