Flink CDC 3 是 Apache Flink 的新一代 CDC 解决方案,支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。而 Kafka 是大规模、高吞吐的消息队列,可以作为 CDC 数据同步的中间媒介,实现数据订阅、跨云同步等多种场景。本手册将指导您如何使用 Flink CDC 进行 MySQL 到 Kafka 的数据同步任务。
注意,除了 MySQL 之外以下是所有支持的 CDC 数据源以及数据库版本:
Connector | Database | Driver |
|---|---|---|
JDBC Driver: 8.0.28 |
适用场景:
确保您已经:
当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):
注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。
REPLICATION SLAVE、REPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:
-- create database CREATE DATABASE app_db; USE app_db; -- create orders table CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- insert records INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00); -- create shipments table CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- insert records INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian'); -- create products table CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- insert records INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
可以参考创建 Kafka Topic,在消息队列 Kafka 版中创建 Topic ,用来接收投递的 CDC 数据。本文中采用 flink-cdc作为 Topic 名称。
操作路径:作业开发 - Flink CDC 作业 - 创建作业
参考文档:开发 Flink CDC 任务
sources: - source: type: mysql name: MySQL Source hostname: ${secret_values.mysql-host} port: 3306 username: ${secret_values.mysql-username} password: ${secret_values.mysql-passwd} # 通过通配符筛选需要同步的数据库表 tables: app_db.\.* scan.newly-added-table.enabled: 'true' sink: type: kafka name: Kafka Sink properties.bootstrap.servers: PLAINTEXT://${secret_values.kafka-host} # 需要提前创建 Topic topic: flink-cdc key.format: json # 仅支持 debezium-json 和 canal-json value.format: debezium-json pipeline: name: mysql-kafka # 同步任务的并行度 parallelism: 2
注意:
sink.tableId-to-topic.mapping进行配置。为每个表配置从上游表 ID(tableId)到下游 Kafka 主题的自定义表映射。每个映射以;分隔,上游表 ID 与下游 Kafka 主题之间以:分隔。例如,可将sink.tableId-to-topic.mapping配置为mydb.mytable1:topic1;mydb.mytable2:topic2。在 SQL 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 SQL 任务的参数配置 - 依赖文件中。
此时可以点击工具栏中的验证按钮检查一些基本的 CDC 语法问题。
如果 CDC 没有其他错误之后,可以选择上线 CDC 任务,选择工具栏 - 上线,选择合适资源池进行上线。
上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。
可以进入流式计算 Kafka 版管理界面(需要账户有相关访问权限),可以看到已经在 Topic 目录下,产生了相关的CDC 数据。则说明数据写入成功。
数据格式如下:
## Insert 事件 {"before":null,"after":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":12},"op":"c"} ## Delete 事件 {"before":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":1E+1},"after":null,"op":"d"}
问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver。
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。
问题现象:投递结果中没有更新事件。
解决方案:Flink CDC 会将更新事件拆分成 -D 和 +I 事件,所以在 debezium-json 等格式数据中可以看到更新事件对应两条数据,即删除后拆入的模式。