You need to enable JavaScript to run this app.
导航

Postgres CDC

最近更新时间2024.02.02 16:34:48

首次发布时间2023.10.30 15:38:58

Postgres CDC 连接器用于从 PostgreSQL 数据库读取全量快照数据和增量数据,仅支持做数据源表。

使用限制

  • Postgres CDC 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • Postgres CDC 仅支持作为数据源表,支持的 PostgreSQL 数据库版本为 9.6、10、11、12 、13、14 版本。
  • 当禁用增量快照功能时,Postgres CDC 暂不支持在全表扫描阶段执行 Checkpoint。
    scan.incremental.snapshot.enabled=false(默认配置)时,如果任务在全表扫描阶段触发 Checkpoint,则可能由于 Checkpoint 超时导致任务 Failover。因此,建议您在 Flink 参数中配置 Checkpoint 时间间隔,以及配置 Task 重启策略,以避免在全量同步阶段由于 Checkpoint 超时导致任务 Failover。

DDL 定义

CREATE TABLE pgsql_source (
  order_id bigint,
  order_customer_id bigint,
  order_product_id bigint,
  order_status varchar,
  order_update_time timestamp,
  PRIMARY KEY (`order_id`) NOT ENFORCED  -- 如果要数据库表定义了主键, 则这里也需要定义。
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgre***da.rds-pg.ivolces.com',
  'port' = '5432',
  'username' = 'doc_user',
  'password' = 'Pw**45!',
  'database-name' = 'doc_autotest',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'slot.name' = 'order'
);

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 postgres-cdc 连接器。

hostname

(none)

String

PostgreSQL 数据库的 IP 地址或主机名。

username

(none)

String

PostgreSQL 数据库的用户名称。

password

(none)

String

PostgreSQL 数据库的用户密码。

database-name

(none)

String

PostgreSQL 数据库名称。

schema-name

(none)

String

PostgreSQL Schema 名称。Schema 名称支持正则表达式以读取多个 Schema 的数据。

table-name

(none)

String

PostgreSQL Table 名称。表名支持正则表达式以读取多个表的数据。

port

5432

Integer

Postgres 数据库服务的端口号,默认值为 5432。

slot.name

flink

String

逻辑解码槽的名字。

说明

建议每个表都设置slot.name参数,以避免出现
PSQLException: ERROR: replication slot "flink" is active for PID 974报错。

decoding.plugin.name

decoderbufs

String

Postgres Logical Decoding 插件名称。根据 Postgres 服务上安装的插件确定。支持的插件列表如下:

  • decoderbufs(默认值)
  • wal2json
  • wal2json_rds
  • wal2json_streaming
  • wal2json_rds_streaming
  • pgoutput

changelog-mode

all

String

使用的更改日志模式(changelog mode),支持以下两种模式。

  • all:使用 all 模式时,将变更编码为使用所有 RowKinds 的撤回流。这意味着每个变化操作都会被编码为插入(INSERT)、更新(UPDATE)和删除(DELETE)的事件。
  • upsert:使用 upsert 模式时,将变更编码为描述对主键的幂等更新的 upsert 流。这意味着每个变化操作都会被编码为对具有主键的表的幂等更新。这种模式适用于没有使用全量副本标识(replica identity FULL)的具有主键的表。在使用 upsert 模式时,必须设置主键。

heartbeat.interval.ms

30s

Duration

发送心跳消息的间隔时长。

debezium.*

(none)

String

Debezium 属性参数。
更细粒度控制Debezium客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见
配置属性

debezium.snapshot.select.statement.overrides

(none)

String

如果表中有大量数据,但您不需要全部历史数据时,您可以在 Debezium 的底层配置中指定要进行快照的数据范围。该参数只会影响快照,不会影响后续的数据读取消费。

说明

在 PostgreSQL 中,必须使用 Schema 和 Table 名称进行配置,例如'debezium.snapshot.select.statement.overrides' = 'schema.table'
设置该参数后,还必须同时设置debezium.snapshot.select.statement.overrides.[schema].[table]参数。

debezium.snapshot.select.statement.overrides.[schema].[table]

(none)

String

如果您想要限制快照的数据范围,可以使用 debezium.snapshot.select.statement.overrides 配置属性来指定 SQL 语句。

  • SQL 语句必须符合数据源的语法规范,并且需要包含要应用快照选择的模式(schema)和表名。例如'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where 1 != 1'
  • 对于 Flink SQL 客户端提交任务,不支持在内容中使用单引号括起来的函数。例如'debezium.snapshot.select.statement.overrides.schema.table' = 'select * from schema.table where to_char(rq, 'yyyy-MM-dd')'

scan.incremental.snapshot.enabled

false

Boolean

是否使用增量快照,默认不使用。
增量快照读取相比传统的快照机制有以下优点:

  • 通过增量快照读取,源端可以并行读取快照,可以提高读取性能并减少读取时间。
  • 增量快照读取允许源端在快照读取过程中以块为粒度进行 Checkpoint 操作,提供了更好的容错性和恢复能力。
  • 增量快照读取,源端在读取快照之前无需获取全局读锁(FLUSH TABLES WITH READ LOCK),提供了更好的并发性和可用性。

增量快照相关参数

scan.incremental.snapshot.enabled=true时,以下增量快照参数可用。

参数

是否必选

默认值

数据类型

描述

scan.incremental.snapshot.chunk.size

optional

8096

Integer

快照的 chunk 大小(行数),指在读取表快照时,将捕获的表分割成多个较小的块或部分。

scan.startup.mode

initial

String

Postgres CDC Consumer 的可选启动模式,支持initiallatest-offset模式。

  • initial:首次启动时对数据库表执行初始快照,并继续读取最新的 binlog。
  • latest-offset:首次启动时,不对数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取连接器启动之后的数据更改。

chunk-meta.group.size

1000

Integer

快照 chunk 元数据的 group 大小,是指如果元数据的大小超过了组大小,那么元数据将会被分成多个组。

connect.timeout

30s

Duration

连接器连接到 PostgreSQL 服务后的最长等待时长。

connect.pool.size

30

Integer

连接池大小。

connect.max-retries

3

Integer

与 PostgreSQL 数据库服务器重连的最大次数。

scan.snapshot.fetch.size

1024

Integer

读取表快照时,每次读取数据的最大条数。

scan.incremental.snapshot.chunk.key-column

(none)

String

表快照的分块键(chunk key),指在读取表快照时,通过一个分块键将捕获的表分割成多个块。默认情况下,分块键是主键的第一列。该列必须是主键的一部分。
当读取表快照时,系统通过分块键将表数据分割成多个较小的块。这样可以提高读取和处理的效率,并允许并行处理每个块。分块键通常是表的某个关键列,用于将数据合理地分布到不同的块中。

chunk-key.even-distribution.factor.lower-bound

0.05d

Double

块键(chunk key)的均匀分布因子下限。

说明

分块键分布因子用于确定表的分布是否均匀的。当数据分布均匀时,表块将使用均匀计算优化;当数据分布不均匀时,将会发生拆分查询。
分布因子可以通过公式计算得出(MAX(id) - MIN(id) + 1) / rowCount

chunk-key.even-distribution.factor.upper-bound

1000.0d

Double

块键(chunk key)的均匀分布因子上限。

可用元数据

在表定义中,以下格式的元数据可以作为只读(VIRTUAL)列暴露出来。

元数据名称

数据类型

描述

table_name

STRING NOT NULL

包含该 Row 的表名称。

schema_name

STRING NOT NULL

包含该 Row 的 schema 名称。

database_name

STRING NOT NULL

包含该 Row 的数据库名称。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

Row 在数据库中进行更改的时间。全量阶段数据,该字段值为 0。

元数据使用示例:

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    shipment_id INT,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

数据映射

Postgres CDC 和 Flink 字段类型对应关系如下:

Postgres CDC 字段类型

Flink 字段类型

TINYINT

SMALLINT
INT2
SMALLSERIAL
SERIAL2

SMALLINT

INTEGER
SERIAL

INT

BIGINT
BIGSERIAL

BIGINT

DECIMAL(20, 0)

BIGINT

BIGINT

REAL
FLOAT4

FLOAT

FLOAT8
DOUBLE PRECISION

DOUBLE

NUMERIC(p, s)
DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT

STRING

BYTEA

BYTES

示例代码

CREATE TABLE pgsql_source (
  order_id bigint,
  order_customer_id bigint,
  order_product_id bigint,
  order_status varchar,
  order_update_time timestamp,
  PRIMARY KEY (`order_id`) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义。
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgre***da.rds-pg.ivolces.com',
  'port' = '5432',
  'username' = 'doc_autotest',
  'password' = 'Pw**45!',
  'database-name' = 'doc_autotest',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'slot.name' = 'order'
);

CREATE TABLE print_table (
  order_id bigint,
  order_customer_id bigint,
  order_product_id bigint,
  order_status varchar,
  order_update_time timestamp
) WITH (
 'connector' = 'print'
);
INSERT INTO print_table 
SELECT * FROM pgsql_source;