You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Flink CDC 数据同步
Flink CDC 投递 Kafka 数据
复制全文
Flink CDC 投递 Kafka 数据

1. 概述

Flink CDC 3 是 Apache Flink 的新一代 CDC 解决方案,支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。而 Kafka 是大规模、高吞吐的消息队列,可以作为 CDC 数据同步的中间媒介,实现数据订阅、跨云同步等多种场景。本手册将指导您如何使用 Flink CDC 进行 MySQL 到 Kafka 的数据同步任务。

注意,除了 MySQL 之外以下是所有支持的 CDC 数据源以及数据库版本:

Connector

Database

Driver

mysql-cdc

MySQL
veDB MySQL

JDBC Driver: 8.0.28

适用场景:

  1. 需要将数据库变更日志投递到 Kafka 的场景。Flink CDC 当前支持使用 Kafka + CDC 格式(支持 Debezium、Canal)进行投递。

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink CDC 任务,Flink CDC 版本建议需要 3.4 及以上。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. 已经在消息队列 Kafka 版开通 Kafka 实例,并且和 Flink 资源池网络打通(建议使用同一 VPC)。

3. MySQL 设置

3.1 基础设置

当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

3.2 Kafka Topic 创建

可以参考创建 Kafka Topic,在消息队列 Kafka 版中创建 Topic ,用来接收投递的 CDC 数据。本文中采用 flink-cdc作为 Topic 名称。

Image

4. 配置 CDC 同步任务

操作路径:作业开发 - Flink CDC 作业 - 创建作业
参考文档开发 Flink CDC 任务

sources:
- source:
    type: mysql
    name: MySQL Source
    hostname: ${secret_values.mysql-host}
    port: 3306
    username: ${secret_values.mysql-username}
    password: ${secret_values.mysql-passwd}
    # 通过通配符筛选需要同步的数据库表
    tables: app_db.\.*
    scan.newly-added-table.enabled: 'true'
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: PLAINTEXT://${secret_values.kafka-host}
  # 需要提前创建 Topic
  topic: flink-cdc
  key.format: json
  # 仅支持 debezium-json 和 canal-json
  value.format: debezium-json
pipeline:
  name: mysql-kafka
  # 同步任务的并行度
  parallelism: 2

注意:

  1. CDC 数据投递 Kafka,不能使用 JSON、CSV 等 APPEND 流数据,需要采用 Debezium-JSONCanal-JSON CDC 数据格式支持。
  2. 投递的 Kafka Topic 需要提前创建,暂不支持 Topic 自动创建。如果需要不同表投递不同 Topic,可以通过sink.tableId-to-topic.mapping进行配置。为每个表配置从上游表 ID(tableId)到下游 Kafka 主题的自定义表映射。每个映射以;分隔,上游表 ID 与下游 Kafka 主题之间以:分隔。例如,可将sink.tableId-to-topic.mapping配置为mydb.mytable1:topic1;mydb.mytable2:topic2

4.2 上传 MySQL Driver 文件

在 SQL 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 SQL 任务的参数配置 - 依赖文件中。
Image

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 CDC 语法问题。
如果 CDC 没有其他错误之后,可以选择上线 CDC 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入流式计算 Kafka 版管理界面(需要账户有相关访问权限),可以看到已经在 Topic 目录下,产生了相关的CDC 数据。则说明数据写入成功。
Image

数据格式如下:

## Insert 事件
{"before":null,"after":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":12},"op":"c"}

## Delete 事件
{"before":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":1E+1},"after":null,"op":"d"}

8. 常见问题

8.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

8.2 更新事件不存在

问题现象:投递结果中没有更新事件。
解决方案:Flink CDC 会将更新事件拆分成 -D 和 +I 事件,所以在 debezium-json 等格式数据中可以看到更新事件对应两条数据,即删除后拆入的模式。

最近更新时间:2025.12.04 15:55:12
这个页面对您有帮助吗?
有用
有用
无用
无用