You need to enable JavaScript to run this app.
导航

MySQL CDC

最近更新时间2024.01.23 19:25:54

首次发布时间2023.09.12 16:22:50

MySQL CDC 连接器提供了从 MySQL 数据库读取全量和增量数据的能力,仅用于做数据源表。

使用限制

  • MySQL CDC 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • 支持 MySQL 版本为 5.6, 5.7, 8.x。
  • 如果您需要使用 MySQL CDC 连接器连接云数据库 veDB MySQL 版,您的连接终端请按照以下要求配置,否则可能会因为自定义连接终端的限制而出现任务故障。
    如需详细了解各参数含义,请参见编辑连接终端
    • 读写模式:配置为读写
    • 一致性级别:配置为最终一致性
    • 主节点接受读:关闭该选项。
    • 事务拆分:打开该选项。

DDL 定义

CREATE TABLE orders (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time timestamp,
    PRIMARY KEY (order_id) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
     ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'localhost',
     'port' = '3306',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'database-name' = 'mydb',
     'table-name' = 'orders'
     );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 mysql-cdc 连接器。

hostname

(none)

String

MySQL 数据库服务器的 IP 地址或主机名。
推荐使用主库地址

port

3306

Integer

MySQL 数据库服务器的端口号。

username

(none)

String

MySQL 数据库服务器的用户名称。

password

(none)

String

MySQL 数据库服务器的用户密码。

database-name

(none)

String

数据库名称。
数据库名称支持正则表达式,以匹配多个库。

table-name

(none)

String

Table 名称。
Table 名称支持正则表达式,以匹配多个表。

server-id

(none)

Integer

读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 5400 或 5400~5408。
默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是建议用户明确指定 Server id。

说明

如果scan.incremental.snapshot.enabled参数设置为 true 时,建议 server id 配置成整数范围。

scan.incremental.snapshot.enabled

true

Boolean

增量快照读取机制。

说明

如果需要保证 Source 的并发运行,那么需要保证拥有唯一的 server id,因此建议 server id 配置成整数范围。

scan.incremental.snapshot.chunk.size

8096

Integer

读取表的快照时,捕获的表被拆为多少个块。

scan.snapshot.fetch.size

1024

Integer

读取表快照时,每次读取数据的最大条数。

scan.startup.mode

initial

String

MySQL CDC 消费者可选的启动模式。

  • initial:首次启动时对数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的第一 binlog 位点开始读取。
  • latest-offset:首次启动时,不对数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可以通过 binlog 文件名和位置指定,或者通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

scan.startup.specific-offset.file

(none)

String

specific-offset启动模式下,启动位点的 binlog 文件名。

scan.startup.specific-offset.pos

(none)

Long

specific-offset启动模式下,启动位点的 binlog 文件位置。

scan.startup.specific-offset.gtid-set

(none)

String

specific-offset启动模式下,启动位点的 GTID 集合。

scan.startup.specific-offset.skip-events

(none)

Long

specific-offset启动模式下,在指定的启动位点后需要跳过的事件数量。

scan.startup.specific-offset.skip-rows

(none)

Long

specific-offset启动模式下,在指定的启动位点后需要跳过的数据行数量。

server-time-zone

(none)

String

数据库使用的会话时区,例如 Asia/Shanghai。该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型。

debezium.min.row. count.to.stream.result

1000

Integer

在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 该参数用于指定将事件传递到下游时,表必须包含的最小行数,默认值为 1000。
如果将此参数设置为 0,表示无需等待收集一定数量的数据,只要有一行就会立即传输。

connect.timeout

30s

Duration

连接 MySQL 数据库服务器的最长等待时间。

connect.max-retries

3

Integer

与 MySQL 数据库服务器重连的最大次数。

connection.pool.size

20

Integer

连接池大小。

jdbc.properties.*

20

String

自定义 JDBC URL 参数,例如:'jdbc.properties.useSSL' = 'false'

heartbeat.interval

30s

Duration

发送心跳事件的时间间隔,用于跟踪最新可用的 binlog 偏移量,一般用于解决慢表的问题(更新缓慢的数据表)。

debezium.*

(none)

String

Debezium 属性参数,从更细粒度控制 Debezium 客户端的行为。例如'debezium.snapshot.mode' = 'never'
如需了解更多 Debezium 属性,请参见Debezium 属性

scan.incremental.close-idle-reader.enabled

false

Boolean

是否在快照结束后关闭空闲的读取器(reader)。

示例代码

create table mysql_cdc_source (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_update_time timestamp,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname'='mys**.rds.ivolces.com',
 'port' = '3306',
 'username' = 'doc_user',
 'password' = 'Pw**45!',
 'database-name' = 'doc_db',
 'table-name' = 'orders'
);

create table print_sink (
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_update_time timestamp
) WITH (
  'connector' = 'print'
);

insert into print_sink
select * from mysql_cdc_source;