如何高效合并MySQL中多数据库内的同名同结构inventory表
嘿,我之前刚好碰到过一模一样的情况——手动写22个UNION ALL不仅麻烦,要是以后新增数据库,维护起来也头疼。给你几个更简洁高效的方案,按实用性排序:
方案1:用存储过程动态生成查询语句(适合重复使用)
这个方案会自动遍历所有非系统数据库,拼接出完整的UNION ALL查询,以后新增同结构数据库也不用修改代码,直接调用存储过程就行:
DELIMITER // CREATE PROCEDURE GetAllInventoryData() BEGIN DECLARE done INT DEFAULT FALSE; DECLARE db_name VARCHAR(255); -- 游标获取所有非系统数据库 DECLARE cur CURSOR FOR SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys'); DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; SET @sql = ''; OPEN cur; read_loop: LOOP FETCH cur INTO db_name; IF done THEN LEAVE read_loop; END IF; -- 拼接SELECT语句,第一个语句不用加UNION ALL IF @sql = '' THEN SET @sql = CONCAT('SELECT * FROM `', db_name, '`.`inventory`'); ELSE SET @sql = CONCAT(@sql, ' UNION ALL SELECT * FROM `', db_name, '`.`inventory`'); END IF; END LOOP; CLOSE cur; -- 执行动态生成的SQL PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt; END // DELIMITER ; -- 调用存储过程获取所有数据 CALL GetAllInventoryData();
方案2:用GROUP_CONCAT快速生成UNION ALL语句(适合临时一次性操作)
如果只是临时执行一次查询,不用创建存储过程,用GROUP_CONCAT可以一行生成完整的UNION ALL语句,非常简洁:
-- 先临时调整GROUP_CONCAT的长度限制(默认可能不够长,23个库完全没问题) SET SESSION group_concat_max_len = 1000000; -- 生成并执行动态SQL SET @sql = ( SELECT GROUP_CONCAT( CONCAT('SELECT * FROM `', schema_name, '`.`inventory`') SEPARATOR ' UNION ALL ' ) FROM information_schema.schemata WHERE schema_name NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys') ); PREPARE stmt FROM @sql; EXECUTE stmt; DEALLOCATE PREPARE stmt;
方案3:用脚本语言批量查询合并(适合需要后续数据处理的场景)
如果需要把数据导出到文件、做进一步分析,用Python/Shell这类脚本会更灵活。比如用Python的mysql-connector实现:
import mysql.connector from mysql.connector import Error def fetch_all_inventory(): try: # 连接数据库(替换成你的连接信息) conn = mysql.connector.connect( host="your_host", user="your_username", password="your_password" ) if conn.is_connected(): cursor = conn.cursor(dictionary=True) # 获取所有目标数据库 cursor.execute(""" SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys') """) databases = cursor.fetchall() all_inventory_data = [] # 遍历每个数据库查询inventory表 for db in databases: db_name = db["schema_name"] cursor.execute(f"SELECT * FROM `{db_name}`.`inventory`") rows = cursor.fetchall() all_inventory_data.extend(rows) # 这里可以根据需求处理数据,比如打印、写入CSV for row in all_inventory_data: print(row) except Error as e: print(f"数据库连接/查询错误: {e}") finally: if conn.is_connected(): cursor.close() conn.close() fetch_all_inventory()
总结
- 如果需要重复执行查询,优先选存储过程,一劳永逸;
- 临时一次性操作,用GROUP_CONCAT的方法最快捷;
- 需要对数据做后续处理(导出、分析),用脚本语言更灵活。
内容的提问来源于stack exchange,提问作者newby100




