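Architecture advice for a Python script that queries multiple database tables and stores the results in S3
Organizing the code for querying 60 tables and storing the results in S3
Hey, I know the pain of maintaining 60 near-identical functions all too well: the main function has to call each one by hand, and every logic change or new table means writing the same code again, which runs straight against the DRY (Don't Repeat Yourself) principle. Let's refactor along three lines (configuration-driven design, logic reuse, and sensible encapsulation) so the code ends up clean and easy to maintain:
1. Replace the 60 repeated functions with a configuration list
Pull each table's query rule, output path, and similar details into structured configuration, for example a list of dictionaries. Every difference between tables lives in the config, so there is no need to write repeated functions: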

    # This configuration can live in its own config.py file for easier management
    TABLE_CONFIGS = [
        {
            "table_name": "user_behavior",
            "query_template": "SELECT * FROM user_behavior WHERE created_at >= %s",
            "csv_filename": "user_behavior_{}.csv",
            "s3_key": "analytics/user_behavior/{}.csv",
        },
        {
            "table_name": "product_sales",
            "query_template": "SELECT product_id, SUM(amount) FROM product_sales WHERE sale_time >= %s GROUP BY product_id",
            "csv_filename": "product_sales_summary_{}.csv",
            "s3_key": "analytics/sales/{}.csv",
        },
        # Add the configs for the remaining 58 tables below...
    ]

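Here query_template uses a placeholder to reserve the spot for current_time, which is passed in the same way for every table later on; the {} in csv_filename and s3_key is filled with the timestamp so that files from different batches can be told apart.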
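To make the placeholder conventions concrete, here is a minimal sketch of how a single entry could be checked and rendered. `REQUIRED_KEYS`, `validate_config`, and `render_paths` are hypothetical helper names introduced only for illustration, and the sketch assumes the timestamp is an integer epoch value.

```python
# Hypothetical helpers for illustration; none of these names exist in the original script.
REQUIRED_KEYS = {"table_name", "query_template", "csv_filename", "s3_key"}


def validate_config(table_config):
    """Raise ValueError if a config entry is missing any expected key."""
    missing = REQUIRED_KEYS - set(table_config)
    if missing:
        raise ValueError(
            f"config for {table_config.get('table_name', '<unknown>')} "
            f"is missing keys: {sorted(missing)}"
        )


def render_paths(table_config, timestamp):
    """Fill the {} slots in csv_filename and s3_key with the batch timestamp."""
    return (
        table_config["csv_filename"].format(timestamp),
        table_config["s3_key"].format(timestamp),
    )


example = {
    "table_name": "user_behavior",
    "query_template": "SELECT * FROM user_behavior WHERE created_at >= %s",
    "csv_filename": "user_behavior_{}.csv",
    "s3_key": "analytics/user_behavior/{}.csv",
}

validate_config(example)
print(render_paths(example, 1700000000))
# ('user_behavior_1700000000.csv', 'analytics/user_behavior/1700000000.csv')
```

Running a check like this over the whole list at startup would catch a typo in one of the 60 entries before any query is sent.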
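2. Write a generic processing function
Write one generic function that takes a single table's config plus the core dependencies (database engine, AWS credentials, current timestamp) and runs the whole "query → convert to CSV → upload to S3" flow: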

    import boto3
    import pandas as pd


    def process_table(table_config, engine, aws_credentials, current_time, bucket_name):
        # 1. Run the query with a bound parameter (safer than string formatting,
        #    avoids SQL injection). The template is passed as a plain string so the
        #    database driver binds its own %s placeholder; wrapping it in
        #    sqlalchemy.text() would require a named :param placeholder instead.
        with engine.connect() as conn:
            df = pd.read_sql(table_config["query_template"], conn, params=(current_time,))

        # 2. Build the target S3 key
        s3_key = table_config["s3_key"].format(current_time)

        # 3. Build the CSV in memory to avoid using local disk
        csv_bytes = df.to_csv(index=False).encode("utf-8")

        # 4. Upload to S3
        s3 = boto3.client(
            "s3",
            aws_access_key_id=aws_credentials["access_key"],
            aws_secret_access_key=aws_credentials["secret_key"],
        )
        s3.put_object(Bucket=bucket_name, Key=s3_key, Body=csv_bytes)

        print(f"✅ Finished exporting table {table_config['table_name']}")

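The point about parameter binding is worth a small demonstration. The sketch below uses the standard-library `sqlite3` driver and a throwaway in-memory table, both assumptions made only so the example runs anywhere. Note that `sqlite3` spells its placeholder `?`, while other drivers use `%s`; the placeholder syntax depends on the driver. `rows_since` is a hypothetical helper name.

```python
import sqlite3

# Throwaway in-memory database standing in for the real one.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_behavior (id INTEGER, created_at INTEGER)")
conn.executemany(
    "INSERT INTO user_behavior VALUES (?, ?)",
    [(1, 100), (2, 200), (3, 300)],
)


def rows_since(connection, since):
    """Return ids of rows at or after `since`, passing the value as a bound parameter."""
    query = "SELECT id FROM user_behavior WHERE created_at >= ? ORDER BY id"
    return [row[0] for row in connection.execute(query, (since,))]


print(rows_since(conn, 200))  # [2, 3]

# A hostile-looking value is bound as a single string value; it is never
# spliced into the SQL text, so it cannot add an "OR 1=1" clause.
print(rows_since(conn, "0 OR 1=1"))
```

Had the value been interpolated with string formatting instead, the second call would have matched every row.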
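3. Manage shared dependencies sensibly and avoid polluting the global namespace
The engine, AWS credentials, and similar objects you mentioned are best kept out of global variables. Pass them as parameters or hold them as class attributes instead, which makes the code clearer and easier to test:
Option 1: wrap the dependencies in a class (recommended, suits long-term maintenance)
Initialize all the dependencies in the class and make the processing routine a method, so the logic is more cohesive: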

    import boto3
    import pandas as pd


    class TableExporter:
        def __init__(self, engine, aws_credentials, bucket_name):
            self.engine = engine
            # Create the S3 client once and reuse it for every table
            self.s3 = boto3.client(
                "s3",
                aws_access_key_id=aws_credentials["access_key"],
                aws_secret_access_key=aws_credentials["secret_key"],
            )
            self.bucket_name = bucket_name

        def _process_single_table(self, table_config, current_time):
            # Same query, CSV conversion, and upload logic as the generic function.
            # The template is passed as a plain string so the driver binds its %s placeholder.
            with self.engine.connect() as conn:
                df = pd.read_sql(table_config["query_template"], conn, params=(current_time,))

            csv_bytes = df.to_csv(index=False).encode("utf-8")
            s3_key = table_config["s3_key"].format(current_time)

            self.s3.put_object(Bucket=self.bucket_name, Key=s3_key, Body=csv_bytes)
            print(f"✅ Finished processing table {table_config['table_name']}")

        def process_all_tables(self, table_configs, current_time):
            # Loop over every config and process the tables in one batch
            for config in table_configs:
                self._process_single_table(config, current_time)

The main function then only needs to instantiate the class and call the batch method:

    import os
    import time

    from sqlalchemy import create_engine

    from config import TABLE_CONFIGS
    # TableExporter is the class defined above; import it as well if it lives in another module


    def main():
        # Read sensitive values from environment variables; never hard-code them
        engine = create_engine(os.getenv("DB_CONNECTION_URL"))
        aws_credentials = {
            "access_key": os.getenv("AWS_ACCESS_KEY"),
            "secret_key": os.getenv("AWS_SECRET_KEY"),
        }
        current_time = int(time.time())
        bucket_name = os.getenv("S3_BUCKET_NAME")

        exporter = TableExporter(engine, aws_credentials, bucket_name)
        exporter.process_all_tables(TABLE_CONFIGS, current_time)


    if __name__ == "__main__":
        main()

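One thing to watch with `os.getenv`: it returns `None` for a variable that is not set, so a missing value tends to surface later as a less obvious error. A minimal fail-fast sketch follows; `require_env` and `load_settings` are hypothetical helper names, and the environment variable names are the ones used in the main function above.

```python
import os


def require_env(name):
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


def load_settings():
    """Collect every setting the exporter needs in one place."""
    return {
        "db_url": require_env("DB_CONNECTION_URL"),
        "bucket_name": require_env("S3_BUCKET_NAME"),
        "aws_credentials": {
            "access_key": require_env("AWS_ACCESS_KEY"),
            "secret_key": require_env("AWS_SECRET_KEY"),
        },
    }
```

Calling `load_settings()` at the top of `main()` would stop the run before any table is queried if the environment is incomplete.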
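Option 2: pass parameters directly (suits simple scenarios)
If you would rather not use a class, pass the dependencies straight into the generic function and have the main function loop over the config list: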

    def main():
        # Initialize the dependencies (same imports as in option 1)
        engine = create_engine(os.getenv("DB_CONNECTION_URL"))
        aws_credentials = {...}  # elided here; build it as in option 1
        current_time = int(time.time())
        bucket_name = "your-bucket-name"

        # Loop over the configs and process each table
        for config in TABLE_CONFIGS:
            process_table(config, engine, aws_credentials, current_time, bucket_name)

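The testability benefit of passing dependencies in is easiest to see with a stand-in for S3. The sketch below is a simplified, standard-library-only variant of the export flow, not the pandas/boto3 version above: `FakeS3Client` and `export_table` are hypothetical names, `sqlite3` replaces the real database, and the `?` placeholder is the `sqlite3` spelling.

```python
import csv
import io
import sqlite3


class FakeS3Client:
    """Test double that records put_object calls instead of contacting AWS."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body):
        self.objects[(Bucket, Key)] = Body


def export_table(table_config, connection, s3_client, current_time, bucket_name):
    """Query one table, serialize the rows as CSV in memory, and upload them."""
    cursor = connection.execute(table_config["query_template"], (current_time,))
    header = [column[0] for column in cursor.description]

    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(cursor.fetchall())

    s3_client.put_object(
        Bucket=bucket_name,
        Key=table_config["s3_key"].format(current_time),
        Body=buffer.getvalue().encode("utf-8"),
    )


# In-memory database with one row before and one row after the cutoff.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE user_behavior (id INTEGER, created_at INTEGER)")
connection.execute("INSERT INTO user_behavior VALUES (1, 50), (2, 150)")

config = {
    "table_name": "user_behavior",
    "query_template": "SELECT id, created_at FROM user_behavior WHERE created_at >= ?",
    "s3_key": "analytics/user_behavior/{}.csv",
}

fake_s3 = FakeS3Client()
export_table(config, connection, fake_s3, 100, "test-bucket")
print(fake_s3.objects)
```

Because neither the connection nor the client is a global, the whole flow can be exercised without a real database or AWS account.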
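4. Optional optimization: process tables in parallel
If processing 60 tables serially is too slow, parallelism can speed things up, for example with concurrent.futures.ThreadPoolExecutor (mind the concurrency limit of the database connection pool, and don't start too many threads):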

    from concurrent.futures import ThreadPoolExecutor


    def main():
        # Initialize the dependencies
        engine = create_engine(os.getenv("DB_CONNECTION_URL"))
        aws_credentials = {...}
        current_time = int(time.time())
        bucket_name = "your-bucket-name"

        # Process tables in a thread pool; size it to fit the database connection pool
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(process_table, config, engine, aws_credentials, current_time, bucket_name)
                for config in TABLE_CONFIGS
            ]

            # Wait for every task to finish and catch any exception it raised
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    print(f"❌ Task failed: {e}")

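With 60 tables it helps to know which table failed, not just that something did. One possible refinement, sketched with the standard library only, keeps a mapping from each future back to its table name. `run_all` and `demo_worker` are hypothetical names, `demo_worker` stands in for `process_table`, and the `orders` table is made up for the demo.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(table_names, worker, max_workers=4):
    """Run worker(name) for each table and return (succeeded, failed)."""
    succeeded, failed = [], {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which table each future belongs to.
        future_to_name = {executor.submit(worker, name): name for name in table_names}
        for future in as_completed(future_to_name):
            name = future_to_name[future]
            try:
                future.result()
                succeeded.append(name)
            except Exception as exc:
                # One failing table does not stop the others.
                failed[name] = str(exc)
    return sorted(succeeded), failed


def demo_worker(name):
    """Stand-in for process_table; fails for one table to show the reporting."""
    if name == "product_sales":
        raise RuntimeError("simulated query timeout")
    return name


ok, errors = run_all(["user_behavior", "product_sales", "orders"], demo_worker)
print(ok)      # ['orders', 'user_behavior']
print(errors)  # {'product_sales': 'simulated query timeout'}
```

The returned `failed` dictionary could then drive a retry pass or a non-zero exit code, depending on how the script is scheduled.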
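5. Layer the code structure (recommended for larger projects)
If you expect to extend the functionality later, split the code into a few modules with clearer responsibilities: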
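- config.py: stores the table configs, database connection info, S3 bucket name, and so on
- database.py: wraps the utility functions for database queries
- s3_utils.py: wraps the utility functions for S3 upload and download
- exporter.py: wraps the core table-export logic
- main.py: entry point, responsible for initializing dependencies and triggering the batch run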
The question comes from Stack Exchange and was asked by python_noob.




