You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何批量导入列结构存在差异的多个CSV文件到PostgreSQL?

如何批量导入列结构存在差异的多个CSV文件到PostgreSQL?

别担心,刚接触PostgreSQL遇到这种多CSV列不一致的情况太正常了!你现有的代码已经能处理单个CSV的导入,我们只需要在这个基础上扩展,解决收集所有列名数据与列对齐这两个核心问题就行。我帮你把代码改成支持批量导入的版本,一步步来解释:

核心思路拆解

我们要完成的核心任务分三步:

  • 先遍历所有CSV,把所有出现过的列名收集全,确保目标表能容纳所有CSV的数据
  • 创建包含所有列的统一目标表
  • 逐个处理每个CSV:将数据与目标表的列对齐(缺失列补空、多余列按需处理),再高效导入到数据库

完整批量导入代码

import pandas as pd
import psycopg2
from io import StringIO
import os

# Database connection parameters
db_params = {
    "dbname": "your_database",
    "user": "your_username",
    "password": "your_password",
    "host": "your_host",
    "port": "your_port",
}

# 替换成你的CSV文件夹路径(所有要导入的CSV都放在这里)
csv_folder = "your_csv_folder_path"
# 统一的目标表名
target_table_name = "your_unified_table"

# --------------------------
# 步骤1:高效收集所有CSV的列名(仅读表头,不加载全量数据)
# --------------------------
all_columns = set()
# 筛选文件夹下所有CSV文件
csv_files = [f for f in os.listdir(csv_folder) if f.lower().endswith(".csv")]

for csv_file in csv_files:
    file_path = os.path.join(csv_folder, csv_file)
    # 只读取表头行,快速收集列名
    df_cols = pd.read_csv(file_path, nrows=0).columns
    all_columns.update(df_cols)

# 转成有序列表,方便后续对齐
all_columns = sorted(list(all_columns))
print(f"收集到所有CSV的列:{all_columns}")

# --------------------------
# 步骤2:连接数据库,创建包含所有列的目标表
# --------------------------
conn = psycopg2.connect(**db_params)
cur = conn.cursor()

# 构建建表语句,统一用TEXT类型(后续可根据需求调整为更精确的类型)
col_str = ", ".join([f'"{col}" TEXT' for col in all_columns])
create_table_query = f"CREATE TABLE IF NOT EXISTS {target_table_name} ({col_str});"
cur.execute(create_table_query)
conn.commit()

# --------------------------
# 步骤3:逐个导入CSV,自动对齐列
# --------------------------
for csv_file in csv_files:
    file_path = os.path.join(csv_folder, csv_file)
    print(f"正在处理文件:{csv_file}")

    # 读取当前CSV到DataFrame
    df = pd.read_csv(file_path)

    # 对齐到目标表的列:缺失列自动补空,多余列直接丢弃(需保留可自行调整逻辑)
    df_aligned = df.reindex(columns=all_columns, fill_value=None)

    # 用StringIO配合COPY命令导入,保持高效性
    buffer = StringIO()
    # 按目标表列顺序导出,NaN替换为空字符串适配PostgreSQL
    df_aligned.to_csv(buffer, index=False, header=False, na_rep='')
    buffer.seek(0)

    # 执行COPY:指定目标表的所有列,确保数据对应到正确位置
    copy_query = f"""
    COPY {target_table_name} ({', '.join([f'"{col}"' for col in all_columns])})
    FROM STDIN WITH CSV DELIMITER ',' NULL ''
    """
    cur.copy_expert(copy_query, buffer)
    conn.commit()

    print(f"文件 {csv_file} 导入完成!")

# 关闭数据库连接
cur.close()
conn.close()
print("所有CSV文件批量导入完成!")

关键细节说明

  1. 高效收集列名:用pd.read_csv(nrows=0)只读取CSV表头,不用加载全量数据,处理上百个大CSV时速度会快很多
  2. 数据列对齐df.reindex(columns=all_columns, fill_value=None)自动帮你把每个CSV的列和目标表列匹配,缺失的列会补空,多余的列直接丢弃(如果想保留多余列,可以加个判断打印提示,避免数据丢失)
  3. COPY命令适配:指定目标表的所有列顺序,把NaN替换为空字符串,这样PostgreSQL能正确识别空值,不会导入报错
  4. 容错性扩展:如果想避免单个文件导入失败导致整个任务中断,可以给每个文件的处理加上try-except块,跳过错误文件并记录日志

可选优化建议

  • 列类型优化:如果需要更精确的列类型(比如数字用INT/FLOAT、日期用DATE),可以在收集列名后,抽取部分CSV的样本数据推断类型,再修改建表语句
  • 大文件分块:针对超大CSV,可以用pd.read_csv(chunksize=10000)分块读取导入,避免内存占用过高
  • 日志记录:把处理过程写入日志文件,方便后续排查导入失败的问题

备注:内容来源于stack exchange,提问作者speedqueen1212

火山引擎 最新活动