最近更新时间:2023.09.12 16:22:50
首次发布时间:2023.09.12 16:22:50
MySQL CDC 连接器提供了从 MySQL 数据库读取全量和增量数据的能力,仅用于做数据源表。
CREATE TABLE orders ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ## 如果要同步的数据库表定义了主键, 则这里也需要定义主键。 ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'mydb', 'table-name' = 'orders' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 mysql-cdc 连接器。 |
hostname | 是 | (none) | String | MySQL 数据库服务器的 IP 地址或主机名。 |
port | 否 | 3306 | Integer | MySQL 数据库服务器的端口号。 |
username | 是 | (none) | String | MySQL 数据库服务器的用户名称。 |
password | 是 | (none) | String | MySQL 数据库服务器的用户密码。 |
database-name | 是 | (none) | String | 数据库名称。 |
table-name | 是 | (none) | String | Table 名称。 |
server-id | 否 | (none) | Integer | 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 5400 或 5400~5408。 说明 如果 |
scan.incremental.snapshot.enabled | 否 | true | Boolean | 增量快照读取机制。 说明 如果需要保证 Source 的并发运行,那么需要保证拥有唯一的 server id,因此建议 server id 配置成整数范围。 |
scan.incremental.snapshot.chunk.size | 否 | 8096 | Integer | 读取表的快照时,捕获的表被拆为多少个块。 |
scan.snapshot.fetch.size | 否 | 1024 | Integer | 读取表快照时,每次读取数据的最大条数。 |
scan.startup.mode | 否 | initial | String | 支持的启动模式。
|
server-time-zone | 否 | UTC | String | 数据库使用的会话时区,例如 Asia/Shanghai。该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型。 |
debezium.min.row. count.to.stream.result | 否 | 1000 | Integer | 在快照操作期间,连接器将查询每个包含的表,以生成该表中所有行的读取事件。 该参数用于指定将事件传递到下游时,表必须包含的最小行数,默认值为 1000。 |
connect.timeout | 否 | 30s | Duration | 连接 MySQL 数据库服务器的最长等待时间。 |
connect.max-retries | 否 | 3 | Integer | 与 MySQL 数据库服务器重连的最大次数。 |
connection.pool.size | 否 | 20 | Integer | 连接池大小。 |
jdbc.properties.* | 否 | 20 | String | 自定义 JDBC URL 参数,例如: |
heartbeat.interval | 否 | 30s | Duration | 发送心跳事件的时间间隔,用于跟踪最新可用的 binlog 偏移量,一般用于解决慢表的问题(更新缓慢的数据表)。 |
debezium.* | 否 | (none) | String | Debezium 属性参数,从更细粒度控制 Debezium 客户端的行为。例如 |
create table mysql_cdc_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname'='mys**.rds.ivolces.com', 'port' = '3306', 'username' = 'doc_user', 'password' = 'Pw**45!', 'database-name' = 'doc_db', 'table-name' = 'orders' ); create table print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_update_time timestamp ) WITH ( 'connector' = 'print' ); insert into print_sink select * from mysql_cdc_source;