You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
ByteHouse-CDW 实时云数仓开发
Flink SQL CDC 数据同步
复制全文
下载 pdf
Flink SQL CDC 数据同步

1. 概述

Flink SQL CDC 源是 Flink 的一组源连接器,通过变更数据捕获(CDC)从不同数据库中摄取变更数据。部分 CDC 源集成了 Debezium 作为捕获数据变更的引擎,因此能够充分利用 Debezium 的能力。本手册将指导您如何使用 Flink CDC 3.x 进行 MySQL 到 ByteHouse-CDW 的数据同步任务。

在 ByteHouse-CDW 场景,Flink CDC 以下功能:

  • ✅ 数据表实时同步
  • ✅ 数据分库分表实时同步
  • ✅ 数据转换支持灵活的实时 ETL 加工
  • ❌ 表结构自动变更(需要提前手动同步 ByteHouse 数据库表变化)

当前 Flink CDC 部分已经支持的数据库版本如下。本文档中将以 MySQL 为例,介绍如何使用 Flink CDC 进行数据同步。

Connector

Database

Driver

mongodb-cdc

MongoDB: 3.6, 4.x, 5.0, 6.0, 6.1, 7.0

MongoDB Driver: 4.11.2

mysql-cdc

MySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
veDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1

JDBC Driver: 8.0.28

postgres-cdc

PostgreSQL: 9.6, 10, 11, 12, 13, 14

JDBC Driver: 42.5.1

sqlserver-cdc

Sqlserver: 2012, 2014, 2016, 2017, 2019

JDBC Driver: 9.4.1.jre8

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink SQL 任务。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. Flink 版本需 >= 1.16, 内置支持 Flink CDC 和 ByteHouse CDW Connector。

3. 准备步骤

3.1 基础设置

当前 Flink CDC 支持从 MySQL 同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

在 Flink 资源管理 - 资源池 - 资源池详情,中查看资源池网段或者安全组。
Image
将以上查出来的网段或者安全组加入 MySQL 白名单即可。
Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
    `id` INT NOT NULL,
    `price` DECIMAL(10,2) NOT NULL,
    PRIMARY KEY (`id`)
);

可以通过如下 SQL 导入部分 Demo 数据:

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

3.3 ByteHouse 库表准备

在此步骤操作前,请确保 ByteHouse 和 Flink 资源池 VPC 相同,并且网络可以联通(可以参考 调试网络连通性),并且设置了合理的权限,以及签发了正确的 API-KEY。

手动创建数据库表:

  1. 目标数据表数据量如果较小(不超过1亿条),建议使用 MySQL 建表语句,在 ByteHouse SQL 工作表中选择 MySQL 方言执行建表语句。

注意:对于存量的 MySQL 实例,可以参考如下命令导出所有数据库建表语句
mysqldump -u <mysql_user> -p --no-data --skip-opt <target_db> > ddl.sql

Image

  1. 数据量超过 1 亿以上的数据表,建议使用 ByteHouse CDW 的建表工具,对数据表参数结合入库、查询等要求进行仔细调优,创建后启动 Flink CDC 任务导入数据。

Image

4. 配置 SQL CDC 同步任务

操作路径:作业开发 - Flink SQL 作业 - 创建作业
参考文档创建 Flink SQL 任务

  1. 使用引导中创建 Flink SQL 作业

Image

  1. 选择 Flink SQL 流式作业,以及选择所需模板

Image

  1. 设置作业的名称、存储目录和 Flink 引擎版本

Image

4.2 上传 MySQL Driver 文件

在 CDC 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 CDC 任务的参数配置 - 依赖文件中。

  1. 前往 MySQL 官网 ,选择 MySQL 驱动版本 8.0.27,选择操作系统为 “Platform Independent”。

Image

  1. 选择 TAR 包或者 ZIP 包,点击 Download 按钮进行下载。对文件进行解压,找到 mysql-connector-java-8.0.27.jar 文件。
  2. 将解压后的 JAR 包上传到文件管理模块

Image

  1. 在 Flink SQL 任务中引用 JAR 文件。

Image

Flink CDC 详细的配置可以参考以下文档,以下文档只是展示了最简单的配置项:

  1. MySQL (以下示例内容):参考 MySQL CDC

  2. MongoDB:参考 MongoDB CDC

  3. Postgres SQL:参考 Postgres CDC

  4. SQL Server:参考 SQLServer CDC

  5. 创建 Flink MySQL CDC 数据源

CREATE TABLE orders (
    id INT,
    price DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);
  1. 创建 ByteHouse-CDW 数据下游,详细参数可参考 ByteHouse CDW
CREATE TABLE orders_bh (
    `id` INT,
    `price` DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
) WITH (
    'connector' = 'bytehouse-cdw',
    'jdbc.enable-gateway-connection' = 'true',
    'bytehouse.gateway.region' = 'xxxx',
    'bytehouse.gateway.host' = 'tenant-{TENANT-ID}-{REGION}.bytehouse.ivolces.com',
    'bytehouse.gateway.port' = '19000',
    'bytehouse.gateway.api-token' = '<API_KEY>',
    'bytehouse.gateway.virtual-warehouse' = '<VIRTUAL_WAREHOUSE>',
    'database' = '<BYTEHOUSE_DB>',
    'table-name' = '<BYTEHOUSE_TABLE>'
);

  1. 通过 Insert 语句将数据从 MySQL CDC 实时同步到 ByteHouse-CDW
INSERT INTO orders_bh
SELECT * FROM orders;

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入 ByteHouse-CDW 的 SQL 工作表功能,通过 SQL 查询是否数据已经正确同步到数据库中。
Image

6 任务阶段查询

6.1 确认任务处于全量阶段同步进度:

查看 JMJobManager日志,开始切分 split 的关键字如下:

# 开始切分 split
2025-01-12 22:08:06,122 INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner [] - Start splitting table app_db.con_bt_point_info into chunks...

切分完之后,开始下发 snapshot split,其中 app_db.con_bt_point_info:0 表示第一个 split,根据 split 编号可以看到 split 处理的进度:

2025-01-12 22:08:42,644 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlSnapshotSplit{tableId=app_db.con_bt_point_info, splitId='app_db.con_bt_point_info:0', splitKeyType=[`point_id` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} to subtask 1

6.2 确认任务处于增量阶段同步进度:

查看 JobManager 日志查看分配 binlog split 的 subtask index:

2025-01-13 11:16:20,315 INFO  org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator [] - The enumerator assigns split MySqlBinlogSplit{splitId='binlog-split', tables=[], offset={ts_sec=0, file=binlog.000426, pos=520322, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1521914, row=0, event=0}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, kind=NON_STOPPING, row=0, event=0}, isSuspended=false} to subtask 0

查找 subtask 0 对应的 TM 日志:

搜索 TaskManager 日志里的关键字,查看 binlog 处理进度,其中 file 和 pos 分别是读取的 binlog 文件和位置:

2025-01-13 11:21:19,039 INFO  org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader [] - Binlog offset for tables [app_db.shipments, app_db.shipments90, app_db.shipments91, app_db.shipments89, app_db.shipments81, app_db.shipments82, app_db.shipments83, app_db.shipments84, app_db.shipments85, app_db.shipments86, app_db.shipments87, app_db.shipments88, app_db.shipments12] on checkpoint 2: {transaction_id=null, ts_sec=0, file=binlog.000426, pos=687715, kind=SPECIFIC, gtids=da643d88-c37e-11ef-a8e8-5254ac1c7b41:1-1522412, row=0, event=0, server_id=2051901732}

7. 常见问题

7.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

7.2 MySQL Datetime 和 ByteHouse Datetime 类型时间有差异

问题现象:MySQL 的 datetime 是一个无时区信息的数据类型,很多用户会用这个类型存取本地时间。比如 UTC+8 时区的 2025-01-09 10:00:00。因为无法确认时区,所以 Flink & ByteHouse-CDW 会在转型过程中默认使用 UTC 时间戳。所以在写入之后,使用 UTC+8 的时间再去查看这个时间结果为 2025-01-09 18:00:00。导致与 MySQL 中时间出现差异。
解决方案:

  1. 在 MySQL 中避免使用 datetime 类型字段,而采用有时区信息的 timestamp 类型字段。这样 Flink 会结合 server-time-zone 信息,进行时区转换
  2. 可以通过 Flink CDC 中设置 timestamp-offset: -8h,可以在同步任务过程中,自动将时区进行偏移,减去 8 小时。注意这个时区偏移仅对 MySQL Datetime 类型生效。
最近更新时间:2025.09.18 14:43:30
这个页面对您有帮助吗?
有用
有用
无用
无用