You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多进程/执行器间如何共享资源?并行处理方案咨询

Parallelizing Your File-Processing Program in Python 3.5+ (With Shared Large Data Structures)

Hey there! Let’s tackle your problem head-on—parallelizing file processing while efficiently sharing that big data structure you need for queries and calculations. Below are the best approaches tailored to Python 3.5+, with tradeoffs and practical examples to help you choose what works best for your use case.

1. Read-Only Data? Let Workers Load Their Own Copy (Leverage Copy-on-Write)

If your large data structure is read-only (which sounds likely for query/calculation tasks), this is often the fastest and simplest approach. On Linux/macOS, Python leverages the OS’s copy-on-write (COW) mechanism: when each worker process loads the data, it doesn’t actually duplicate the memory until a process modifies it. For read-only data, this means all workers share the same physical memory.

Example with concurrent.futures.ProcessPoolExecutor

import concurrent.futures

# Global variable to hold the large data in each worker
large_data = None

def init_worker():
    """Initialize each worker with the large data structure."""
    global large_data
    # Replace this with your actual logic to load/compute the data
    large_data = load_or_calculate_large_data()

def process_single_file(file_path):
    """Process a single file using the shared large data."""
    with open(file_path, 'r') as f:
        content = f.read()
        # Your query/calculation logic here
        result = run_query(content, large_data)
    return result

if __name__ == '__main__':
    # List of all files to process
    file_list = ['file_01.txt', 'file_02.txt', ..., 'file_1000.txt']
    
    # Create a process pool, initializing each worker with the data
    with concurrent.futures.ProcessPoolExecutor(initializer=init_worker) as executor:
        # Map file paths to the processing function
        results = list(executor.map(process_single_file, file_list))
    
    # Do something with the results here
    print(f"Processed {len(results)} files successfully!")

Pros: No inter-process communication (IPC) overhead, blazing fast for read-only tasks.
Cons: On Windows, COW isn’t supported—each worker will duplicate the data in memory, which could be a problem if the data is extremely large.

2. Shared Mutable Data (or Windows Compatibility): Use multiprocessing.Manager

If you need to modify the shared data structure across processes, or you’re on Windows and can’t afford duplicate memory, multiprocessing.Manager creates proxy objects that are shared across processes. These proxies handle IPC under the hood, but note that they add a small overhead for each access.

Example with Shared Dictionary

from multiprocessing import Pool, Manager

def process_file(file_path, shared_data, lock):
    """Process a file and update shared data (with lock for thread safety)."""
    with open(file_path, 'r') as f:
        content = f.read()
        # Query the shared data
        value = shared_data.get(content_key)
        # Calculate result
        result = compute_with_value(value)
        
        # If modifying shared data, use a lock to avoid race conditions
        with lock:
            shared_data['results'].append(result)

if __name__ == '__main__':
    # Create a manager to hold shared objects
    with Manager() as manager:
        # Create shared data structure and lock
        shared_large_data = manager.dict(load_large_data())
        shared_results = manager.list()
        lock = manager.Lock()
        
        file_list = ['file_01.txt', 'file_02.txt', ...]
        
        # Create a pool and pass shared objects to each task
        with Pool() as pool:
            pool.starmap(process_file, [(f, shared_large_data, lock) for f in file_list])
        
        # Access the final results
        print(f"All results: {list(shared_results)}")

Pros: Works across all platforms, supports mutable shared data, easy to implement.
Cons: IPC overhead can slow down frequent accesses—best for tasks where you don’t query the shared data thousands of times per file.

3. Ultra-Fast Shared Memory (Python 3.8+)

For large, structured data (like NumPy arrays, matrices, or byte streams), the shared_memory module (added in Python 3.8) lets you directly share memory blocks between processes, eliminating IPC overhead entirely. This is ideal for performance-critical tasks.

Example with Shared NumPy Array

import concurrent.futures
import numpy as np
from multiprocessing import shared_memory

def process_file(file_path, shm_name, shm_shape, shm_dtype):
    """Access the shared NumPy array and process the file."""
    # Attach to the existing shared memory block
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    # Reconstruct the NumPy array from the shared memory
    large_array = np.ndarray(shm_shape, dtype=shm_dtype, buffer=existing_shm.buf)
    
    # Process file using large_array
    with open(file_path, 'r') as f:
        content = f.read()
        idx = parse_index_from_content(content)
        result = large_array[idx] * some_calculation()
    
    # Clean up: close the shared memory (don't unlink yet!)
    existing_shm.close()
    return result

if __name__ == '__main__':
    # Create your large NumPy array
    large_array = np.random.rand(10_000_000)  # Example large array
    
    # Create a shared memory block for the array
    shm = shared_memory.SharedMemory(create=True, size=large_array.nbytes)
    # Create a new array that uses the shared memory buffer
    shared_array = np.ndarray(large_array.shape, dtype=large_array.dtype, buffer=shm.buf)
    # Copy data to the shared array
    shared_array[:] = large_array[:]
    
    file_list = ['file_01.txt', 'file_02.txt', ...]
    
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Pass shared memory details to each worker
        results = list(executor.map(process_file, file_list, 
                                   [shm.name]*len(file_list), 
                                   [large_array.shape]*len(file_list), 
                                   [large_array.dtype]*len(file_list)))
    
    # Clean up: unlink the shared memory block (only after all workers are done)
    shm.close()
    shm.unlink()

Pros: Near-native memory access speed, no IPC overhead, perfect for large numerical data.
Cons: Requires manual memory management, only works with structured data, Python 3.8+.

Key Recommendations to Choose the Right Approach

  • Read-only, Linux/macOS: Go with the worker-initiated copy (Option 1) for maximum speed.
  • Mutable data or Windows: Use multiprocessing.Manager (Option 2) for simplicity.
  • Large structured data (NumPy, etc.): Use shared_memory (Option 3) for top performance.
  • Batch file processing: Split your file list into chunks instead of processing one file at a time to reduce task scheduling overhead.

内容的提问来源于stack exchange,提问作者jpp1

火山引擎 最新活动