You need to enable JavaScript to run this app.
导航

MongoDB CDC

最近更新时间2023.10.30 15:38:57

首次发布时间2023.09.12 16:22:50

MongoDB CDC 连接器提供了从 MongoDB 数据库读取全量和增量数据的能力,仅用于做数据源表。

使用限制

  • MongoDB CDC 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • MongoDB CDC 仅支持作为数据源表,MongoDB CDC 支持 3.6、4.X、5.0 版本。

DDL 定义

CREATE TABLE products (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar,
    PRIMARY KEY (_id) NOT ENFORCED  ## 如果要同步的数据库表定义了主键, 则这里也需要定义主键。
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'products'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 mongodb-cdc 连接器。

hosts

(none)

String

MongoDB 服务器地址,格式为ip:port
如果有多个地址,需要用英文逗号(,)分隔。

username

(none)

String

MongoDB 数据库服务的用户名。

password

(none)

String

MongoDB 数据库服务的用户密码。

database

(none)

String

MongoDB 数据库名称。

collection

(none)

String

MongoDB Collection 名称。

connection.options

(none)

String

MongoDB 的连接选项,有多个配置项时,需要使用 & 进行连接。
如需了解连接选项的更多信息,请参见Connection String Options

errors.tolerance

none

String

遇到错误时,是否继续处理消息。

  • none:报告错误,并阻止继续处理消息。
  • all:忽略任何错误,继续处理消息。

errors.log.enable

true

Boolean

是否把错误操作写入日志文件。

  • true:默认值,写入错误操作日志
  • false:不写入

copy.existing

true

Boolean

是否从源库中复制已有数据。

  • true:默认值,mysql-cdc 连接器启动时将源库中已有数据复制到目标存储。
  • false:只复制 mysql-cdc 连接器启动后的变更数据。

copy.existing.pipeline

(none)

String

如果需要复制源库中已有数据时,您可以通过 copy.existing.pipeline 参数配置筛选条件。该参数是一个包含 JSON 对象的数组,可以定义过滤、转换等操作。
示例:"copy.existing.pipeline" : [{ "$match": { "closed": "false" } }],此配置表示只会复制closedfalse的数据。
如需了解更多信息,请参见$match (aggregation)

copy.existing.max.threads

(none)

Integer

如果需要复制源库中已有数据时,您可以通过 copy.existing.max.threads 参数配置执行复制数据时的最大线程数。

copy.existing.queue.size

16000

Integer

如果需要复制源库中已有数据时,您可以通过 copy.existing.queue.size 参数配置队列最大值。

poll.max.batch.size

1000

Integer

每次拉取数据的最大数量。
默认值 1000,表示在拉取间隔(默认 1500 ms)下最多能拉取 1000 条数据。

poll.await.time.ms

1500

Integer

拉取数据的时间间隔,默认值为 1500ms。

heartbeat.interval.ms

0

Integer

发送心跳消息的间隔时长。
默认为 0ms,表示禁用心跳信息,这意味着不会发送心跳信息来维持连接的活动状态,可能会出现一段时间内无法检测出连接中断或故障。
心跳信息时间不宜过短,以免频繁发送心跳信息导致网络开销;不宜过长,以保证能及时检测连接中断或故障。

类型映射

MongoDB CDC 和 Flink 字段类型对应关系如下:

MongoDB 字段类型

Flink SQL 字段类型

-

TINYINT

-

SMALLINT

Int

INT

Long

BIGINT

-

FLOAT

Double

DOUBLE

Decimal128

DECIMAL(p, s)

Boolean

BOOLEAN

DateTimestamp

DATE

DateTimestamp

TIME

Date

TIMESTAMP(3)
TIMESTAMP_LTZ(3)

Timestamp

TIMESTAMP(0)
TIMESTAMP_LTZ(0)

String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex

STRING

BinData

BYTES

Object

ROW

Array

ARRAY

DBPointer

ROW<$ref STRING, $id STRING>

GeoJSON

Point : ROW<type STRING, coordinates ARRAY>
Line : ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
...

示例代码

CREATE TABLE mongo_cdc_source (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'localhost1:3717,localhost2:3717',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'doc-db',
  'collection' = 'products'
);
CREATE TABLE print_table (
    _id bigint,
    order_id bigint,
    order_product_id bigint,
    order_customer_id bigint,
    order_status varchar,
    order_update_time varchar
) WITH (
 'connector' = 'print'
);
insert into print_table 
select * from mongo_cdc_source;