You need to enable JavaScript to run this app.
导航
Flink CDC 投递 Kafka 数据
最近更新时间:2025.12.04 15:55:12首次发布时间:2025.12.04 15:55:12
复制全文
我的收藏
有用
有用
无用
无用

1. 概述

Flink CDC 3 是 Apache Flink 的新一代 CDC 解决方案,支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。而 Kafka 是大规模、高吞吐的消息队列,可以作为 CDC 数据同步的中间媒介,实现数据订阅、跨云同步等多种场景。本手册将指导您如何使用 Flink CDC 进行 MySQL 到 Kafka 的数据同步任务。

注意,除了 MySQL 之外以下是所有支持的 CDC 数据源以及数据库版本:

Connector

Database

Driver

mysql-cdc

MySQL
veDB MySQL

JDBC Driver: 8.0.28

适用场景:

  1. 需要将数据库变更日志投递到 Kafka 的场景。Flink CDC 当前支持使用 Kafka + CDC 格式(支持 Debezium、Canal)进行投递。

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink CDC 任务,Flink CDC 版本建议需要 3.4 及以上。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. 已经在消息队列 Kafka 版开通 Kafka 实例,并且和 Flink 资源池网络打通(建议使用同一 VPC)。

3. MySQL 设置

3.1 基础设置

当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

3.2 Kafka Topic 创建

可以参考创建 Kafka Topic,在消息队列 Kafka 版中创建 Topic ,用来接收投递的 CDC 数据。本文中采用 flink-cdc作为 Topic 名称。

Image

4. 配置 CDC 同步任务

操作路径:作业开发 - Flink CDC 作业 - 创建作业
参考文档开发 Flink CDC 任务

sources:
- source:
    type: mysql
    name: MySQL Source
    hostname: ${secret_values.mysql-host}
    port: 3306
    username: ${secret_values.mysql-username}
    password: ${secret_values.mysql-passwd}
    # 通过通配符筛选需要同步的数据库表
    tables: app_db.\.*
    scan.newly-added-table.enabled: 'true'
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://${secret_values.kafka-host}
  # 需要提前创建 Topic
  topic: flink-cdc
  key.format: json
  # 仅支持 debezium-json 和 canal-json
  value.format: debezium-json
pipeline:
  name: mysql-kafka
  # 同步任务的并行度
  parallelism: 2

注意:

  1. CDC 数据投递 Kafka,不能使用 JSON、CSV 等 APPEND 流数据,需要采用 Debezium-JSONCanal-JSON CDC 数据格式支持。
  2. 投递的 Kafka Topic 需要提前创建,暂不支持 Topic 自动创建。如果需要不同表投递不同 Topic,可以通过sink.tableId-to-topic.mapping进行配置。为每个表配置从上游表 ID(tableId)到下游 Kafka 主题的自定义表映射。每个映射以;分隔,上游表 ID 与下游 Kafka 主题之间以:分隔。例如,可将sink.tableId-to-topic.mapping配置为mydb.mytable1:topic1;mydb.mytable2:topic2

4.2 上传 MySQL Driver 文件

在 SQL 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 SQL 任务的参数配置 - 依赖文件中。
Image

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 CDC 语法问题。
如果 CDC 没有其他错误之后,可以选择上线 CDC 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入流式计算 Kafka 版管理界面(需要账户有相关访问权限),可以看到已经在 Topic 目录下,产生了相关的CDC 数据。则说明数据写入成功。
Image

数据格式如下:

## Insert 事件
{"before":null,"after":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":12},"op":"c"}

## Delete 事件
{"before":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":1E+1},"after":null,"op":"d"}

8. 常见问题

8.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

8.2 更新事件不存在

问题现象:投递结果中没有更新事件。
解决方案:Flink CDC 会将更新事件拆分成 -D 和 +I 事件,所以在 debezium-json 等格式数据中可以看到更新事件对应两条数据,即删除后拆入的模式。