Flink SQL CDC 数据源是 Apache Flink 支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。而 Kafka 是大规模、高吞吐的消息队列,可以作为 CDC 数据同步的中间媒介,实现数据订阅、跨云同步等多种场景。本手册将指导您如何使用 Flink SQL 进行 MySQL 到 Kafka 的数据同步任务。
注意,除了 MySQL 之外以下是支持的 CDC 数据源以及数据库版本:
Connector | Database | Driver |
|---|---|---|
MongoDB Driver: 4.11.2 | ||
JDBC Driver: 8.0.28 | ||
JDBC Driver: 42.5.1 | ||
JDBC Driver: 9.4.1.jre8 |
适用场景:
确保您已经:
当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):
注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。
REPLICATION SLAVE、REPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:
-- create database CREATE DATABASE app_db; USE app_db; -- create orders table CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- insert records INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00); -- create shipments table CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- insert records INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian'); -- create products table CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- insert records INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
可以参考创建 Kafka Topic,在消息队列 Kafka 版中创建 Topic ,用来接收投递的 CDC 数据。本文中采用 flink-cdc作为 Topic 名称。
操作路径:作业开发 - Flink SQL 作业 - 创建作业
参考文档:开发 Flink SQL 任务
-- mysql-cdc 同步 orders 数据表 CREATE TEMPORARY TABLE `orders` ( `db_name` STRING METADATA FROM 'database_name' VIRTUAL, `table_name` STRING METADATA FROM 'table_name' VIRTUAL, `operation_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, `id` int NOT NULL, `price` decimal(10,2) NOT NULL, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '${secret_values.mysql-host}', 'port' = '3306', 'username' = '${secret_values.mysql-username}', 'password' = '${secret_values.mysql-passwd}', 'database-name' = 'app_db', 'table-name' = 'orders' ); -- 投递 Kafka 下游数据表 CREATE TEMPORARY TABLE `orders_kfk` ( `db_name` STRING, `table_name` STRING, `operation_ts` TIMESTAMP_LTZ(3), `id` int NOT NULL, `price` decimal(10,2) NOT NULL, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', -- Topic 需要提前创建 'topic' = 'flink-cdc', 'properties.bootstrap.servers' = '${secret_values.kafka-host}', 'properties.group.id' = 'testGroup', -- 可以按照 db 和 table 作为分区 key 字段,保证同一个数据表的数据在同一个分区中 'key.fields' = 'db_name;table_name', 'key.format' = 'csv', 'value.format' = 'debezium-json' ); INSERT INTO `orders_kfk` SELECT * FROM `orders`;
注意:
key.fields可以指定用来分区的字段,能够保证同样的数据表会投递到同一个分区内,并且在分区内保证有序。具体参数可以参考 Kafka/BMQ 。在 SQL 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 SQL 任务的参数配置 - 依赖文件中。
此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入流式计算 Kafka 版管理界面(需要账户有相关访问权限),可以看到已经在 Topic 目录下,产生了相关的CDC 数据。则说明数据写入成功。
数据格式如下:
## Insert 事件 {"before":null,"after":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":12},"op":"c"} ## Delete 事件 {"before":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":1E+1},"after":null,"op":"d"}
问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver。
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。
问题现象:投递结果中没有更新事件。
解决方案:Flink SQL 会将更新事件拆分成 -D 和 +I 事件,所以在 debezium-json 等格式数据中可以看到更新事件对应两条数据,即删除后拆入的模式。