You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

多进程并行统计多表记录数并合并为单DataFrame的问题排查

解决多进程返回DataFrame合并问题

你的代码没法合并各进程的DataFrame,核心是没有让进程返回结果,也没有在主进程收集这些结果,咱们一步步修复:

问题分析

  1. task函数调用了tblRowCn但没有返回它的结果,导致每个进程跑完后,主进程拿不到生成的DataFrame
  2. 主进程的executor.map没有接收返回值,自然没法后续合并
  3. tblRowCn在表不存在时返回空列表,合并时需要过滤这些无效值,避免报错

修改后的完整代码

import multiprocessing as mp
import pandas as pd
import sqlanydb
# 假设ego是你已定义的模块,保留原有逻辑

list_tabl_cn =['TBL_A', 'TBL_B', 'TBL_C', 'TBL_D']

def tblRowCn(p_tbl):
    # 每个进程独立创建数据库连接(这部分是对的,多进程不能共享连接)
    connDb = sqlanydb.connect(uid='dba', pwd='sql', host='ip:port', dbn='blah')
    is_tableExists = ego.my_desc(p_tbl,163).shape[0]
    if is_tableExists:
        if p_tbl == 'STG_CFG_SYS':
            Q_ = """\
SELECT OPENDATE as TGL_POS, COUNT(1) CN FROM {0} GROUP BY TGL_POS
""".format(p_tbl)
        else:
            Q_ = """\
SELECT TANGGAL_POSISI as TGL_POS, COUNT(1) CN FROM {0} GROUP BY TGL_POS
""".format(p_tbl)
        df_tbl = pd.read_sql_query(Q_, connDb, parse_dates=['TGL_POS'])
        df_tbl['THN'],df_tbl['BLN']= df_tbl['TGL_POS'].dt.year, df_tbl['TGL_POS'].dt.month
        # 记得关闭连接,避免资源泄漏
        connDb.close()
    else:
        df_tbl = pd.DataFrame()  # 把空列表改成空DataFrame,后续合并更统一
    return df_tbl

def task(table_nm):
    print(f"Task Executed with process {mp.current_process().pid}")
    # 关键:返回tblRowCn的执行结果
    return tblRowCn(table_nm.upper())

def main():
    # 用cpu_count()-8没问题,可根据机器资源调整
    executor = mp.Pool(mp.cpu_count()-8)
    # 关键:接收每个进程返回的结果
    results = executor.map(task, list_tabl_cn)
    executor.close()
    executor.join()  # 等待所有进程完成,确保结果都返回
    
    # 过滤掉空的DataFrame,只保留有效数据
    valid_dfs = [df for df in results if not df.empty]
    
    if valid_dfs:
        # 合并所有有效DataFrame
        final_df = pd.concat(valid_dfs, ignore_index=True)
        print("合并完成的DataFrame:")
        print(final_df.head())
        # 这里可以添加保存或后续处理逻辑
    else:
        print("没有有效数据可以合并")

if __name__ == "__main__":
    main()

关键修改点说明

  1. task函数返回结果:在task里添加return tblRowCn(...),这样每个进程会把生成的DataFrame传回主进程
  2. 主进程收集结果:用results = executor.map(...)接收所有进程的返回值,map会按输入列表的顺序返回对应结果
  3. 统一空返回值类型:把tblRowCn里的df_tbl=[]改成df_tbl = pd.DataFrame(),后续过滤和合并时更统一,避免类型错误
  4. 添加executor.join():确保所有进程都执行完毕后再处理结果,避免主进程提前执行合并逻辑
  5. 过滤空DataFrame并合并:用列表推导式筛选非空的DataFrame,再用pd.concat合并,ignore_index=True重置索引,避免索引冲突

这样就能顺利把各进程返回的DataFrame合并成单个DataFrame了。

内容的提问来源于stack exchange,提问作者Durian Parongil

火山引擎 最新活动