Go语言实现PostgreSQL到MySQL的users表增量迁移(不存在则插入)
在Go中实现PostgreSQL到MySQL的users表增量同步
嘿,这个增量同步的需求我之前也帮人解决过,用Go来实现其实逻辑很清晰,我给你一步步拆解怎么操作:
1. 核心思路先明确
我们要做的就是:从PostgreSQL的users表拉取数据,和MySQL的users表对比,只插入不存在的记录。这里的“不存在”得先定好判断标准——比如用user_id或者email这类唯一标识字段,我下面就假设用user_id作为判断依据,你可以根据自己的表结构调整。
2. 先装必要的驱动
Go操作MySQL和PostgreSQL需要对应的驱动包,在终端执行这两个命令安装:
go get github.com/go-sql-driver/mysql go get github.com/lib/pq
3. 同时连接两个数据库
你已经有MySQL的连接代码了,我把PostgreSQL的连接逻辑补上,别忘了用Ping()校验连接是否真的成功:
package main import ( "database/sql" "fmt" "log" _ "github.com/go-sql-driver/mysql" _ "github.com/lib/pq" ) // 定义User结构体,要和两张表的字段完全匹配 type User struct { UserID int `db:"user_id"` Username string `db:"username"` Email string `db:"email"` // 其他字段根据你的实际表结构补充 } func main() { // 连接MySQL,替换成你的实际连接信息 mysqlDB, err := sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/dbusers?parseTime=true") if err != nil { log.Fatalf("连接MySQL失败: %v", err) } defer mysqlDB.Close() // 校验MySQL连接是否可用 if err := mysqlDB.Ping(); err != nil { log.Fatalf("MySQL连接校验失败: %v", err) } // 连接PostgreSQL,替换成你的实际连接信息 pgDB, err := sql.Open("postgres", "host=127.0.0.1 port=5432 user=postgres password=postgres dbname=dbusers sslmode=disable") if err != nil { log.Fatalf("连接PostgreSQL失败: %v", err) } defer pgDB.Close() // 校验PG连接是否可用 if err := pgDB.Ping(); err != nil { log.Fatalf("PostgreSQL连接校验失败: %v", err) }
4. 从PostgreSQL读取所有用户数据
接下来把PG里的users表数据读出来,用我们定义的User结构体接收:
// 从PostgreSQL读取users表数据 var pgUsers []User rows, err := pgDB.Query("SELECT user_id, username, email FROM users") if err != nil { log.Fatalf("查询PG users表失败: %v", err) } defer rows.Close() for rows.Next() { var u User if err := rows.Scan(&u.UserID, &u.Username, &u.Email); err != nil { log.Printf("读取PG用户数据失败: %v", err) continue } pgUsers = append(pgUsers, u) } if err := rows.Err(); err != nil { log.Fatalf("遍历PG数据行出错: %v", err) }
5. 增量插入到MySQL
这里给你两种实现方式,按需选择:
方式一:逐条检查插入(简单易理解,适合数据量小的场景)
// 逐条处理PG用户,仅插入MySQL中不存在的记录 for _, u := range pgUsers { // 先检查MySQL中是否存在该user_id var exists bool err := mysqlDB.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE user_id = ?)", u.UserID).Scan(&exists) if err != nil { log.Printf("检查MySQL用户是否存在失败: %v", err) continue } if !exists { // 插入不存在的用户 _, err := mysqlDB.Exec("INSERT INTO users (user_id, username, email) VALUES (?, ?, ?)", u.UserID, u.Username, u.Email) if err != nil { log.Printf("插入用户%d到MySQL失败: %v", u.UserID, err) } else { fmt.Printf("成功插入用户%d到MySQL\n", u.UserID) } } else { fmt.Printf("用户%d已存在于MySQL,跳过\n", u.UserID) } }
方式二:批量优化(效率更高,适合数据量大的场景)
如果PG里的数据很多,逐条检查会很慢,可以先把MySQL中已有的user_id全部拉出来,在Go代码里过滤后批量插入:
// 先获取MySQL中所有已存在的user_id idMap := make(map[int]bool) rows, err := mysqlDB.Query("SELECT user_id FROM users") if err != nil { log.Fatalf("查询MySQL已存在的user_id失败: %v", err) } defer rows.Close() for rows.Next() { var id int if err := rows.Scan(&id); err != nil { log.Printf("读取MySQL已存在user_id失败: %v", err) continue } idMap[id] = true } if err := rows.Err(); err != nil { log.Fatalf("遍历MySQL user_id行出错: %v", err) } // 过滤出PG中不存在于MySQL的用户 var toInsert []User for _, u := range pgUsers { if !idMap[u.UserID] { toInsert = append(toInsert, u) } } // 批量插入(用事务保证原子性) if len(toInsert) > 0 { stmt, err := mysqlDB.Prepare("INSERT INTO users (user_id, username, email) VALUES (?, ?, ?)") if err != nil { log.Fatalf("准备批量插入语句失败: %v", err) } defer stmt.Close() tx, err := mysqlDB.Begin() if err != nil { log.Fatalf("开启事务失败: %v", err) } for _, u := range toInsert { _, err := tx.Stmt(stmt).Exec(u.UserID, u.Username, u.Email) if err != nil { log.Printf("批量插入用户%d失败: %v", u.UserID, err) // 插入失败就回滚事务 if rollbackErr := tx.Rollback(); rollbackErr != nil { log.Fatalf("回滚事务失败: %v", rollbackErr) } return } } // 提交事务 if err := tx.Commit(); err != nil { log.Fatalf("提交事务失败: %v", err) } fmt.Printf("成功批量插入%d个用户到MySQL\n", len(toInsert)) } else { fmt.Println("没有新用户需要插入") } }
6. 一些重要的注意事项
- 字段匹配:一定要保证
User结构体的字段和两张表的字段(包括名称、数据类型)完全对应,不然读取或插入时会出错。 - 唯一键兜底:建议在MySQL的
users表给判断用的字段(比如user_id)加唯一索引,即使代码判断漏了,数据库层面也能防止重复插入。 - 事务的重要性:批量插入时一定要用事务,确保要么全部插入成功,要么全部回滚,避免数据不一致。
- 大数据量优化:如果PG里的数据特别多,建议分页读取(比如用
LIMIT和OFFSET),避免一次性把所有数据加载到内存导致内存溢出。
内容的提问来源于stack exchange,提问作者Medone




