You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Flink CDC 数据同步
Flink SQL 投递 Kafka CDC 数据
复制全文
Flink SQL 投递 Kafka CDC 数据

1. 概述

Flink SQL CDC 数据源是 Apache Flink 支持高效捕获和处理数据库变更数据,实现全量与增量数据的无缝同步。而 Kafka 是大规模、高吞吐的消息队列,可以作为 CDC 数据同步的中间媒介,实现数据订阅、跨云同步等多种场景。本手册将指导您如何使用 Flink SQL 进行 MySQL 到 Kafka 的数据同步任务。

注意,除了 MySQL 之外以下是支持的 CDC 数据源以及数据库版本:

Connector

Database

Driver

MongoDB CDC

MongoDB

MongoDB Driver: 4.11.2

MySQL CDC

MySQL
veDB MySQL

JDBC Driver: 8.0.28

Postgres CDC

PostgreSQL

JDBC Driver: 42.5.1

SQLServer CDC

Sqlserver

JDBC Driver: 9.4.1.jre8

适用场景:

  1. 需要将数据库变更日志投递到 Kafka 的场景。一般有以下几种技术选型:
    1. 【不建议】Upsert-Kafka 支持投递 CDC 数据,但是在消费 Upsert-Kafka 数据时,会产生 ChangelogNormalizer 用来保证数据顺序,会增加 Flink 的开销。
    2. 【建议】使用 Kafka + CDC 格式(Debezium、Canal 等),可以直接消费 CDC 数据,减少 ChangelogNormalizer 的开销。

2. 环境准备

确保您已经:

  1. 开通了流式计算 Flink 版产品,并能够在作业开发中创建 Flink SQL 任务,Flink 版本需要 1.16 以上。
  2. 已经在资源管理 - 资源池功能模块购买了按量或者包年包月资源池,可以正常提交 Flink 任务。
  3. 已经在消息队列 Kafka 版开通 Kafka 实例,并且和 Flink 资源池网络打通(建议使用同一 VPC)。

3. MySQL 设置

3.1 基础设置

当前 Flink CDC 支持从 MySQL 自动同步。需要 MySQL有如下前置条件(本文以云数据库 MySQL 版为例进行 Demo):

  1. MySQL 和 Flink 网络需要联通,建议MySQL 和 Flink 使用同一个 VPC 和子网,并且在 MySQL 为 Flink 的安全组设置白名单。参考创建白名单绑定白名单到实例

注意:火山引擎 MySQL 实例创建后默认不绑定任何白名单,任何 IP 均无法访问该 MySQL 实例。

Image

  1. 另外需要使用 MySQL 创建访问用户,并且赋予用户只读权限以及 REPLICATION SLAVEREPLICATION CLIENT用以访问 binlog。本文创建 flinkcdc 账号。

Image

3.2 MySQL 样例数据

为了测试的目的,可以使用以下 SQL 创建数据库,并且导入部分 Demo 数据:

-- create database
CREATE DATABASE app_db;

USE app_db;

-- create orders table
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);

-- create shipments table
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');

-- create products table
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

-- insert records
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

3.2 Kafka Topic 创建

可以参考创建 Kafka Topic,在消息队列 Kafka 版中创建 Topic ,用来接收投递的 CDC 数据。本文中采用 flink-cdc作为 Topic 名称。

Image

4. 配置 SQL 同步任务

操作路径:作业开发 - Flink SQL 作业 - 创建作业
参考文档开发 Flink SQL 任务

-- mysql-cdc 同步 orders 数据表
CREATE TEMPORARY TABLE `orders` (
    `db_name` STRING METADATA FROM 'database_name' VIRTUAL,
    `table_name` STRING METADATA  FROM 'table_name' VIRTUAL,
    `operation_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    `id` int NOT NULL,
    `price` decimal(10,2) NOT NULL,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${secret_values.mysql-host}',
    'port' = '3306',
    'username' = '${secret_values.mysql-username}',
    'password' = '${secret_values.mysql-passwd}',
    'database-name' = 'app_db',
    'table-name' = 'orders'
);

-- 投递 Kafka 下游数据表
CREATE TEMPORARY TABLE `orders_kfk` (
    `db_name` STRING,
    `table_name` STRING,
    `operation_ts` TIMESTAMP_LTZ(3),
    `id` int NOT NULL,
    `price` decimal(10,2) NOT NULL,
    PRIMARY KEY (`id`) NOT ENFORCED
)
WITH (
    'connector' = 'kafka',
    -- Topic 需要提前创建
    'topic' = 'flink-cdc',
    'properties.bootstrap.servers' = '${secret_values.kafka-host}',
    'properties.group.id' = 'testGroup',
    -- 可以按照 db 和 table 作为分区 key 字段,保证同一个数据表的数据在同一个分区中
    'key.fields' = 'db_name;table_name',
    'key.format' = 'csv',
    'value.format' = 'debezium-json'
);


INSERT INTO `orders_kfk`
SELECT * FROM `orders`;

注意:

  1. CDC 数据投递 Kafka,不能使用 JSON、CSV 等 APPEND 流数据,需要采用 Debezium-JSONCanal-JSON等 CDC 数据格式支持。
  2. key.fields可以指定用来分区的字段,能够保证同样的数据表会投递到同一个分区内,并且在分区内保证有序。具体参数可以参考 Kafka/BMQ

4.2 上传 MySQL Driver 文件

在 SQL 同步任务创建后,需要参考 使用 JDBC 或者 MySQL-CDC 数据源文档,下载 MySQL Driver(建议 8.0.27 版本),上传到 SQL 任务的参数配置 - 依赖文件中。
Image

5. 上线任务

5.1 任务上线

此时可以点击工具栏中的验证按钮检查一些基本的 SQL 语法问题。
如果 SQL 没有其他错误之后,可以选择上线 SQL 任务,选择工具栏 - 上线,选择合适资源池进行上线。

Image

上线成功后,在任务管理页面启动任务,并且检查任务进入运行中状态。

5.2 确认任务执行成功

可以进入流式计算 Kafka 版管理界面(需要账户有相关访问权限),可以看到已经在 Topic 目录下,产生了相关的CDC 数据。则说明数据写入成功。
Image

数据格式如下:

## Insert 事件
{"before":null,"after":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":12},"op":"c"}

## Delete 事件
{"before":{"db_name":"app_db","table_name":"orders","operation_ts":"2025-11-13 08:19:42Z","id":3,"price":1E+1},"after":null,"op":"d"}

8. 常见问题

8.1 JDBC Driver 找不到错误

问题现象:如果任务启动失败,在日志中出现如下异常信息 Caused by: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

Image
解决方案:该问题是因为合规性原因,Flink CDC 没有内置 MySQL Driver,请参考 使用 JDBC 或者 MySQL-CDC 数据源 文档,下载 MySQL 官方 Driver (建议 8.0.27 版本),并且上传到 Flink CDC 任务的依赖文件中。

Image

8.2 更新事件不存在

问题现象:投递结果中没有更新事件。
解决方案:Flink SQL 会将更新事件拆分成 -D 和 +I 事件,所以在 debezium-json 等格式数据中可以看到更新事件对应两条数据,即删除后拆入的模式。

最近更新时间:2025.11.25 16:05:23
这个页面对您有帮助吗?
有用
有用
无用
无用