Updating Pandas DataFrame Values with Multiprocessing, and Comparing Single-Core vs. Multi-Core Speed

Fixing Multiprocessing Update Issue for Pandas Rolling Average Calculation

First off, let's spot the obvious bugs in your code and then tackle the multiprocessing problem head-on.

1. Critical Logic Error in Calculation

Your Cal and Cal2 functions are calculating the mean of the c column instead of the a column — that's why even your single-core code isn't producing the right results! You meant to compute the average of the a column over the specified window, right? Let's fix that first, plus adjust the loop range to cover all rows properly:

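def Cal(start, end):
    # Uses the global df; initialise df['c'] = 0.0 so float means don't force a dtype change
    for i in range(start, end):
        if i < 10:
            # Fewer than 10 rows so far: average rows 0..i (.loc slices include the end label)
            df.loc[i,'c'] = df.loc[:i,'a'].mean()
        else:
            # Rows i-9..i inclusive: a full 10-row window
            df.loc[i,'c'] = df.loc[i-9:i,'a'].mean()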

The original loop range(end-start-1) was skipping the last row of each segment; switching to range(start, end) ensures we cover every index in the target range.
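Because .loc slicing by label includes both endpoints, df.loc[i-9:i, 'a'] is exactly 10 rows and df.loc[:i, 'a'] covers rows 0 through i. Below is a self-contained sketch of the corrected loop on a 20-row version of your data. Passing df in as a parameter (the lowercase cal name and its signature) is my own assumption, not your original code. With a = 1, 3, 5, ..., the results are easy to check by hand: row i averages to i + 1 while the window is still growing, and to 2*i - 8 once it is full.

```python
import numpy as np
import pandas as pd


def cal(df, start, end):
    """Corrected rolling-average loop; takes df as a parameter (hypothetical signature)."""
    for i in range(start, end):
        if i < 10:
            # Window still growing: average rows 0..i (the label slice includes i)
            df.loc[i, 'c'] = df.loc[:i, 'a'].mean()
        else:
            # Full window: rows i-9..i inclusive, i.e. 10 rows
            df.loc[i, 'c'] = df.loc[i-9:i, 'a'].mean()
    return df


total_num = 20
df = pd.DataFrame(np.arange(1, total_num*2+1).reshape(total_num, 2), columns=['a', 'b'])
df['c'] = 0.0  # float column, so fractional means can be stored without a dtype change
cal(df, 0, total_num)
print(df.loc[[0, 9, 10, 19], ['a', 'c']])
```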

2. Why Multiprocessing with Manager Isn't Working

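Two separate issues are at play here. The first produces wrong values; the second keeps your updates from ever reaching the main process.

  • Order Dependency: Your calculation for row i depends on the previous 9 rows of a (or all prior rows if i<10). Splitting the DataFrame into isolated chunks means the first 9 rows of every chunk after the first can't see the preceding rows they need, so their window averages come out wrong.
  • Shared Object Limitations: Manager.Namespace does not give you a truly shared DataFrame. Every read of ns.df returns a fresh pickled copy from the manager process, so ns.df.loc[i, 'c'] = ... edits that temporary copy and the change is lost. Likewise, a global df is copied into each child process, so writes made there never reach the parent. Pandas objects also aren't designed for concurrent writes, so even a genuinely shared DataFrame would need careful locking.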
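To see the Namespace behaviour for yourself, here is a minimal sketch (namespace_demo is a hypothetical name, and it assumes pandas is installed). It needs no Pool at all: the manager's server process is enough to show that an in-place edit through ns.df is silently lost, while editing a local copy and reassigning the attribute does stick. The if __name__ == '__main__' guard matters because the manager starts a separate process.

```python
from multiprocessing import Manager

import pandas as pd


def namespace_demo():
    """Return the value seen after an in-place edit and after a reassignment."""
    with Manager() as manager:
        ns = manager.Namespace()
        ns.df = pd.DataFrame({'a': [1, 2, 3], 'c': [0.0, 0.0, 0.0]})

        # ns.df hands back a fresh copy, so this edits a temporary object only
        ns.df.loc[0, 'c'] = 99.0
        lost = ns.df.loc[0, 'c']

        # Edit a local copy, then reassign the attribute to send it back
        local = ns.df
        local.loc[0, 'c'] = 99.0
        ns.df = local
        kept = ns.df.loc[0, 'c']
    return lost, kept


if __name__ == '__main__':
    lost, kept = namespace_demo()
    print(f"in-place edit: {lost}, after reassignment: {kept}")
```

Even the reassignment pattern ships the whole DataFrame through the manager on every read and write, and if several workers read, edit and reassign concurrently, the last write wins and the others' updates are lost. That is why having workers return results instead is the better design.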

3. Better Solutions

Option 1: Use Pandas Built-in rolling (Fastest & Simplest)

Pandas has optimized rolling-window functions that blow manual loops out of the water: they run in compiled code rather than a Python-level loop, so even on a single core they will comfortably outperform your loop, with no multiprocessing required. This one line replaces your entire loop:

import pandas as pd
import numpy as np
import time

total_num = 1000
df = pd.DataFrame(np.arange(1, total_num*2+1).reshape(total_num,2), columns=['a','b'])
df['c'] = 0

print('Single core with rolling: --->')
start_t = time.perf_counter()
# Window size 10, min_periods=1 uses all available rows when i<10
df['c'] = df['a'].rolling(window=10, min_periods=1).mean()
end_t = time.perf_counter()
print(f"Time taken: {end_t - start_t:.4f} seconds")

This is the most maintainable and efficient approach, especially as your dataset grows.
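A quick way to confirm the one-liner matches the corrected loop is to compare it against a plain-Python reference that averages rows max(0, i-9) through i. This sketch does that on the same 1,000-row data, and also shows why min_periods=1 is needed: without it, pandas leaves the first 9 rows as NaN because they have fewer than 10 observations.

```python
import numpy as np
import pandas as pd

total_num = 1000
df = pd.DataFrame(np.arange(1, total_num*2+1).reshape(total_num, 2), columns=['a', 'b'])

# Vectorised version from Option 1
rolled = df['a'].rolling(window=10, min_periods=1).mean()

# Plain-Python reference mirroring the corrected loop
a = df['a'].tolist()
reference = []
for i in range(total_num):
    window = a[max(0, i - 9):i + 1]  # at most 10 values, ending at row i
    reference.append(sum(window) / len(window))

print(np.allclose(rolled.to_numpy(), reference))  # True

# Default min_periods equals the window size, so rows 0..8 become NaN
print(df['a'].rolling(window=10).mean().isna().sum())  # 9
```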

Option 2: Multiprocessing with Chunked Calculation (If You Must Use Processes)

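If you need multiprocessing for extremely large datasets, split the DataFrame into chunks and give every chunk except the first the 9 rows that precede it (to cover the window dependency). Each worker computes its chunk's c values, drops the borrowed rows, and returns plain results; the main process then writes them back into df, so no shared DataFrame is needed.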

from multiprocessing import Pool
import pandas as pd
import numpy as np
import time

WINDOW = 10
OVERLAP = WINDOW - 1  # rows of history a chunk needs from the previous segment

total_num = 1000
df = pd.DataFrame(np.arange(1, total_num*2+1).reshape(total_num,2), columns=['a','b'])
df['c'] = 0.0

def calculate_chunk(chunk, overlap):
    # The first `overlap` rows of the chunk are borrowed history (0 for the first chunk)
    result = []
    for i in range(len(chunk)):
        if i < WINDOW:
            # Rows from the chunk start through i (a full 10 rows once i reaches 9)
            avg = chunk['a'].iloc[:i+1].mean()
        else:
            # Full 10-row window ending at row i
            avg = chunk['a'].iloc[i-OVERLAP:i+1].mean()
        result.append(avg)
    # Drop the borrowed rows so every row is returned exactly once
    return result[overlap:]

if __name__ == '__main__':
    num_core = 4
    chunk_size = total_num // num_core

    # Build (chunk, overlap) tasks; the last chunk also takes any leftover rows
    tasks = []
    for i in range(num_core):
        seg_start = i * chunk_size
        seg_end = total_num if i == num_core - 1 else (i + 1) * chunk_size
        start = max(0, seg_start - OVERLAP)
        tasks.append((df.iloc[start:seg_end].copy(), seg_start - start))

    print('Multiprocess with chunked calculation: ---->')
    start_t = time.perf_counter()
    with Pool(num_core) as pool:
        chunk_results = pool.starmap(calculate_chunk, tasks)

    # Results come back in chunk order and cover each row once, so just concatenate
    df['c'] = np.concatenate(chunk_results)

    end_t = time.perf_counter()
    print(f"Time taken: {end_t - start_t:.4f} seconds")

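This works because each chunk carries the history it needs, each worker drops exactly the rows it borrowed (none for the first chunk), and only the main process ever writes to df. Keep expectations realistic, though: with only 1,000 rows, process start-up and pickling overhead can easily cancel out any parallel speed-up, and the rolling one-liner from Option 1 will still be far faster.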
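You can check the overlap bookkeeping without starting any processes by running the same split serially and comparing it to rolling. In this sketch, split_with_overlap and chunk_means are hypothetical helper names, the worker uses NumPy slices instead of iloc, and total_num is deliberately not divisible by the number of chunks so the leftover rows in the last chunk are exercised too.

```python
import numpy as np
import pandas as pd

WINDOW = 10
OVERLAP = WINDOW - 1


def split_with_overlap(n_rows, n_chunks):
    """Hypothetical helper: yield (start, seg_start, seg_end) for each chunk."""
    chunk_size = n_rows // n_chunks
    for k in range(n_chunks):
        seg_start = k * chunk_size
        # The last chunk absorbs any remainder rows
        seg_end = n_rows if k == n_chunks - 1 else (k + 1) * chunk_size
        yield max(0, seg_start - OVERLAP), seg_start, seg_end


def chunk_means(values, overlap):
    """Windowed means over one chunk, minus the borrowed history rows."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - OVERLAP):i + 1]  # at most 10 values ending at i
        out.append(window.mean())
    return out[overlap:]


total_num = 1003  # not divisible by 4 on purpose
df = pd.DataFrame({'a': np.arange(1, 2 * total_num, 2)})

a = df['a'].to_numpy(dtype=float)
pieces = [chunk_means(a[start:seg_end], seg_start - start)
          for start, seg_start, seg_end in split_with_overlap(total_num, 4)]
combined = np.concatenate(pieces)

expected = df['a'].rolling(window=WINDOW, min_periods=1).mean().to_numpy()
print(len(combined) == total_num, np.allclose(combined, expected))  # True True
```

Once this serial version agrees with rolling, swapping the list comprehension for pool.starmap only changes where the chunks run, not what they compute.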

Key Takeaways

  • Always prioritize Pandas' built-in functions like rolling first — they're optimized, readable, and way faster than manual loops.
  • For multiprocessing on dependent tasks, ensure each chunk includes all necessary context (like the 9-row overlap here) to compute its segment independently.
  • Avoid modifying shared Pandas objects across processes directly; it's error-prone and inefficient. Have workers return plain results and write them into the DataFrame in the main process instead.

The question was sourced from Stack Exchange; question author: DongHyeok Lee.
