最近更新时间:2023.10.30 15:38:57
首次发布时间:2023.09.12 16:22:50
MongoDB CDC 连接器提供了从 MongoDB 数据库读取全量和增量数据的能力,仅用于做数据源表。
CREATE TABLE products ( _id bigint, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time varchar, PRIMARY KEY (_id) NOT ENFORCED ## 如果要同步的数据库表定义了主键, 则这里也需要定义主键。 ) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = 'localhost:27017,localhost:27018,localhost:27019', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database' = 'inventory', 'collection' = 'products' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 mongodb-cdc 连接器。 |
hosts | 是 | (none) | String | MongoDB 服务器地址,格式为 |
username | 是 | (none) | String | MongoDB 数据库服务的用户名。 |
password | 是 | (none) | String | MongoDB 数据库服务的用户密码。 |
database | 是 | (none) | String | MongoDB 数据库名称。 |
collection | 是 | (none) | String | MongoDB Collection 名称。 |
connection.options | 否 | (none) | String | MongoDB 的连接选项,有多个配置项时,需要使用 & 进行连接。 |
errors.tolerance | 否 | none | String | 遇到错误时,是否继续处理消息。
|
errors.log.enable | 否 | true | Boolean | 是否把错误操作写入日志文件。
|
copy.existing | 否 | true | Boolean | 是否从源库中复制已有数据。
|
copy.existing.pipeline | 否 | (none) | String | 如果需要复制源库中已有数据时,您可以通过 copy.existing.pipeline 参数配置筛选条件。该参数是一个包含 JSON 对象的数组,可以定义过滤、转换等操作。 |
copy.existing.max.threads | 否 | (none) | Integer | 如果需要复制源库中已有数据时,您可以通过 copy.existing.max.threads 参数配置执行复制数据时的最大线程数。 |
copy.existing.queue.size | 否 | 16000 | Integer | 如果需要复制源库中已有数据时,您可以通过 copy.existing.queue.size 参数配置队列最大值。 |
poll.max.batch.size | 否 | 1000 | Integer | 每次拉取数据的最大数量。 |
poll.await.time.ms | 否 | 1500 | Integer | 拉取数据的时间间隔,默认值为 1500ms。 |
heartbeat.interval.ms | 否 | 0 | Integer | 发送心跳消息的间隔时长。 |
MongoDB CDC 和 Flink 字段类型对应关系如下:
MongoDB 字段类型 | Flink SQL 字段类型 |
---|---|
- | TINYINT |
- | SMALLINT |
Int | INT |
Long | BIGINT |
- | FLOAT |
Double | DOUBLE |
Decimal128 | DECIMAL(p, s) |
Boolean | BOOLEAN |
DateTimestamp | DATE |
DateTimestamp | TIME |
Date | TIMESTAMP(3) |
Timestamp | TIMESTAMP(0) |
String | STRING |
BinData | BYTES |
Object | ROW |
Array | ARRAY |
DBPointer | ROW<$ref STRING, $id STRING> |
Point : ROW<type STRING, coordinates ARRAY |
CREATE TABLE mongo_cdc_source ( _id bigint, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time varchar, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = 'localhost1:3717,localhost2:3717', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database' = 'doc-db', 'collection' = 'products' ); CREATE TABLE print_table ( _id bigint, order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time varchar ) WITH ( 'connector' = 'print' ); insert into print_table select * from mongo_cdc_source;