多进程并行统计多表记录数并合并为单DataFrame的问题排查
解决多进程返回DataFrame合并问题
你的代码没法合并各进程的DataFrame,核心是没有让进程返回结果,也没有在主进程收集这些结果,咱们一步步修复:
问题分析
task函数调用了tblRowCn但没有返回它的结果,导致每个进程跑完后,主进程拿不到生成的DataFrame- 主进程的
executor.map没有接收返回值,自然没法后续合并 tblRowCn在表不存在时返回空列表,合并时需要过滤这些无效值,避免报错
修改后的完整代码
import multiprocessing as mp import pandas as pd import sqlanydb # 假设ego是你已定义的模块,保留原有逻辑 list_tabl_cn =['TBL_A', 'TBL_B', 'TBL_C', 'TBL_D'] def tblRowCn(p_tbl): # 每个进程独立创建数据库连接(这部分是对的,多进程不能共享连接) connDb = sqlanydb.connect(uid='dba', pwd='sql', host='ip:port', dbn='blah') is_tableExists = ego.my_desc(p_tbl,163).shape[0] if is_tableExists: if p_tbl == 'STG_CFG_SYS': Q_ = """\ SELECT OPENDATE as TGL_POS, COUNT(1) CN FROM {0} GROUP BY TGL_POS """.format(p_tbl) else: Q_ = """\ SELECT TANGGAL_POSISI as TGL_POS, COUNT(1) CN FROM {0} GROUP BY TGL_POS """.format(p_tbl) df_tbl = pd.read_sql_query(Q_, connDb, parse_dates=['TGL_POS']) df_tbl['THN'],df_tbl['BLN']= df_tbl['TGL_POS'].dt.year, df_tbl['TGL_POS'].dt.month # 记得关闭连接,避免资源泄漏 connDb.close() else: df_tbl = pd.DataFrame() # 把空列表改成空DataFrame,后续合并更统一 return df_tbl def task(table_nm): print(f"Task Executed with process {mp.current_process().pid}") # 关键:返回tblRowCn的执行结果 return tblRowCn(table_nm.upper()) def main(): # 用cpu_count()-8没问题,可根据机器资源调整 executor = mp.Pool(mp.cpu_count()-8) # 关键:接收每个进程返回的结果 results = executor.map(task, list_tabl_cn) executor.close() executor.join() # 等待所有进程完成,确保结果都返回 # 过滤掉空的DataFrame,只保留有效数据 valid_dfs = [df for df in results if not df.empty] if valid_dfs: # 合并所有有效DataFrame final_df = pd.concat(valid_dfs, ignore_index=True) print("合并完成的DataFrame:") print(final_df.head()) # 这里可以添加保存或后续处理逻辑 else: print("没有有效数据可以合并") if __name__ == "__main__": main()
关键修改点说明
- 让
task函数返回结果:在task里添加return tblRowCn(...),这样每个进程会把生成的DataFrame传回主进程 - 主进程收集结果:用
results = executor.map(...)接收所有进程的返回值,map会按输入列表的顺序返回对应结果 - 统一空返回值类型:把
tblRowCn里的df_tbl=[]改成df_tbl = pd.DataFrame(),后续过滤和合并时更统一,避免类型错误 - 添加
executor.join():确保所有进程都执行完毕后再处理结果,避免主进程提前执行合并逻辑 - 过滤空DataFrame并合并:用列表推导式筛选非空的DataFrame,再用
pd.concat合并,ignore_index=True重置索引,避免索引冲突
这样就能顺利把各进程返回的DataFrame合并成单个DataFrame了。
内容的提问来源于stack exchange,提问作者Durian Parongil




