You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多数据库表查询存S3脚本的代码架构优化咨询

优化60张表查询与S3存储的代码组织方案

嘿,这种维护60个重复函数的痛苦我太懂了——不仅主函数里要挨个调用,后续改逻辑或者加新表都要重复写一堆代码,完全不符合DRY(Don't Repeat Yourself)原则。咱们从配置驱动、逻辑复用、合理封装这几个角度来重构代码,让它清爽又好维护:

1. 用配置列表替代60个重复函数

把每张表的查询规则、输出路径这些信息抽成结构化配置,比如一个字典列表,所有表的差异都放在配置里,不用再写重复的函数:

# 可以把这个配置单独放到config.py文件里,方便管理
TABLE_CONFIGS = [
    {
        "table_name": "user_behavior",
        "query_template": "SELECT * FROM user_behavior WHERE created_at >= %s",
        "csv_filename": "user_behavior_{}.csv",
        "s3_key": "analytics/user_behavior/{}.csv"
    },
    {
        "table_name": "product_sales",
        "query_template": "SELECT product_id, SUM(amount) FROM product_sales WHERE sale_time >= %s GROUP BY product_id",
        "csv_filename": "product_sales_summary_{}.csv",
        "s3_key": "analytics/sales/{}.csv"
    },
    # 剩下58张表的配置依次往下加...
]

这里的query_template用占位符预留current_time的位置,后续统一传参;csv_filenames3_key里的{}用来格式化时间戳,方便区分不同批次的文件。

2. 编写通用处理函数

写一个通用函数,接收单张表的配置、核心依赖(数据库引擎、AWS凭证、当前时间戳),完成「查询→转CSV→上传S3」的全流程:

import pandas as pd
import boto3
from sqlalchemy import text
from io import BytesIO

def process_table(table_config, engine, aws_credentials, current_time, bucket_name):
    # 1. 执行数据库查询(用参数绑定更安全,避免SQL注入)
    query = text(table_config["query_template"])
    with engine.connect() as conn:
        df = pd.read_sql(query, conn, params=(current_time,))
    
    # 2. 生成目标S3路径
    s3_key = table_config["s3_key"].format(current_time)
    
    # 3. 内存中生成CSV,避免占用本地磁盘
    csv_buffer = BytesIO()
    df.to_csv(csv_buffer, index=False)
    csv_buffer.seek(0)
    
    # 4. 上传到S3
    s3 = boto3.client(
        's3',
        aws_access_key_id=aws_credentials["access_key"],
        aws_secret_access_key=aws_credentials["secret_key"]
    )
    s3.put_object(
        Bucket=bucket_name,
        Key=s3_key,
        Body=csv_buffer.getvalue()
    )
    
    print(f"✅ 完成表 {table_config['table_name']} 的导出")

3. 合理管理全局变量,避免全局污染

你提到的engine、AWS凭证这些,尽量不要用全局变量,而是通过参数传递或者封装成类的属性,这样代码更易测试、更清晰:

方式一:用类封装依赖(推荐,适合长期维护)

把所有依赖初始化到类里,处理方法作为类的成员函数,逻辑更内聚:

class TableExporter:
    def __init__(self, engine, aws_credentials, bucket_name):
        self.engine = engine
        self.s3 = boto3.client(
            's3',
            aws_access_key_id=aws_credentials["access_key"],
            aws_secret_access_key=aws_credentials["secret_key"]
        )
        self.bucket_name = bucket_name
    
    def _process_single_table(self, table_config, current_time):
        # 复用通用查询、转CSV、上传逻辑
        query = text(table_config["query_template"])
        with self.engine.connect() as conn:
            df = pd.read_sql(query, conn, params=(current_time,))
        
        csv_buffer = BytesIO()
        df.to_csv(csv_buffer, index=False)
        csv_buffer.seek(0)
        
        s3_key = table_config["s3_key"].format(current_time)
        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=s3_key,
            Body=csv_buffer.getvalue()
        )
        print(f"✅ 表 {table_config['table_name']} 处理完成")
    
    def process_all_tables(self, table_configs, current_time):
        # 遍历所有配置,批量处理
        for config in table_configs:
            self._process_single_table(config, current_time)

然后主函数里只需要初始化类,调用批量处理方法:

import os
import time
from sqlalchemy import create_engine
from config import TABLE_CONFIGS

def main():
    # 从环境变量读取敏感信息,不要硬编码
    engine = create_engine(os.getenv("DB_CONNECTION_URL"))
    aws_credentials = {
        "access_key": os.getenv("AWS_ACCESS_KEY"),
        "secret_key": os.getenv("AWS_SECRET_KEY")
    }
    current_time = int(time.time())
    bucket_name = os.getenv("S3_BUCKET_NAME")
    
    exporter = TableExporter(engine, aws_credentials, bucket_name)
    exporter.process_all_tables(TABLE_CONFIGS, current_time)

if __name__ == "__main__":
    main()

方式二:直接参数传递(适合简单场景)

如果不想用类,也可以直接把依赖作为参数传给通用函数,主函数遍历配置列表调用即可:

def main():
    # 初始化依赖
    engine = create_engine(os.getenv("DB_CONNECTION_URL"))
    aws_credentials = {...}
    current_time = int(time.time())
    bucket_name = "your-bucket-name"
    
    # 遍历配置批量处理
    for config in TABLE_CONFIGS:
        process_table(config, engine, aws_credentials, current_time, bucket_name)

4. 可选优化:并行处理提升效率

如果60张表串行处理太慢,可以用并行来加速,比如用concurrent.futures.ThreadPoolExecutor(注意数据库连接池的并发限制,不要开太多线程):

from concurrent.futures import ThreadPoolExecutor

def main():
    # 初始化依赖
    engine = create_engine(os.getenv("DB_CONNECTION_URL"))
    aws_credentials = {...}
    current_time = int(time.time())
    bucket_name = "your-bucket-name"
    
    # 用线程池并行处理,线程数根据数据库连接池大小调整
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [
            executor.submit(process_table, config, engine, aws_credentials, current_time, bucket_name)
            for config in TABLE_CONFIGS
        ]
        # 等待所有任务完成,捕获可能的异常
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"❌ 任务失败: {str(e)}")

5. 代码结构分层(大型项目推荐)

如果后续还要扩展功能,可以把代码分成几个模块,职责更清晰:

  • config.py:存储表配置、数据库连接信息、S3 bucket名称等
  • database.py:封装数据库查询相关的工具函数
  • s3_utils.py:封装S3上传/下载的工具函数
  • exporter.py:封装核心的表导出逻辑
  • main.py:入口函数,负责初始化依赖和触发批量处理

内容的提问来源于stack exchange,提问作者python_noob

火山引擎 最新活动