如何批量导入列结构存在差异的多个CSV文件到PostgreSQL?
如何批量导入列结构存在差异的多个CSV文件到PostgreSQL?
别担心,刚接触PostgreSQL遇到这种多CSV列不一致的情况太正常了!你现有的代码已经能处理单个CSV的导入,我们只需要在这个基础上扩展,解决收集所有列名和数据与列对齐这两个核心问题就行。我帮你把代码改成支持批量导入的版本,一步步来解释:
核心思路拆解
我们要完成的核心任务分三步:
- 先遍历所有CSV,把所有出现过的列名收集全,确保目标表能容纳所有CSV的数据
- 创建包含所有列的统一目标表
- 逐个处理每个CSV:将数据与目标表的列对齐(缺失列补空、多余列按需处理),再高效导入到数据库
完整批量导入代码
import pandas as pd import psycopg2 from io import StringIO import os # Database connection parameters db_params = { "dbname": "your_database", "user": "your_username", "password": "your_password", "host": "your_host", "port": "your_port", } # 替换成你的CSV文件夹路径(所有要导入的CSV都放在这里) csv_folder = "your_csv_folder_path" # 统一的目标表名 target_table_name = "your_unified_table" # -------------------------- # 步骤1:高效收集所有CSV的列名(仅读表头,不加载全量数据) # -------------------------- all_columns = set() # 筛选文件夹下所有CSV文件 csv_files = [f for f in os.listdir(csv_folder) if f.lower().endswith(".csv")] for csv_file in csv_files: file_path = os.path.join(csv_folder, csv_file) # 只读取表头行,快速收集列名 df_cols = pd.read_csv(file_path, nrows=0).columns all_columns.update(df_cols) # 转成有序列表,方便后续对齐 all_columns = sorted(list(all_columns)) print(f"收集到所有CSV的列:{all_columns}") # -------------------------- # 步骤2:连接数据库,创建包含所有列的目标表 # -------------------------- conn = psycopg2.connect(**db_params) cur = conn.cursor() # 构建建表语句,统一用TEXT类型(后续可根据需求调整为更精确的类型) col_str = ", ".join([f'"{col}" TEXT' for col in all_columns]) create_table_query = f"CREATE TABLE IF NOT EXISTS {target_table_name} ({col_str});" cur.execute(create_table_query) conn.commit() # -------------------------- # 步骤3:逐个导入CSV,自动对齐列 # -------------------------- for csv_file in csv_files: file_path = os.path.join(csv_folder, csv_file) print(f"正在处理文件:{csv_file}") # 读取当前CSV到DataFrame df = pd.read_csv(file_path) # 对齐到目标表的列:缺失列自动补空,多余列直接丢弃(需保留可自行调整逻辑) df_aligned = df.reindex(columns=all_columns, fill_value=None) # 用StringIO配合COPY命令导入,保持高效性 buffer = StringIO() # 按目标表列顺序导出,NaN替换为空字符串适配PostgreSQL df_aligned.to_csv(buffer, index=False, header=False, na_rep='') buffer.seek(0) # 执行COPY:指定目标表的所有列,确保数据对应到正确位置 copy_query = f""" COPY {target_table_name} ({', '.join([f'"{col}"' for col in all_columns])}) FROM STDIN WITH CSV DELIMITER ',' NULL '' """ cur.copy_expert(copy_query, buffer) conn.commit() print(f"文件 {csv_file} 导入完成!") # 关闭数据库连接 cur.close() conn.close() print("所有CSV文件批量导入完成!")
关键细节说明
- 高效收集列名:用
pd.read_csv(nrows=0)只读取CSV表头,不用加载全量数据,处理上百个大CSV时速度会快很多 - 数据列对齐:
df.reindex(columns=all_columns, fill_value=None)自动帮你把每个CSV的列和目标表列匹配,缺失的列会补空,多余的列直接丢弃(如果想保留多余列,可以加个判断打印提示,避免数据丢失) - COPY命令适配:指定目标表的所有列顺序,把NaN替换为空字符串,这样PostgreSQL能正确识别空值,不会导入报错
- 容错性扩展:如果想避免单个文件导入失败导致整个任务中断,可以给每个文件的处理加上
try-except块,跳过错误文件并记录日志
可选优化建议
- 列类型优化:如果需要更精确的列类型(比如数字用INT/FLOAT、日期用DATE),可以在收集列名后,抽取部分CSV的样本数据推断类型,再修改建表语句
- 大文件分块:针对超大CSV,可以用
pd.read_csv(chunksize=10000)分块读取导入,避免内存占用过高 - 日志记录:把处理过程写入日志文件,方便后续排查导入失败的问题
备注:内容来源于stack exchange,提问作者speedqueen1212




