How to run functions from different modules in parallel in Python with real-time data exchange
Hey there! Let's break down how to solve this problem. You need to run loop() and concurrent() in parallel, pass real-time data between them without direct function calls, and keep them separated into different modules/classes. Here are two reliable approaches that fit your requirements:
Approach 1: Threading + Queue (Great for I/O-bound Tasks)
Since your functions need bidirectional real-time communication—loop() sends data to concurrent() and waits for its result—we can use threading (lightweight for I/O work) paired with queue.Queue to pass data safely between threads. This keeps the functions fully decoupled, so you can easily move them to separate modules or classes later.
```python
import threading
import queue

# Create two queues for bidirectional communication
task_queue = queue.Queue()    # For sending attribute_of_A from loop to concurrent
result_queue = queue.Queue()  # For sending stuff_needed_in_A back to loop

def loop():
    test = 0
    while test < 3:
        test += 1
        print('doing A stuff')
        # Send real-time test value to the concurrent worker
        task_queue.put(test)
        # Wait for the result from concurrent before continuing
        stuff_needed_in_A = result_queue.get()
        print(stuff_needed_in_A)

def concurrent_worker():
    # Run indefinitely until the main thread exits (daemon=True handles this)
    while True:
        # Wait for incoming data from loop
        attribute_of_A = task_queue.get()
        print('doing B stuff')
        # Process the data
        stuff_needed_in_A = 2 * attribute_of_A
        # Send result back to loop
        result_queue.put(stuff_needed_in_A)
        # Mark the task as completed (for queue tracking)
        task_queue.task_done()

if __name__ == "__main__":
    # Start the concurrent function in a background thread
    worker_thread = threading.Thread(target=concurrent_worker, daemon=True)
    worker_thread.start()
    # Run the main loop
    loop()
    # Wait for all remaining tasks to finish
    task_queue.join()
```
How this meets your requirements:
- Real-time data transfer: no storage needed; data is passed directly via queues as soon as it's generated in `loop()`.
- Result dependency: `loop()` blocks on `result_queue.get()` until it receives the result from `concurrent_worker()`, ensuring each iteration gets the required value.
- Decoupling: `loop()` and `concurrent_worker()` never call each other directly. You can move them to separate files/classes and just pass the queues as parameters.
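To see why the queue alone is enough to synchronize the two sides, note that `queue.Queue.get()` blocks until an item is available. A minimal standalone demonstration (the `producer` function and its 0.1-second delay are illustrative, not part of the solution above):

```python
import threading
import queue
import time

q = queue.Queue()

def producer():
    # Simulate work before the item is ready
    time.sleep(0.1)
    q.put("ready")

start = time.monotonic()
threading.Thread(target=producer, daemon=True).start()

item = q.get()  # Blocks here until the producer puts the item
elapsed = time.monotonic() - start

print(item)            # ready
print(elapsed >= 0.1)  # True: get() waited for the producer
```

This blocking behavior is exactly what makes `loop()` pause on `result_queue.get()` until the worker has produced its result.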
Approach 2: Multiprocessing + Queue (For CPU-bound Tasks)
If concurrent() does heavy CPU work, Python's Global Interpreter Lock (GIL) will limit threading performance. In this case, use multiprocessing instead—each process has its own Python interpreter, so it can run in parallel on multiple cores.
```python
import multiprocessing

def loop(task_queue, result_queue):
    test = 0
    while test < 3:
        test += 1
        print('doing A stuff')
        task_queue.put(test)
        stuff_needed_in_A = result_queue.get()
        print(stuff_needed_in_A)

def concurrent_worker(task_queue, result_queue):
    # Run until we receive a termination signal
    while True:
        attribute_of_A = task_queue.get()
        # Exit if we get a None signal
        if attribute_of_A is None:
            break
        print('doing B stuff')
        stuff_needed_in_A = 2 * attribute_of_A
        result_queue.put(stuff_needed_in_A)

if __name__ == "__main__":
    # Multiprocessing queues work across process boundaries
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    # Start the concurrent function in a separate process
    worker_process = multiprocessing.Process(target=concurrent_worker,
                                             args=(task_queue, result_queue))
    worker_process.start()
    # Run the main loop
    loop(task_queue, result_queue)
    # Send termination signal to the worker process
    task_queue.put(None)
    # Wait for the worker to finish cleanly
    worker_process.join()
```
Key differences from threading:
- Bypasses the GIL for true parallelism on CPU-heavy tasks.
- Data passed through queues is serialized (pickled), so make sure any custom objects you pass are pickle-compatible.
- Requires an explicit termination signal (like `None`) to stop the worker process, since daemon processes don't exit as cleanly as threads.
Key Notes & Alternatives
- If you don't need `loop()` to wait for `concurrent()`'s result before starting the next iteration, you could use `asyncio` for asynchronous I/O. But since your requirement specifies needing the result per iteration, the queue-based thread/process approach is the most straightforward.
- If you want to organize this into classes, you can wrap each function in a class and inject the queues via the constructor. For example:
```python
class LoopRunner:
    def __init__(self, task_queue, result_queue):
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        # ... loop logic here ...
        pass

class ConcurrentWorker:
    def __init__(self, task_queue, result_queue):
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        # ... concurrent logic here ...
        pass
```
This question comes from Stack Exchange; it was asked by Eduard Barnoviciu.




