You need to enable JavaScript to run this app.
导航
Flink SQL 投递 Kafka CDC 数据
最近更新时间:2025.11.24 22:45:20首次发布时间:2025.11.24 22:45:20
复制全文
我的收藏
有用
有用
无用
无用

1. 概述

Flink SQL CDC 数据源是 Apache Flink 支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。而 Kafka 是大规模、高吞吐的消息队列,可以作为 CDC 数据同步的中间媒介,实现数据订阅、跨云同步等多种场景。本手册将指导您如何使用 Flink SQL 进行 MySQL 到 Kafka 的数据同步任务。

注意,除了 MySQL 之外以下是支持的 CDC 数据源以及数据库版本:

Connector

Database

Driver

MongoDB CDC

MongoDB

MongoDB Driver: 4.11.2

MySQL CDC

MySQL
veDB MySQL

JDBC Driver: 8.0.28

Postgres CDC

PostgreSQL

JDBC Driver: 42.5.1

SQLServer CDC

Sqlserver

JDBC Driver: 9.4.1.jre8

适用场景:

  1. 需要将数据库变更日志投递到 Kafka 的场景。一般有以下几种技术选型:
    1. 【不建议】Upsert-Kafka 支持投递 CDC 数据,但是在消费 Upsert-Kafka 数据时,会产生 ChangelogNormalizer 用来保证数据顺序,会增加 Flink 的开销。
    2. 【建议】使用 Kafka + CDC 格式(Debezium、Canal 等),可以直接消费 CDC 数据,减少 ChangelogNormalizer 的开销。

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink SQL 任务,Flink 版本需要 1.16 以上。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. 已经在消息队列 Kafka 版开通 Kafka 实例,并且和 Flink 资源池网络打通(建议使用同一 VPC)。

3. MySQL 设置

3.1 基础设置

当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

3.2 Kafka Topic 创建

可以参考创建 Kafka Topic,在消息队列 Kafka 版中创建 Topic ,用来接收投递的 CDC 数据。本文中采用 flink-cdc作为 Topic 名称。

Image

4. 配置 SQL 同步任务

操作路径:作业开发 - Flink SQL 作业 - 创建作业
参考文档开发 Flink SQL 任务

-- mysql-cdc 同步 orders 数据表
CREATE TEMPORARY TABLE `orders` (
    `db_name` STRING METADATA FROM 'database_name' VIRTUAL,
    `table_name` STRING METADATA  FROM 'table_name' VIRTUAL,
    `operation_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    `id` int NOT NULL,
    `price` decimal(10,2) NOT NULL,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${secret_values.mysql-host}',
    'port' = '3306',
    'username' = '${secret_values.mysql-username}',
    'password' = '${secret_values.mysql-passwd}',
    'database-name' = 'app_db',
    'table-name' = 'orders'
);

-- 投递 Kafka 下游数据表
CREATE TEMPORARY TABLE `orders_kfk` (
    `db_name` STRING,
    `table_name` STRING,
    `operation_ts` TIMESTAMP_LTZ(3),
    `id` int NOT NULL,
    `price` decimal(10,2) NOT NULL,
    PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
    'connector' = 'kafka',
    -- Topic 需要提前创建
    'topic' = 'flink-cdc',
    'properties.bootstrap.servers' = '${secret_values.kafka-host}',
    'properties.group.id' = 'testGroup',
    -- 可以按照 db 和 table 作为分区 key 字段,保证同一个数据表的数据在同一个分区中
    'key.fields' = 'db_name;table_name',
    'key.format' = 'csv',
    'value.format' = 'debezium-json'
);


INSERT INTO `orders_kfk`
SELECT * FROM `orders`;

注意:

  1. CDC 数据投递 Kafka,不能使用 JSON、CSV 等 APPEND 流数据,需要采用 Debezium-JSONCanal-JSON等 CDC 数据格式支持。
  2. key.fields可以指定用来分区的字段,能够保证同样的数据表会投递到同一个分区内,并且在分区内保证有序。具体参数可以参考 Kafka/BMQ

4.2 上传 MySQL Driver 文件

在 SQL 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 SQL 任务的参数配置 - 依赖文件中。
Image

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入流式计算 Kafka 版管理界面(需要账户有相关访问权限),可以看到已经在 Topic 目录下,产生了相关的CDC 数据。则说明数据写入成功。
Image

数据格式如下:

## Insert 事件
{"before":null,"after":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":12},"op":"c"}

## Delete 事件
{"before":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":1E+1},"after":null,"op":"d"}

8. 常见问题

8.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

8.2 更新事件不存在

问题现象:投递结果中没有更新事件。
解决方案:Flink SQL 会将更新事件拆分成 -D 和 +I 事件,所以在 debezium-json 等格式数据中可以看到更新事件对应两条数据,即删除后拆入的模式。