You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用Python将含外键映射的Excel数据导入PostgreSQL?

针对你2万行Excel数据导入带外键的PostgreSQL表的需求,我推荐一套高效且易维护的方案,核心思路是先预加载外键映射字典,批量转换文本值为ID,再用PostgreSQL友好的批量插入方法,具体步骤如下:

一、预加载外键映射字典(关键第一步)

你的外键对应维度表(用户表、产品表)数据量远小于2万行,直接把这些表的数据拉到内存做成**{文本值: ID}**的字典,后续转换时能做到O(1)的查找效率,这是最省时间的方式。

用SQLAlchemy或pandas直接从数据库读取即可:

import pandas as pd
from sqlalchemy import create_engine

# 连接PostgreSQL
engine = create_engine('postgresql://用户名:密码@主机:端口/数据库名')

# 生成用户名称→ID的映射字典
user_map = pd.read_sql("SELECT user_id, Name FROM user_table", engine) \
            .set_index('Name')['user_id'].to_dict()

# 生成产品名称→ID的映射字典
product_map = pd.read_sql("SELECT product_id, product FROM product_table", engine) \
              .set_index('product')['product_id'].to_dict()

二、转换Excel DataFrame中的文本为外键ID

你已经能用xlwings把Excel转成DataFrame,接下来用pandas的map()方法批量替换,2万行数据瞬间就能完成:

import xlwings as xw

# 读取Excel数据并转为DataFrame
wb = xw.Book('你的订单数据.xlsx')
df = wb.sheets['Sheet1'].used_range.options(pd.DataFrame, index=False).value
wb.close()

# 批量转换文本值为外键ID
df['user_id'] = df['user'].map(user_map)
df['product_id'] = df['bought'].map(product_map)

这里一定要做无效数据检查,避免因为Excel里有不存在的用户/产品导致插入失败:

# 找出无匹配的用户和产品
invalid_users = df[df['user_id'].isna()]['user'].unique()
invalid_products = df[df['product_id'].isna()]['bought'].unique()

if len(invalid_users) > 0:
    print(f"注意:以下用户在数据库中不存在:{invalid_users}")
if len(invalid_products) > 0:
    print(f"注意:以下产品在数据库中不存在:{invalid_products}")

# 过滤掉无效数据(可选,根据业务需求处理)
df = df.dropna(subset=['user_id', 'product_id'])

三、高效批量插入PostgreSQL

现在DataFrame里已经是合法的外键ID了,就可以用高效的批量插入方法,推荐以下几种(按易用性排序):

方法1:Pandas to_sql(最省心)

设置method='multi'让pandas生成批量INSERT语句,搭配chunksize控制每次插入的行数,比默认逐行插入快10倍以上:

# 只保留需要插入的列(假设订单表的order_id是自增主键,无需手动传入)
df[['user_id', 'product_id']].to_sql(
    name='order_table',  # 目标订单表名
    con=engine,
    if_exists='append',  # 追加数据,表不存在则自动创建
    method='multi',
    chunksize=1000,  # 每次插入1000行,可根据服务器性能调整
    index=False
)

方法2:SQLAlchemy bulk_insert_mappings(更快)

如果用SQLAlchemy ORM,bulk_insert_mappings是专门为批量插入优化的方法,比to_sql更高效:

from sqlalchemy.orm import sessionmaker
from your_models import Order  # 替换成你的Order模型类

Session = sessionmaker(bind=engine)
session = Session()

# 把DataFrame转成字典列表
order_records = df[['user_id', 'product_id']].to_dict('records')
session.bulk_insert_mappings(Order, order_records)
session.commit()
session.close()

方法3:Psycopg2 executemany(底层高效)

如果追求极致性能,用psycopg2的原生executemany,直接构造SQL语句:

import psycopg2

conn = psycopg2.connect("dbname=你的数据库 user=用户名 password=密码 host=主机")
cur = conn.cursor()

# 构造INSERT语句
insert_sql = "INSERT INTO order_table (user_id, product_id) VALUES (%s, %s)"
# 准备数据元组列表
data = list(df[['user_id', 'product_id']].itertuples(index=False, name=None))

cur.executemany(insert_sql, data)
conn.commit()
cur.close()
conn.close()

四、性能优化小贴士

  • 对于2万行数据,上面的方法都能在几秒内完成;如果以后数据量涨到百万级,可以试试PostgreSQL的COPY命令(pandas to_sql设置method='copy',或用psycopg2的copy_from),这是PostgreSQL最快的数据导入方式。
  • 不建议临时关闭外键约束,我们已经提前验证了ID的有效性,保持约束能避免脏数据。
  • 确保维度表(用户、产品)的主键字段有索引(默认主键自带索引),这样生成映射字典时的查询速度会更快。

内容的提问来源于stack exchange,提问作者Bishonen_PL

火山引擎 最新活动