You need to enable JavaScript to run this app.
导航

SQLServer CDC

最近更新时间2024.02.02 16:34:47

首次发布时间2023.10.30 15:38:58

SQLServer CDC 连接器用于从 SQLServer 数据库读取全量数据和增量数据,仅支持做数据源表。

使用限制

  • SQLServer CDC 连接器暂时仅支持在 Flink 1.16-volcano 引擎版本中使用。
  • SQLServer CDC 仅支持作为数据源表,支持的 SQLServer 数据库版本为 2012、2014、2016、2017、2019 版本。

DDL 定义

CREATE TABLE sqlserver_source (
  order_id bigint,
  order_customer_id bigint,
  order_product_id bigint,
  order_status varchar,
  order_update_time timestamp
 ) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'mssql****85.rds-mssql.ivolces.com',
    'port' = '1433',
    'username' = 'doc_user',
    'password' = 'Pwd***5!',
    'database-name' = 'doc_autotest',
    'table-name' = 'dbo.orders'
 );

WITH 参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,此处是 sqlserver-cdc 连接器。

hostname

(none)

String

SQLServer 数据库的 IP 地址或主机名。

username

(none)

String

SQLServer SQLServer 数据库的用户名称。

password

(none)

String

SQLServer 数据库的用户密码。

database-name

(none)

String

SQLServer 数据库名称。

table-name

(none)

String

SQLServer Table 名称。表名支持正则表达式以读取多个表的数据。

port

1433

Integer

SQLServer 数据库服务的端口号,默认值为 1433。

server-time-zone

UTC

String

SQLServer 数据库会话时区设置,例如 'Asia/Shanghai'。

scan.incremental.snapshot.enabled

true

Boolean

是否启用增量快照,默认启用

chunk-meta.group.size

1000

Integer

快照 chunk 元数据的 group 大小,是指如果元数据的大小超过了组大小,那么元数据将会被分成多个组。

chunk-key.even-distribution.factor.lower-bound

0.05d

Double

块键(chunk key)的均匀分布因子下限。

说明

分块键分布因子用于确定表的分布是否均匀的。当数据分布均匀时,表块将使用均匀计算优化;当数据分布不均匀时,将会发生拆分查询。
分布因子可以通过公式计算得出(MAX(id) - MIN(id) + 1) / rowCount

chunk-key.even-distribution.factor.upper-bound

1000.0d

Double

块键(chunk key)的均匀分布因子上限。

debezium.*

(none)

String

Debezium 属性参数,从更细粒度控制 Debezium 客户端的行为。例如 'debezium.snapshot.mode' = 'initial_only',详情请参见配置属性

scan.incremental.close-idle-reader.enabled

false

Boolean

是否在快照结束后关闭空闲的读取器(reader)。

可用元数据

在表定义中,以下格式的元数据可以作为只读(VIRTUAL)列暴露出来。

元数据名称

数据类型

描述

table_name

STRING NOT NULL

包含该 Row 的表名称。

schema_name

STRING NOT NULL

包含该 Row 的 schema 名称。

database_name

STRING NOT NULL

包含该 Row 的数据库名称。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

Row 在数据库中进行更改的时间。全量阶段数据,该字段值为 0。

元数据使用示例:

CREATE TABLE products (
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    schema_name STRING METADATA  FROM 'schema_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10,3)
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'schema-name' = 'dbo',
    'table-name' = 'products'

数据映射

SQLServer CDC 和 Flink 字段类型对应关系如下:

SQLServer CDC 字段类型

Flink 字段类型

char(n)

CHAR(n)

varchar(n)
nvarchar(n)
nchar(n)

VARCHAR(n)

text
ntext
xml

STRING

decimal(p, s)
money
smallmoney

DECIMAL(p, s)

numeric

NUMERIC

float
real

DOUBLE

bit

BOOLEAN

int

INT

tinyint

SMALLINT

smallint

SMALLINT

bigint

BIGINT

date

DATE

time(n)

TIME(n)

datetime2
datetime
smalldatetime

TIMESTAMP(n)

datetimeoffset

TIMESTAMP_LTZ(3)

示例代码

CREATE TABLE sqlserver_source (
  order_id bigint,
  order_customer_id bigint,
  order_product_id bigint,
  order_status varchar,
  order_update_time timestamp
 ) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'mssql****85.rds-mssql.ivolces.com',
    'port' = '1433',
    'username' = 'doc_user',
    'password' = 'Pwd***5!',
    'database-name' = 'doc_autotest',
    'table-name' = 'dbo.orders'
 );

CREATE TABLE print_sink (
  order_id bigint,
  order_customer_id bigint,
  order_product_id bigint,
  order_status varchar,
  order_update_time timestamp
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink 
SELECT * FROM sqlserver_source;