You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Go语言实现PostgreSQL到MySQL的users表增量迁移(不存在则插入)

在Go中实现PostgreSQL到MySQL的users表增量同步

嘿,这个增量同步的需求我之前也帮人解决过,用Go来实现其实逻辑很清晰,我给你一步步拆解怎么操作:

1. 核心思路先明确

我们要做的就是:从PostgreSQL的users表拉取数据,和MySQL的users表对比,只插入不存在的记录。这里的“不存在”得先定好判断标准——比如用user_id或者email这类唯一标识字段,我下面就假设用user_id作为判断依据,你可以根据自己的表结构调整。

2. 先装必要的驱动

Go操作MySQL和PostgreSQL需要对应的驱动包,在终端执行这两个命令安装:

go get github.com/go-sql-driver/mysql
go get github.com/lib/pq

3. 同时连接两个数据库

你已经有MySQL的连接代码了,我把PostgreSQL的连接逻辑补上,别忘了用Ping()校验连接是否真的成功:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
	_ "github.com/lib/pq"
)

// 定义User结构体,要和两张表的字段完全匹配
type User struct {
	UserID   int    `db:"user_id"`
	Username string `db:"username"`
	Email    string `db:"email"`
	// 其他字段根据你的实际表结构补充
}

func main() {
	// 连接MySQL,替换成你的实际连接信息
	mysqlDB, err := sql.Open("mysql", "root:root@tcp(127.0.0.1:3306)/dbusers?parseTime=true")
	if err != nil {
		log.Fatalf("连接MySQL失败: %v", err)
	}
	defer mysqlDB.Close()
	// 校验MySQL连接是否可用
	if err := mysqlDB.Ping(); err != nil {
		log.Fatalf("MySQL连接校验失败: %v", err)
	}

	// 连接PostgreSQL,替换成你的实际连接信息
	pgDB, err := sql.Open("postgres", "host=127.0.0.1 port=5432 user=postgres password=postgres dbname=dbusers sslmode=disable")
	if err != nil {
		log.Fatalf("连接PostgreSQL失败: %v", err)
	}
	defer pgDB.Close()
	// 校验PG连接是否可用
	if err := pgDB.Ping(); err != nil {
		log.Fatalf("PostgreSQL连接校验失败: %v", err)
	}

4. 从PostgreSQL读取所有用户数据

接下来把PG里的users表数据读出来,用我们定义的User结构体接收:

// 从PostgreSQL读取users表数据
	var pgUsers []User
	rows, err := pgDB.Query("SELECT user_id, username, email FROM users")
	if err != nil {
		log.Fatalf("查询PG users表失败: %v", err)
	}
	defer rows.Close()

	for rows.Next() {
		var u User
		if err := rows.Scan(&u.UserID, &u.Username, &u.Email); err != nil {
			log.Printf("读取PG用户数据失败: %v", err)
			continue
		}
		pgUsers = append(pgUsers, u)
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("遍历PG数据行出错: %v", err)
	}

5. 增量插入到MySQL

这里给你两种实现方式,按需选择:

方式一:逐条检查插入(简单易理解,适合数据量小的场景)

// 逐条处理PG用户,仅插入MySQL中不存在的记录
	for _, u := range pgUsers {
		// 先检查MySQL中是否存在该user_id
		var exists bool
		err := mysqlDB.QueryRow("SELECT EXISTS(SELECT 1 FROM users WHERE user_id = ?)", u.UserID).Scan(&exists)
		if err != nil {
			log.Printf("检查MySQL用户是否存在失败: %v", err)
			continue
		}

		if !exists {
			// 插入不存在的用户
			_, err := mysqlDB.Exec("INSERT INTO users (user_id, username, email) VALUES (?, ?, ?)", u.UserID, u.Username, u.Email)
			if err != nil {
				log.Printf("插入用户%d到MySQL失败: %v", u.UserID, err)
			} else {
				fmt.Printf("成功插入用户%d到MySQL\n", u.UserID)
			}
		} else {
			fmt.Printf("用户%d已存在于MySQL,跳过\n", u.UserID)
		}
	}

方式二:批量优化(效率更高,适合数据量大的场景)

如果PG里的数据很多,逐条检查会很慢,可以先把MySQL中已有的user_id全部拉出来,在Go代码里过滤后批量插入:

// 先获取MySQL中所有已存在的user_id
	idMap := make(map[int]bool)
	rows, err := mysqlDB.Query("SELECT user_id FROM users")
	if err != nil {
		log.Fatalf("查询MySQL已存在的user_id失败: %v", err)
	}
	defer rows.Close()

	for rows.Next() {
		var id int
		if err := rows.Scan(&id); err != nil {
			log.Printf("读取MySQL已存在user_id失败: %v", err)
			continue
		}
		idMap[id] = true
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("遍历MySQL user_id行出错: %v", err)
	}

	// 过滤出PG中不存在于MySQL的用户
	var toInsert []User
	for _, u := range pgUsers {
		if !idMap[u.UserID] {
			toInsert = append(toInsert, u)
		}
	}

	// 批量插入(用事务保证原子性)
	if len(toInsert) > 0 {
		stmt, err := mysqlDB.Prepare("INSERT INTO users (user_id, username, email) VALUES (?, ?, ?)")
		if err != nil {
			log.Fatalf("准备批量插入语句失败: %v", err)
		}
		defer stmt.Close()

		tx, err := mysqlDB.Begin()
		if err != nil {
			log.Fatalf("开启事务失败: %v", err)
		}

		for _, u := range toInsert {
			_, err := tx.Stmt(stmt).Exec(u.UserID, u.Username, u.Email)
			if err != nil {
				log.Printf("批量插入用户%d失败: %v", u.UserID, err)
				// 插入失败就回滚事务
				if rollbackErr := tx.Rollback(); rollbackErr != nil {
					log.Fatalf("回滚事务失败: %v", rollbackErr)
				}
				return
			}
		}

		// 提交事务
		if err := tx.Commit(); err != nil {
			log.Fatalf("提交事务失败: %v", err)
		}
		fmt.Printf("成功批量插入%d个用户到MySQL\n", len(toInsert))
	} else {
		fmt.Println("没有新用户需要插入")
	}
}

6. 一些重要的注意事项

  • 字段匹配:一定要保证User结构体的字段和两张表的字段(包括名称、数据类型)完全对应,不然读取或插入时会出错。
  • 唯一键兜底:建议在MySQL的users表给判断用的字段(比如user_id)加唯一索引,即使代码判断漏了,数据库层面也能防止重复插入。
  • 事务的重要性:批量插入时一定要用事务,确保要么全部插入成功,要么全部回滚,避免数据不一致。
  • 大数据量优化:如果PG里的数据特别多,建议分页读取(比如用LIMITOFFSET),避免一次性把所有数据加载到内存导致内存溢出。

内容的提问来源于stack exchange,提问作者Medone

火山引擎 最新活动