You need to enable JavaScript to run this app.
导航
Flink SQL CDC 数据同步
最近更新时间:2025.09.18 14:43:30首次发布时间:2025.07.23 16:41:11
复制全文
我的收藏
有用
有用
无用
无用

1. 概述

Flink SQL CDC 源是 Flink 的一组源连接器,通过变更数据捕获(CDC)从不同数据库中摄取变更数据。部分 CDC 源集成了 Debezium 作为捕获数据变更的引擎,因此能够充分利用 Debezium 的能力。本手册将指导您如何使用 Flink CDC 3.x 进行 MySQL 到 ByteHouse-CDW 的数据同步任务。

在 ByteHouse-CDW 场景,Flink CDC 以下功能:

  • ✅ 数据表实时同步
  • ✅ 数据分库分表实时同步
  • ✅ 数据转换支持灵活的实时 ETL 加工
  • ❌ 表结构自动变更(需要提前手动同步 ByteHouse 数据库表变化)

当前 Flink CDC 部分已经支持的数据库版本如下。本文档中将以 MySQL 为例,介绍如何使用 Flink CDC 进行数据同步。

Connector

Database

Driver

mongodb-cdc

MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1, 7.0

MongoDB Driver: 4.11.2

mysql-cdc

MySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
veDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1

JDBC Driver: 8.0.28

postgres-cdc

PostgreSQL: 9.6, 10, 11, 12, 13, 14

JDBC Driver: 42.5.1

sqlserver-cdc

Sqlserver: 2012, 2014, 2016, 2017, 2019

JDBC Driver: 9.4.1.jre8

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink SQL 任务。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. Flink 版本需 >= 1.16, 内置支持 Flink CDC 和 ByteHouse CDW Connector。

3. 准备步骤

3.1 基础设置

当前 Flink CDC 支持从 MySQL 同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

在 Flink 资源管理 - 资源池 - 资源池详情,中查看资源池网段或者安全组。
Image
将以上查出来的网段或者安全组加入 MySQL 白名单即可。
Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
    `id` INT NOT NULL,
    `price` DECIMAL(10,2) NOT NULL,
    PRIMARY KEY (`id`)
);

可以通过如下 SQL 导入部分 Demo 数据:

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

3.3 ByteHouse 库表准备

在此步骤操作前,请确保 ByteHouse 和 Flink 资源池 VPC 相同,并且网络可以联通(可以参考 调试网络连通性),并且设置了合理的权限,以及签发了正确的 API-KEY。

手动创建数据库表:

  1. 目标数据表数据量如果较小(不超过1亿条),建议使用 MySQL 建表语句,在 ByteHouse SQL 工作表中选择 MySQL 方言执行建表语句。

注意:对于存量的 MySQL 实例,可以参考如下命令导出所有数据库建表语句
mysqldump -u <mysql_user> -p --no-data --skip-opt <target_db> > ddl.sql

Image

  1. 数据量超过 1 亿以上的数据表,建议使用 ByteHouse CDW 的建表工具,对数据表参数结合入库、查询等要求进行仔细调优,创建后启动 Flink CDC 任务导入数据。

Image

4. 配置 SQL CDC 同步任务

操作路径:作业开发 - Flink SQL 作业 - 创建作业
参考文档创建 Flink SQL 任务

  1. 使用引导中创建 Flink SQL 作业

Image

  1. 选择 Flink SQL 流式作业,以及选择所需模板

Image

  1. 设置作业的名称、存储目录和 Flink 引擎版本

Image

4.2 上传 MySQL Driver 文件

在 CDC 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 CDC 任务的参数配置 - 依赖文件中。

  1. 前往 MySQL 官网 ,选择 MySQL 驱动版本 8.0.27,选择操作系统为 “Platform Independent”。

Image

  1. 选择 TAR 包或者 ZIP 包,点击 Download 按钮进行下载。对文件进行解压,找到 mysql-connector-java-8.0.27.jar 文件。
  2. 将解压后的 JAR 包上传到文件管理模块

Image

  1. 在 Flink SQL 任务中引用 JAR 文件。

Image

Flink CDC 详细的配置可以参考以下文档,以下文档只是展示了最简单的配置项:

  1. MySQL (以下示例内容):参考 MySQL CDC

  2. MongoDB:参考 MongoDB CDC

  3. Postgres SQL:参考 Postgres CDC

  4. SQL Server:参考 SQLServer CDC

  5. 创建 Flink MySQL CDC 数据源

CREATE TABLE orders (
    id INT,
    price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);
  1. 创建 ByteHouse-CDW 数据下游,详细参数可参考 ByteHouse CDW
CREATE TABLE orders_bh (
    `id` INT,
    `price` DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
) WITH (
    'connector' = 'bytehouse-cdw',
    'jdbc.enable-gateway-connection' = 'true',
    'bytehouse.gateway.region' = 'xxxx',
    'bytehouse.gateway.host' = 'tenant-{TENANT-ID}-{REGION}.bytehouse.ivolces.com',
    'bytehouse.gateway.port' = '19000',
    'bytehouse.gateway.api-token' = '<API_KEY>',
    'bytehouse.gateway.virtual-warehouse' = '<VIRTUAL_WAREHOUSE>',
    'database' = '<BYTEHOUSE_DB>',
    'table-name' = '<BYTEHOUSE_TABLE>'
);

  1. 通过 Insert 语句将数据从 MySQL CDC 实时同步到 ByteHouse-CDW
INSERT INTO orders_bh
SELECT * FROM orders;

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入 ByteHouse-CDW 的 SQL 工作表功能,通过 SQL 查询是否数据已经正确同步到数据库中。
Image

6 任务阶段查询

6.1 确认任务处于全量阶段同步进度:

查看 JMJobManager日志,开始切分 split 的关键字如下:

# 开始切分 split
2025-01-12 22:08:06,122 INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner [] - Start splitting table app_db.con_bt_point_info into chunks...

切分完之后,开始下发 snapshot split,其中 app_db.con_bt_point_info:0 表示第一个 split,根据 split 编号可以看到 split 处理的进度:

2025-01-12 22:08:42,644 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlSnapshotSplit{tableId=app_db.con_bt_point_info, splitId='app_db.con_bt_point_info:0', splitKeyType=[`point_id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} to subtask 1

6.2 确认任务处于增量阶段同步进度:

查看 JobManager 日志查看分配 binlog split 的 subtask index:

2025-01-13 11:16:20,315 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlBinlogSplit{splitId='binlog-split', tables=[], offset={ts_sec=0, file=binlog.000426, pos=520322, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1521914, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, kind=NON_STOPPING, row=0, event=0}, isSuspended=false} to subtask 0

查找 subtask 0 对应的 TM 日志:

搜索 TaskManager 日志里的关键字,查看 binlog 处理进度,其中 file 和 pos 分别是读取的 binlog 文件和位置:

2025-01-13 11:21:19,039 INFO  org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset for tables [app_db.shipments, app_db.shipments90, app_db.shipments91, app_db.shipments89, app_db.shipments81, app_db.shipments82, app_db.shipments83, app_db.shipments84, app_db.shipments85, app_db.shipments86, app_db.shipments87, app_db.shipments88, app_db.shipments12] on checkpoint 2: {transaction_id=null, ts_sec=0, file=binlog.000426, pos=687715, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1522412, row=0, event=0, server_id=2051901732}

7. 常见问题

7.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

7.2 MySQL Datetime 和 ByteHouse Datetime 类型时间有差异

问题现象:MySQL 的 datetime 是一个无时区信息的数据类型,很多用户会用这个类型存取本地时间。比如 UTC+8 时区的 2025-01-09 10:00:00。因为无法确认时区,所以 Flink & ByteHouse-CDW 会在转型过程中默认使用 UTC 时间戳。所以在写入之后,使用 UTC+8 的时间再去查看这个时间结果为 2025-01-09 18:00:00。导致与 MySQL 中时间出现差异。
解决方案:

  1. 在 MySQL 中避免使用 datetime 类型字段,而采用有时区信息的 timestamp 类型字段。这样 Flink 会结合 server-time-zone 信息,进行时区转换
  2. 可以通过 Flink CDC 中设置 timestamp-offset: -8h,可以在同步任务过程中,自动将时区进行偏移,减去 8 小时。注意这个时区偏移仅对 MySQL Datetime 类型生效。