You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何通过Python使用MERGE函数高效向Oracle数据库批量导入百万级CSV数据

如何通过Python使用MERGE函数高效向Oracle数据库批量导入百万级CSV数据

你的问题核心在于单条记录循环执行MERGE的开销被百万次放大——executemany虽然是批量执行,但每条MERGE语句都要单独完成"匹配检查-更新/插入"的逻辑,加上Python与数据库的往返通信、Oracle的单条记录处理开销,百万级数据自然会耗时极长。

下面是针对这个场景的高效优化方案,核心思路是将"百万次小MERGE"转化为"一次大MERGE",结合Oracle临时表和批量插入来实现:


一、优化原理

  1. 先将CSV数据批量导入Oracle的全局临时表(会话级隔离,不会污染其他会话,数据自动清理),这个过程是纯批量插入,开销极低;
  2. 仅执行一次MERGE语句,从临时表同步数据到目标表TEST_TABLE——Oracle对表级别的MERGE优化远高于单条记录的循环处理,能利用索引快速完成匹配和批量更新/插入。

二、具体实现步骤与代码

1. 提前准备Oracle临时表

建议在数据库中提前创建全局临时表(也可以在Python中动态创建),会话结束后数据会自动清理:

CREATE GLOBAL TEMPORARY TABLE TEMP_CSV_DATA (
    ID VARCHAR2(50),  -- 请根据实际数据类型调整长度
    COUNTRY VARCHAR2(100),
    "DATE" DATE
) ON COMMIT PRESERVE ROWS;  -- 提交后保留数据,直到会话结束

2. 优化后的Python代码

替换你原有的数据处理和MERGE逻辑,重点优化内存占用批量操作效率

import csv
import io
import cx_Oracle
from botocore.exceptions import ClientError

# ... 省略数据库连接、S3配置等原有代码 ...

# --------------------------
# 步骤1:批量将CSV数据插入临时表
# --------------------------
# 插入临时表的SQL
insert_temp_sql = "INSERT INTO TEMP_CSV_DATA (ID, COUNTRY, \"DATE\") VALUES (:1, :2, :3)"

# 流式读取S3文件,避免一次性加载百万条数据到内存
obj = s3.Object(CDH_S3_Bucket, f"{CDH_Path}/{s3_filename}")
body = obj.get()['Body']

# 用流式方式处理CSV,无需一次性读取所有内容
with io.TextIOWrapper(body, encoding='utf-8') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    headings = next(csv_reader)  # 跳过表头

    # 分块插入:每10000条提交一次(可根据内存/数据库性能调整)
    BATCH_SIZE = 10000
    batch = []
    # 显式指定字段类型,避免Oracle隐式类型转换,提升性能
    cursor.setinputsizes(
        cx_Oracle.STRING,  # ID字段类型
        cx_Oracle.STRING,  # COUNTRY字段类型
        cx_Oracle.DATE     # DATE字段类型
    )

    for line in csv_reader:
        # 注意:如果CSV中的DATE是字符串,需要转换为Python datetime对象
        # 示例:line[2] = datetime.datetime.strptime(line[2], "%Y-%m-%d")
        batch.append(line)
        if len(batch) >= BATCH_SIZE:
            cursor.executemany(insert_temp_sql, batch)
            connection.commit()
            batch = []  # 清空批次,避免内存占用过高

    # 处理剩余的不足一个批次的数据
    if batch:
        cursor.executemany(insert_temp_sql, batch)
        connection.commit()

# --------------------------
# 步骤2:一次性执行MERGE同步到目标表
# --------------------------
bulk_merge_sql = """
MERGE INTO TEST_TABLE a
USING (SELECT ID, COUNTRY, "DATE" FROM TEMP_CSV_DATA) src
ON (src.ID = a.ID)
WHEN MATCHED THEN
    UPDATE SET
        a.COUNTRY = src.COUNTRY,
        a."DATE" = src."DATE"
WHEN NOT MATCHED THEN
    INSERT (ID, COUNTRY, "DATE")
    VALUES (src.ID, src.COUNTRY, src."DATE")
"""

cursor.execute(bulk_merge_sql)
connection.commit()

# 可选:手动清空临时表(全局临时表会话结束会自动清空,此步骤非必须)
# cursor.execute("TRUNCATE TABLE TEMP_CSV_DATA")
# connection.commit()

# ... 关闭游标、连接等原有代码 ...

三、额外性能优化建议

  1. 确保目标表有索引TEST_TABLEID字段必须有主键或唯一索引,否则MERGE时的匹配逻辑会触发全表扫描,百万级数据下会直接拖慢整个过程。
  2. 调整批次大小BATCH_SIZE的最优值需要根据你的内存大小、数据库带宽测试,常见的范围是5000-50000条/批次。
  3. 避免内存溢出:绝对不要把百万条数据全部加载到Python列表中(你的原代码中data.append(line)会导致这个问题),用流式读取+分块插入是关键。
  4. 数据库端优化:如果权限允许,可以临时关闭目标表的非唯一索引,完成MERGE后再重建;或者调整Oracle的PGA_AGGREGATE_TARGET等参数,提升批量操作的内存分配。

为什么原方法效率低?

你的原代码用executemany执行MERGE,本质上还是让Oracle为每条CSV记录单独执行一次MERGE逻辑:

  • 每条记录都要经历"解析SQL→匹配索引→更新/插入"的完整流程,百万次的重复开销极大;
  • Python与数据库的往返通信、上下文切换也会累积大量耗时。

而先批量插入临时表再一次性MERGE,把百万次小操作压缩为两次批量操作,Oracle能利用自身的批量处理优化(比如全表扫描、批量更新),性能会提升10-100倍。

备注:内容来源于stack exchange,提问作者Jegor Wieler

火山引擎 最新活动