Updating Pandas DataFrame values with multiprocessing, and comparing single-core vs. multi-core computation speed
First off, let's spot the obvious bugs in your code and then tackle the multiprocessing problem head-on.
1. Critical Logic Error in Calculation
Your Cal and Cal2 functions are calculating the mean of the c column instead of the a column — that's why even your single-core code isn't producing the right results! You meant to compute the average of the a column over the specified window, right? Let's fix that first, plus adjust the loop range to cover all rows properly:
    def Cal(start, end):
        for i in range(start, end):
            if i < 10:
                df.loc[i, 'c'] = df.loc[:i, 'a'].mean()
            else:
                df.loc[i, 'c'] = df.loc[i-9:i, 'a'].mean()
The original loop range(end-start-1) was skipping the last row of each segment; switching to range(start, end) ensures we cover every index in the target range.
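To see the off-by-one concretely, here is a small check. The segment bounds start=250 and end=500 are hypothetical values chosen only for illustration; they are not taken from your code.

```python
# Hypothetical segment bounds, chosen only for illustration.
start, end = 250, 500

old_indices = list(range(end - start - 1))  # the original loop's range
new_indices = list(range(start, end))       # the corrected range

# The old range yields one index fewer than the segment has rows,
# and it counts from 0 rather than from `start`.
print(len(old_indices), old_indices[0], old_indices[-1])  # 249 0 248
print(len(new_indices), new_indices[0], new_indices[-1])  # 250 250 499
```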
2. Why Multiprocessing with Manager Isn't Working
Two big issues are causing your shared DataFrame to not update:
- Order Dependency: Your calculation for row i depends on the previous 9 rows of a (or all prior rows if i < 10). Splitting the DataFrame into isolated chunks for processes means each process can't access the prior chunk's data, breaking the window calculation at the start of every chunk after the first.
- Shared Object Limitations: Even with Manager.Namespace, modifying a Pandas DataFrame across processes is unsafe. Reading the DataFrame from the namespace hands each process its own pickled copy, so an in-place write such as df.loc[i,'c'] = ... changes only that copy. Nothing reaches the main process unless the whole DataFrame is reassigned to the namespace, and concurrent reassignments from several processes overwrite one another.
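The order-dependency point can be illustrated without starting any processes. The sketch below assumes a small 40-row frame with the same column layout as in the question, and uses pandas' rolling mean as the reference. It compares the full-column result against the same calculation run on an isolated second half.

```python
import numpy as np
import pandas as pd

# Small frame mirroring the question's layout (size chosen only for illustration).
df = pd.DataFrame(np.arange(1, 81).reshape(40, 2), columns=["a", "b"])

# Reference: trailing 10-row mean computed over the whole column.
full = df["a"].rolling(window=10, min_periods=1).mean()

# Isolated second half: rows 20..39, with no access to rows 11..19.
isolated = df["a"].iloc[20:].rolling(window=10, min_periods=1).mean()

# The first 9 rows of the isolated chunk lack their context, so they disagree.
mismatch = ~np.isclose(full.iloc[20:].to_numpy(), isolated.to_numpy())
print(int(mismatch.sum()))  # 9
```

Only the rows within 9 positions of the chunk boundary are affected, which is why a 9-row overlap is enough to repair the split.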
3. Better Solutions
Option 1: Use Pandas Built-in rolling (Fastest & Simplest)
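Pandas has optimized rolling window functions that blow manual loops out of the water. They run in compiled code, so even single-core rolling will outperform your loop by a wide margin. By default rolling is single-threaded; as far as I know, recent pandas versions can optionally parallelize it through the numba engine, but you are unlikely to need that at this size. This one line replaces your entire loop: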
    import pandas as pd
    import numpy as np
    import time

    total_num = 1000
    df = pd.DataFrame(np.arange(1, total_num*2+1).reshape(total_num,2), columns=['a','b'])
    df['c'] = 0

    print('Single core with rolling: --->')
    start_t = time.time()
    # Window size 10, min_periods=1 uses all available rows when i<10
    df['c'] = df['a'].rolling(window=10, min_periods=1).mean()
    end_t = time.time()
    print(f"Time taken: {end_t - start_t:.4f} seconds")
This is the most maintainable and efficient approach, especially as your dataset grows.
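If you want to convince yourself that rolling really computes the same thing as the corrected loop, a quick sanity check along these lines should do. The helper name loop_mean is made up for this example, and the 100-row frame is just a small stand-in for your data.

```python
import numpy as np
import pandas as pd

def loop_mean(df):
    """Trailing 10-row mean of column 'a', computed row by row like the fixed Cal."""
    out = []
    for i in range(len(df)):
        lo = max(0, i - 9)
        # .loc slicing is inclusive on both ends, so this covers rows lo..i
        out.append(df.loc[lo:i, "a"].mean())
    return pd.Series(out, index=df.index)

df = pd.DataFrame(np.arange(1, 201).reshape(100, 2), columns=["a", "b"])

expected = loop_mean(df)
actual = df["a"].rolling(window=10, min_periods=1).mean()

# Raises AssertionError if the two approaches ever disagree.
assert np.allclose(expected, actual)
```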
Option 2: Multiprocessing with Chunked Calculation (If You Must Use Processes)
If you need multiprocessing for extremely large datasets, split the DataFrame into chunks with a 9-row overlap (to cover the window dependency). Each chunk can compute its own c values independently, then we combine the results safely.
    from multiprocessing import Pool
    import pandas as pd
    import numpy as np
    import time

    total_num = 1000
    df = pd.DataFrame(np.arange(1, total_num*2+1).reshape(total_num,2), columns=['a','b'])
    df['c'] = 0.0  # float column, since the means are floats

    def calculate_chunk(task):
        # chunk carries n_overlap extra leading rows (context from the previous segment)
        chunk, n_overlap = task
        result = []
        for i in range(len(chunk)):
            if i < 10:
                avg = chunk['a'].iloc[:i+1].mean()
            else:
                avg = chunk['a'].iloc[i-9:i+1].mean()
            result.append(avg)
        # Return only the rows this chunk owns. The first chunk has no overlap,
        # so nothing is dropped from it.
        return result[n_overlap:]

    if __name__ == '__main__':
        num_core = 4
        chunk_size = total_num // num_core
        # Split into chunks with 9-row overlap
        tasks = []
        for i in range(num_core):
            first = i * chunk_size
            start = max(0, first - 9)
            # The last chunk also takes any remainder rows
            end = total_num if i == num_core - 1 else (i+1) * chunk_size
            tasks.append((df.iloc[start:end].copy(), first - start))

        print('Multiprocess with chunked calculation: ---->')
        start_t = time.time()
        with Pool(num_core) as pool:
            chunk_results = pool.map(calculate_chunk, tasks)
        # Combine results back into the original DataFrame
        idx = 0
        for res in chunk_results:
            df.loc[idx:idx+len(res)-1, 'c'] = res
            idx += len(res)
        end_t = time.time()
        print(f"Time taken: {end_t - start_t:.4f} seconds")
This works because each chunk has all the prior data it needs to compute its segment, and we only keep the non-overlapping results to avoid duplicates.
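If you go the chunked route, the worker itself can use rolling instead of a Python loop. Below is a minimal sketch of that idea, not a tuned implementation. The names chunk_bounds, rolling_chunk and chunked_rolling_mean are invented for this example, and the mapper argument defaults to the built-in map so the boundary arithmetic can be checked sequentially before any pool is involved.

```python
import numpy as np
import pandas as pd

WINDOW = 10
OVERLAP = WINDOW - 1  # rows of context each chunk needs from the previous one

def chunk_bounds(total, n_chunks, overlap=OVERLAP):
    """Yield (read_start, first_owned, end) row positions for each chunk."""
    size = total // n_chunks
    for k in range(n_chunks):
        first = k * size
        # The last chunk also takes any remainder rows.
        end = total if k == n_chunks - 1 else first + size
        yield max(0, first - overlap), first, end

def rolling_chunk(task):
    """Worker: rolling mean over one chunk, dropping the leading context rows."""
    values, n_context = task
    means = pd.Series(values).rolling(window=WINDOW, min_periods=1).mean()
    return means.to_numpy()[n_context:]

def chunked_rolling_mean(a, n_chunks, mapper=map):
    """Split `a`, apply rolling_chunk through `mapper`, and stitch the pieces."""
    a = np.asarray(a, dtype=float)
    tasks = [(a[s:e], f - s) for s, f, e in chunk_bounds(len(a), n_chunks)]
    return np.concatenate(list(mapper(rolling_chunk, tasks)))
```

Inside an `if __name__ == '__main__':` block, passing `mapper=pool.map` from a `multiprocessing.Pool` should give the same values, since rolling_chunk is a module-level function and its arguments are plain NumPy arrays. Whether it is faster than a single rolling call depends on how large the data is relative to the cost of starting processes and pickling the chunks.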
Key Takeaways
- Always prioritize Pandas' built-in functions like rolling first: they're optimized, readable, and way faster than manual loops.
- For multiprocessing on dependent tasks, ensure each chunk includes all necessary context (like the 9-row overlap here) to compute its segment independently.
- Avoid modifying shared Pandas objects across processes directly; it's error-prone and inefficient.
The question comes from Stack Exchange; it was asked by DongHyeok Lee.




