You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python中需实时数据交互的跨模块函数并行执行实现方案问询

Hey there! Let's break down how to solve this problem. You need to run loop() and concurrent() in parallel, pass real-time data between them without direct function calls, and keep them separated into different modules/classes. Here are two reliable approaches that fit your requirements:


Approach 1: Threading + Queue (Great for I/O-bound Tasks)

Since your functions need bidirectional real-time communication—loop() sends data to concurrent() and waits for its result—we can use threading (lightweight for I/O work) paired with queue.Queue to pass data safely between threads. This keeps the functions fully decoupled, so you can easily move them to separate modules or classes later.

import threading
import queue

# Create two queues for bidirectional communication
task_queue = queue.Queue()  # For sending attribute_of_A from loop to concurrent
result_queue = queue.Queue()  # For sending stuff_needed_in_A back to loop

def loop():
    test = 0
    while test < 3:
        test += 1
        print('doing A stuff')
        # Send real-time test value to the concurrent worker
        task_queue.put(test)
        # Wait for the result from concurrent before continuing
        stuff_needed_in_A = result_queue.get()
        print(stuff_needed_in_A)

def concurrent_worker():
    # Run indefinitely until the main thread exits (daemon=True handles this)
    while True:
        # Wait for incoming data from loop
        attribute_of_A = task_queue.get()
        print('doing B stuff')
        # Process the data
        stuff_needed_in_A = 2 * attribute_of_A
        # Send result back to loop
        result_queue.put(stuff_needed_in_A)
        # Mark the task as completed (for queue tracking)
        task_queue.task_done()

if __name__ == "__main__":
    # Start the concurrent function in a background thread
    worker_thread = threading.Thread(target=concurrent_worker, daemon=True)
    worker_thread.start()
    
    # Run the main loop
    loop()
    # Wait for all remaining tasks to finish
    task_queue.join()

How this meets your requirements:

  • Real-time data transfer: No storage needed—data is passed directly via queues as soon as it's generated in loop().
  • Result dependency: loop() blocks on result_queue.get() until it receives the result from concurrent(), ensuring each iteration gets the required value.
  • Decoupling: loop() and concurrent_worker() never call each other directly. You can move them to separate files/classes and just pass the queues as parameters.

Approach 2: Multiprocessing + Queue (For CPU-bound Tasks)

If concurrent() does heavy CPU work, Python's Global Interpreter Lock (GIL) will limit threading performance. In this case, use multiprocessing instead—each process has its own Python interpreter, so it can run in parallel on multiple cores.

import multiprocessing

def loop(task_queue, result_queue):
    test = 0
    while test < 3:
        test += 1
        print('doing A stuff')
        task_queue.put(test)
        stuff_needed_in_A = result_queue.get()
        print(stuff_needed_in_A)

def concurrent_worker(task_queue, result_queue):
    # Run until we receive a termination signal
    while True:
        attribute_of_A = task_queue.get()
        # Exit if we get a None signal
        if attribute_of_A is None:
            break
        print('doing B stuff')
        stuff_needed_in_A = 2 * attribute_of_A
        result_queue.put(stuff_needed_in_A)

if __name__ == "__main__":
    # Multiprocessing queues work across process boundaries
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    
    # Start the concurrent function in a separate process
    worker_process = multiprocessing.Process(target=concurrent_worker, args=(task_queue, result_queue))
    worker_process.start()
    
    # Run the main loop
    loop(task_queue, result_queue)
    
    # Send termination signal to the worker process
    task_queue.put(None)
    # Wait for the worker to finish cleanly
    worker_process.join()

Key differences from threading:

  • Bypasses the GIL for true parallelism on CPU-heavy tasks.
  • Data passed through queues is serialized (pickled), so make sure any custom objects you pass are pickle-compatible.
  • Requires an explicit termination signal (like None) to stop the worker process, since daemon processes don't exit as cleanly as threads.

Key Notes & Alternatives

  • If you don't need loop() to wait for concurrent()'s result before starting the next iteration, you could use asyncio for asynchronous I/O. But since your requirement specifies needing the result per iteration, the queue-based thread/process approach is the most straightforward.
  • If you want to organize this into classes, you can wrap each function in a class and inject the queues via the constructor. For example:
    class LoopRunner:
        def __init__(self, task_queue, result_queue):
            self.task_queue = task_queue
            self.result_queue = result_queue
        
        def run(self):
            # ... loop logic here ...
    
    class ConcurrentWorker:
        def __init__(self, task_queue, result_queue):
            self.task_queue = task_queue
            self.result_queue = result_queue
        
        def run(self):
            # ... concurrent logic here ...
    

内容的提问来源于stack exchange,提问作者Eduard Barnoviciu

火山引擎 最新活动