How can resources be shared between Python processes/executors? Advice wanted on parallel processing approaches
Hey there! Let's tackle your problem head-on: parallelizing file processing while efficiently sharing that big data structure you need for queries and calculations. Below are the main approaches, with tradeoffs and practical examples to help you choose what works best for your use case. One caveat if you are really on Python 3.5: f-strings need 3.6, the executor's initializer argument needs 3.7, and shared_memory needs 3.8.
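1. Read-Only Data? Load It Once per Worker (or Share It via Copy-on-Write)
If your large data structure is read-only (which sounds likely for query/calculation tasks), this is often the fastest and simplest approach. The example below loads the data once in each worker through a pool initializer, so every lookup is a plain in-process read, at the cost of one copy per worker. Copy-on-write (COW) is a separate effect of the fork start method (the long-standing default on Linux; macOS has defaulted to spawn since Python 3.8): data loaded in the parent before the pool is created is shared with forked workers until a page is written. Even read-only access updates CPython reference counts, so some of those pages still get copied over time.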
Example with concurrent.futures.ProcessPoolExecutor
    import concurrent.futures

    # Per-process global; each worker fills in its own copy in init_worker()
    large_data = None


    def init_worker():
        """Load the large data structure once per worker process."""
        global large_data
        # Replace this with your actual logic to load/compute the data
        large_data = load_or_calculate_large_data()


    def process_single_file(file_path):
        """Process a single file using this worker's copy of the large data."""
        with open(file_path, 'r') as f:
            content = f.read()
        # Your query/calculation logic here
        result = run_query(content, large_data)
        return result


    if __name__ == '__main__':
        # List of all files to process (the middle entries are elided)
        file_list = ['file_01.txt', 'file_02.txt', ..., 'file_1000.txt']

        # Create a process pool, initializing each worker with the data.
        # The initializer argument requires Python 3.7+.
        with concurrent.futures.ProcessPoolExecutor(initializer=init_worker) as executor:
            # Map file paths to the processing function
            results = list(executor.map(process_single_file, file_list))

        # Do something with the results here
        print(f"Processed {len(results)} files successfully!")
Pros: No inter-process communication (IPC) overhead, blazing fast for read-only tasks.
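Cons: Loaded this way, each worker holds its own copy, so memory use grows with the worker count. On Windows (and wherever the spawn start method is used) there is no fork-based COW to fall back on, which could be a problem if the data is extremely large.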
2. Shared Mutable Data (or Windows Compatibility): Use multiprocessing.Manager
If you need to modify the shared data structure across processes, or you're on Windows and can't afford duplicate memory, multiprocessing.Manager keeps the data in a single server process and hands out proxy objects that are shared across processes. These proxies handle IPC under the hood, but every access is a round trip to the server process with pickled arguments and results, so each lookup costs far more than a local one.
Example with Shared Dictionary
    from multiprocessing import Pool, Manager


    def process_file(file_path, shared_data, shared_results, lock):
        """Process a file and append its result to the shared list."""
        with open(file_path, 'r') as f:
            content = f.read()
        # Placeholder key: the original left the key derivation unspecified
        content_key = content.strip()
        # Query the shared data
        value = shared_data.get(content_key)
        # Calculate result
        result = compute_with_value(value)
        # If modifying shared data, use a lock to avoid race conditions
        with lock:
            shared_results.append(result)


    if __name__ == '__main__':
        # Create a manager to hold shared objects
        with Manager() as manager:
            # Create shared data structure, result list, and lock
            shared_large_data = manager.dict(load_large_data())
            shared_results = manager.list()
            lock = manager.Lock()
            file_list = ['file_01.txt', 'file_02.txt', ...]

            # Create a pool and pass shared objects to each task
            with Pool() as pool:
                pool.starmap(
                    process_file,
                    [(f, shared_large_data, shared_results, lock) for f in file_list],
                )

            # Access the final results
            print(f"All results: {list(shared_results)}")
Pros: Works across all platforms, supports mutable shared data, easy to implement.
Cons: IPC overhead can slow down frequent accesses—best for tasks where you don’t query the shared data thousands of times per file.
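One way to keep that overhead down is to read from the proxy once per task and return the result to the parent, rather than appending to a second shared object under a lock. A minimal sketch, assuming the fork start method is available; the function names and the tiny dictionary are hypothetical.

```python
import multiprocessing


def lookup_and_compute(args):
    """Worker task: one proxy round trip to read, then purely local work."""
    key, shared_data = args
    value = shared_data.get(key, 0)
    return value * 2


def run(keys):
    """Share a dict through a manager and collect results via the pool's return values."""
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:
        shared = manager.dict({"a": 1, "b": 2})
        with ctx.Pool(2) as pool:
            return pool.map(lookup_and_compute, [(k, shared) for k in keys])


if __name__ == "__main__":
    print(run(["a", "b", "missing"]))
```

Because results travel back through pool.map, no lock or shared list is needed in this variant.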
3. Ultra-Fast Shared Memory (Python 3.8+)
For large, structured data (like NumPy arrays, matrices, or byte streams), the shared_memory module (added in Python 3.8) lets you directly share memory blocks between processes, eliminating IPC overhead entirely. This is ideal for performance-critical tasks.
Example with Shared NumPy Array
    import concurrent.futures
    import numpy as np
    from multiprocessing import shared_memory


    def process_file(file_path, shm_name, shm_shape, shm_dtype):
        """Access the shared NumPy array and process the file."""
        # Attach to the existing shared memory block
        existing_shm = shared_memory.SharedMemory(name=shm_name)
        try:
            # Reconstruct the NumPy array on top of the shared buffer (no copy)
            large_array = np.ndarray(shm_shape, dtype=shm_dtype, buffer=existing_shm.buf)
            # Process file using large_array
            with open(file_path, 'r') as f:
                content = f.read()
            idx = parse_index_from_content(content)
            # The multiplication produces a new value that does not alias the buffer
            result = large_array[idx] * some_calculation()
            del large_array  # drop the view before closing the handle
            return result
        finally:
            # Clean up: close this process's handle (don't unlink yet!)
            existing_shm.close()


    if __name__ == '__main__':
        # Create your large NumPy array
        large_array = np.random.rand(10_000_000)  # Example large array
        # Create a shared memory block for the array
        shm = shared_memory.SharedMemory(create=True, size=large_array.nbytes)
        try:
            # Create a new array that uses the shared memory buffer
            shared_array = np.ndarray(large_array.shape, dtype=large_array.dtype, buffer=shm.buf)
            # Copy data to the shared array
            shared_array[:] = large_array[:]
            file_list = ['file_01.txt', 'file_02.txt', ...]
            n = len(file_list)
            with concurrent.futures.ProcessPoolExecutor() as executor:
                # Pass shared memory details to each worker
                results = list(executor.map(
                    process_file,
                    file_list,
                    [shm.name] * n,
                    [large_array.shape] * n,
                    [large_array.dtype] * n,
                ))
        finally:
            # Clean up: unlink the block only after all workers are done
            shm.close()
            shm.unlink()
Pros: Near-native memory access speed, no IPC overhead, perfect for large numerical data.
Cons: Requires manual memory management, only works with structured data, Python 3.8+.
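The manual memory management boils down to a fixed lifecycle: create, attach by name, close every handle, unlink once. Here is a minimal single-process sketch of that lifecycle using only the standard library (no NumPy), assuming Python 3.8+. The roundtrip helper is hypothetical and packs floats as 8-byte doubles.

```python
import struct
from multiprocessing import shared_memory


def roundtrip(values):
    """Write floats into a new block, read them back through a second handle."""
    n = len(values)
    fmt = f"{n}d"  # n native doubles
    owner = shared_memory.SharedMemory(create=True, size=struct.calcsize(fmt))
    try:
        struct.pack_into(fmt, owner.buf, 0, *values)
        # A worker would do this part: attach by name, read, then close.
        reader = shared_memory.SharedMemory(name=owner.name)
        try:
            return list(struct.unpack_from(fmt, reader.buf, 0))
        finally:
            reader.close()
    finally:
        # Only the creator unlinks, and only after every reader is done.
        owner.close()
        owner.unlink()


if __name__ == "__main__":
    print(roundtrip([1.0, 2.5, -3.0]))
```

In a real pool, the attach/read/close part would run in the worker and the create/unlink part in the parent.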
Key Recommendations to Choose the Right Approach
- Read-only, Linux/macOS: Go with the worker-initiated copy (Option 1) for maximum speed.
- Mutable data or Windows: Use multiprocessing.Manager (Option 2) for simplicity.
- Large structured data (NumPy, etc.): Use shared_memory (Option 3) for top performance.
- Batch file processing: Split your file list into chunks instead of processing one file at a time to reduce task scheduling overhead.
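The chunking advice in the last bullet can be sketched as follows. Both helpers are hypothetical illustrations; process_chunk only returns path lengths in place of real per-file work.

```python
def chunked(items, size):
    """Yield consecutive slices of `items`, each with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def process_chunk(paths):
    """Hypothetical per-chunk task: returns the length of each path string."""
    return [len(path) for path in paths]


if __name__ == "__main__":
    files = ["a.txt", "bb.txt", "ccc.txt", "d.txt", "ee.txt"]
    for chunk in chunked(files, 2):
        print(chunk, process_chunk(chunk))
```

Each chunk would then be submitted as one task. With ProcessPoolExecutor, passing chunksize to executor.map gives a similar effect without manual slicing.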
The question comes from Stack Exchange; asked by jpp1.




