You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何高效合并MySQL中多数据库内的同名同结构inventory表

嘿,我之前刚好碰到过一模一样的情况——手动写22个UNION ALL不仅麻烦,要是以后新增数据库,维护起来也头疼。给你几个更简洁高效的方案,按实用性排序:

方案1:用存储过程动态生成查询语句(适合重复使用)

这个方案会自动遍历所有非系统数据库,拼接出完整的UNION ALL查询,以后新增同结构数据库也不用修改代码,直接调用存储过程就行:

DELIMITER //
CREATE PROCEDURE GetAllInventoryData()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE db_name VARCHAR(255);
    -- 游标获取所有非系统数据库
    DECLARE cur CURSOR FOR 
        SELECT schema_name 
        FROM information_schema.schemata 
        WHERE schema_name NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys');
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;

    SET @sql = '';

    OPEN cur;
    read_loop: LOOP
        FETCH cur INTO db_name;
        IF done THEN
            LEAVE read_loop;
        END IF;
        -- 拼接SELECT语句,第一个语句不用加UNION ALL
        IF @sql = '' THEN
            SET @sql = CONCAT('SELECT * FROM `', db_name, '`.`inventory`');
        ELSE
            SET @sql = CONCAT(@sql, ' UNION ALL SELECT * FROM `', db_name, '`.`inventory`');
        END IF;
    END LOOP;
    CLOSE cur;

    -- 执行动态生成的SQL
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END //
DELIMITER ;

-- 调用存储过程获取所有数据
CALL GetAllInventoryData();
方案2:用GROUP_CONCAT快速生成UNION ALL语句(适合临时一次性操作)

如果只是临时执行一次查询,不用创建存储过程,用GROUP_CONCAT可以一行生成完整的UNION ALL语句,非常简洁:

-- 先临时调整GROUP_CONCAT的长度限制(默认可能不够长,23个库完全没问题)
SET SESSION group_concat_max_len = 1000000;

-- 生成并执行动态SQL
SET @sql = (
    SELECT GROUP_CONCAT(
        CONCAT('SELECT * FROM `', schema_name, '`.`inventory`')
        SEPARATOR ' UNION ALL '
    )
    FROM information_schema.schemata
    WHERE schema_name NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
);

PREPARE stmt FROM @sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
方案3:用脚本语言批量查询合并(适合需要后续数据处理的场景)

如果需要把数据导出到文件、做进一步分析,用Python/Shell这类脚本会更灵活。比如用Python的mysql-connector实现:

import mysql.connector
from mysql.connector import Error

def fetch_all_inventory():
    try:
        # 连接数据库(替换成你的连接信息)
        conn = mysql.connector.connect(
            host="your_host",
            user="your_username",
            password="your_password"
        )
        if conn.is_connected():
            cursor = conn.cursor(dictionary=True)
            
            # 获取所有目标数据库
            cursor.execute("""
                SELECT schema_name 
                FROM information_schema.schemata 
                WHERE schema_name NOT IN ('mysql', 'information_schema', 'performance_schema', 'sys')
            """)
            databases = cursor.fetchall()
            
            all_inventory_data = []
            # 遍历每个数据库查询inventory表
            for db in databases:
                db_name = db["schema_name"]
                cursor.execute(f"SELECT * FROM `{db_name}`.`inventory`")
                rows = cursor.fetchall()
                all_inventory_data.extend(rows)
            
            # 这里可以根据需求处理数据,比如打印、写入CSV
            for row in all_inventory_data:
                print(row)
                
    except Error as e:
        print(f"数据库连接/查询错误: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

fetch_all_inventory()

总结

  • 如果需要重复执行查询,优先选存储过程,一劳永逸;
  • 临时一次性操作,用GROUP_CONCAT的方法最快捷;
  • 需要对数据做后续处理(导出、分析),用脚本语言更灵活。

内容的提问来源于stack exchange,提问作者newby100

火山引擎 最新活动