You need to enable JavaScript to run this app.
导航
高级参数
最近更新时间:2025.06.05 10:59:07首次发布时间:2022.05.27 15:34:06
我的收藏
有用
有用
无用
无用

本文将为您介绍数据集成任务,在不同场景下所使用到的高级参数配置及其相关说明。

使用前提

按需已创建 离线集成任务流式集成任务解决方案任务等任务类型。

高级参数配置步骤

离线集成任务配置说明

离线任务运行参数、数据源相关参数和资源使用参数均配置到下图红框处。

  1. 登录 DataLeap 租户控制台
  2. 选中任务所属的项目,进入数据开发界面,并打开需配置高级参数的任务。
  3. 在【任务运行参数 > 自定义参数设置】中,添加相应的高级参数,添加示例:
    Image

流式集成任务配置说明

  1. 数据源相关参数,任务运行参数等的配置,与离线集成任务中的配置方式一致。
  2. 流式集成任务的资源参数,可直接在下图红框处进行可视化配置:
    Image
  3. 流式集成任务的其他高级参数,可在【任务运行参数 > 高级参数】中添加,添加示例:
    Image

解决方案参数配置说明

DataSail 解决方案高级参数设置,您可在方案的资源组高级配置中进行设置。

  1. 登录 DataSail 控制台
  2. 在左侧导航栏中选择数据同步方案,进入同步方案配置界面。
  3. 单击目录树中项目选择入口,选择已创建的 DataLeap 项目。
    Image
  4. 在方案编辑界面中,您可通过以下两个入口配置高级参数:
    • 入口一:【基本配置 > 资源组高级配置】中,添加相应的离线、实时高级参数设置。
      Image
    • 入口二:【映射配置 > 高级参数配置】中,添加相应的高级参数设置。
      Image

通用参数

离线任务运行参数

参数名称

参数说明

默认值

job.common.dirty_record_skip_enabled

是否跳过脏数据。

true

  • job.common.reader_transport_channel_speed_record
  • job.common.writer_transport_channel_speed_record

读写限速参数,每秒读写条数限制,默认值 -1,代表不限制;配置大于 0 时,就开启条数限制。

-1

  • job.common.reader_transport_channel_speed_byte
  • job.common.writer_transport_channel_speed_byte

读写限速参数,每秒读写 bytes 限制,默认值 -1,代表不限制;配置大于 0 时,就开启限制。

-1

  • job.reader.reader_parallelism_num
  • job.writer.writer_parallelism_num

连接器的读并发和写并发,只适用于离线任务
不建议配置,不合理的配置会造成资源浪费或导致执行变慢。

无默认值,系统根据数据量大小自动推算并发数。

job.writer.case_insensitive

大小写不敏感。
在读写 TOS、OSS、ES、Kafka 等数据源时,如有大小写转换问题,建议配置为大小写敏感即 job.writer.case_insensitive=false

true

job.writer.pre_sql_list

写入数据源前置处理 SQL List ,格式是 json 数组,如:
job.writer.pre_sql_list=["delete from xx where id=xxx","delete from xx where id=xxx"]

流式任务运行参数

参数名称

参数说明

默认值

job.common.global_parallelism_num

连接器的全局并发数,只适用于流任务
该参数会决定启动多少个 TaskManager。

  • 如果 MQ 中的流量较小,则可通过在任务高级参数中指定此参数来控制作业的全局并发。
  • 如果 MQ 的 Partition 个数很多,但数据流量并不高,亦可通过此参数来节约任务的执行资源。

MQ Partition 个数 / 4

job.common.checkpoint_interval

每次 Checkpoint 时间间隔,单位为毫秒,默认 900000 毫秒(15分钟)进行一次 Checkpoint,只适用于流任务

900000

job.common.checkpoint_timeout

Checkpoint 超时时间,单位为毫秒,只适用于流任务

600000

解决方案相关参数

  • 实时整库中离线全量同步参数:

    说明

    离线全量同步更多数据源高级参数配置,可参考离线任务运行参数 数据源参数

    参数名称

    参数说明

    默认值

    job.common.is_use_batch_mode

    是否 batch 模式:

    • true: flink 任务 batch 模式,在资源有限情况,部分 taskmanager 也可运行任务;
    • false:flink 任务 pipeline 模式,适用数据量大时,读写同时进行。

    true

    job.reader.enable_string_compatible

    PostgreSQL2Hudi 实时分库分表同步解决方案中,若源端存在当前不能识别的 postgre 数据类型时,您可根据实际业务情况添加该参数,来判断是否将其转换为 string 类型。

    • false:不转换,作业运行报错;
    • true:转换为 string 类型,作业正常运行。

    说明

    该参数仅适用于 PostgreSQL2Hudi 实时分库分表同步解决方案。

    false

  • 实时整库中实时增量同步参数

    参数名称

    参数说明

    默认值

    job.common.checkpoint_interval

    设定 Checkpoint 刷新时间。

    900000

    job.reader.poll_interval_ms

    设置读 binlog 的刷新时间,默认 500 毫秒。
    设置场景:调小此参数,提高实时性。

    500

    job.reader.debezium

    • 忽略特殊不能解析的 ddl:
      "database.history.skip.unparseable.ddl": "true"

    • binlog 忽略 sql 操作配置:
      "skipped.operations":"d"
      取值这几个: d,c,u

      • c for inserts/create,
      • u for updates,
      • d for deletes,
      • t for truncates,
      • none to not skip any operations.

      默认 : truncate operations are skipped.
      配置示例:{"database.history.skip.unparseable.ddl":"true","skipped.operations":"d,u"}
      更多说明可参考官网:https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-skipped-operations

    -

  • Flink 运行参数表:

    参数名称

    参数说明

    默认值

    taskmanager.memory.managed.size

    每个 Task Manager 的托管内存占总内存大小。
    推荐配置:200m (实时任务使用较少)

    -

    taskmanager.memory.network.fraction

    每个 Task Manager 的网络内存的占比。
    推荐配置:0.05 (实时任务使用较少)

    -

资源使用相关参数

说明

资源使用参数只适用于离线集成任务。
流式集成任务的资源参数,可按照上方【流式集成任务配置说明】章节中的介绍,直接在页面中选配即可。

参数名称

参数说明

默认值

job.common.flink_tm_vcores

每个 Task Manager 使用的 CPU 核数。

  • 取值说明:必须大于或等于 0.5,且必须是 0.5 的倍数

1.0

job.common.slots_per_tm

每个 Task Manager 默认 slot 的数量。

2

job.common.flink_tm_slot_memory

每个 Task Manager 中的各个 slot 的内存大小,单位为 MB。

  • 取值说明:每个 Task Manager 的 CPU 和内存(GB)的比例,必须满足:flink_tm_vcores : (flink_tm_slot_memory [GB] * slots_per_tm) = 1:2 或 1:4。
  • 取值示例:flink_tm_vcores = 4.0,slots_per_tm = 2,则 flink_tm_slot_memory 必须为 4GB 或 8GB,即配置为 4096 或 8192。

2048

job.common.flink_tm_task_off_heap_memory

每个 Task Manager 的堆外内存占总内存的比例。

0.125

job.common.flink_tm_managed_memory_ratio

每个 Task Manager 的托管内存占总内存的比例。

0.2

job.common.flink_tm_network_max

每个 Task Manager 的网络内存的最大值,单位为 GB。

  • 取值说明:配置时使用小写字母 g,比如:2g。

2g

job.common.flink_jm_vcores

Flink Job Manager 的 CPU 核数。

  • 取值说明:不得小于 0.5,且必须是 0.5 的倍数

1.0

job.common.flink_jm_memory

Flink Job Manager 的总内存大小,单位为 MB。

  • 取值说明:Job Manager 的 CPU 和内存(GB)的比例,必须满足:flink_jm_vcores : flink_jm_memory [GB] = 1:2 或 1:4。
  • 取值示例:flink_jm_vcores = 2.0,则 flink_jm_memory 必须为 4GB 或 8GB,即配置为 4096 或 8192。

4096

job.common.flink_jm_off_heap_memory

Flink Job Manager 的堆外内存占总内存的比例。

0.125

数据源参数

ByteHouse CDW/CE

ByteHouse_CDW/CE 批式读

读取 ByteHouse_CDW 时,支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.reader.split_config

设置任务分片数量配置参数。
配置示例: job.reader.split_config = {"split_num": 10}
适用范围: 整数类型切分键,如切分键是 int 类型,默认分片数是和并发数相同 ,可以通过此参数修改分片数量:job.reader.split_config = {"split_num": 100}

分片数是和并发数相同

job.reader.string_split_size

设置分片大小。
适用范围:字符串类型切分键,您可根据该值计算分片数量。
该值不宜过大或过小:

  • 过小,分片数量较多,计算分片耗时较长
  • 过大,单分片数量过大,触发读数据超时

1000000

job.reader.customized_connection_properties

读取 ByteHouse 超时设置参数。
配置示例: 超时设置 : job.reader.customized_connection_properties = {"max_execution_time":3000}

ByteHouse_CDW/CE 批式写

批式写入 ByteHouse_CDW/CE 时,支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.writer.flush_interval

写入 buffer 的刷新时间,默认 60000 毫秒

60000

job.writer.buffer_count

写入 buffer 记录条数,默认 8192

8192

job.writer.query_timeout

设定查询超时退出时间,默认 30000 毫秒

30000

job.writer.skip_delete_task

写入 ByteHouse CDW 是通过导入任务方式,默认任务执行完,再删除导入任务,通过此参数,您可设置是否跳过删除导入任务 ;
适用场景:需要在 ByteHouse CDW 查看导入服务日志时,参数设置为 true,则跳过删除导入任务。

false

job.writer.cfs_write_batch_size

  • 动态分区默认 4096;
  • 非动态分区默认 8192。

说明

该高级参数,需配合 CFS 写入方式进行使用。

4096/8192

job.writer.cfs_vw_id

当选择 CFS 方式写入 ByteHouse 云数仓版时,建议在自定义参数设置中添加该高级参数,指定数据导入服务所使用的计算组信息。格式如下:
job.writer.cfs_vw_id = vw-21xxxxx57-dts-test,需替换为具体的计算组信息,可前往 ByteHouse 控制台获取对应的计算组。详见 ByteHouse CDW 计算组

job.writer.bh_ce_partition_type

显示的指定 ByteHouse 的分区字段为 string 类型,当分区字段使用函数时,需要使用此高级参数函数。

string

job.writer.loading_mode

高级参数 job.writer.loading_mode 可设置为 FULL_REFRESH 或 INCREMENTAL,默认为 INCREMENTAL:

  • FULL_REFRESH:可支持非分区表清除原先表中数据后再导入新数据,避免数据重复。
  • INCREMENTAL:支持分区表导入数据时,事先清除对应分区中的数据后再导入新数据,对非分区表无效。

说明

  • 该高级参数,需配合 CFS 写入方式进行使用。
  • 该高级参数对写入 ByteHouse CE 数据源时不适用。

INCREMENTAL

job.writer.extraProperties

添加此参数,数据导入 ByteHouse CDW 时,支持原子性导入数据。
配置示例如下:
job.writer.extraProperties = {"job_time":"${date}"}

job.writer.bh_connection_properties

Map<String,String> 类型。
离线、实时解决方案中,目标数据源为 ByteHouse CE、ByteHouse CDW 时,可自定义设置 ByteHouse query 参数,设置到 jdbc 连接的 properties 中。
配置示例如下:
job.writer.bh_connection_properties={"query_timeout":63}

说明

该高级参数,需配合 JDBC 写入方式进行使用。

job.writer.session_properties

Map<String,String> 类型。
离线、实时解决方案中,目标数据源为 ByteHouse CE、ByteHouse CDW 时,可自定义设置 ByteHouse session 参数,通过在当前 session 中执行 set key=value 的语句来实现。
配置示例如下:
job.writer.session_properties={"max_threads":8}

job.writer.pre_sql_list

写入 ByteHouse CDW 数据源前置处理 SQL List ,格式是 json 数组,多条执行语句可用逗号分隔。如:
job.writer.pre_sql_list=["delete from table_name1 where id=xxx","delete from table_name2 where id=xxx"]

说明

该高级参数,需配合 JDBC 写入方式进行使用。

Elasticsearch

Elasticsearch 批式读

参数名

描述

默认值

job.reader.case_insensitive

读取数据时字段大小写是否需要敏感。
在读 ES 数据源时,如有字段包含大写,建议配置为大小写不敏感。即 job.reader.case_insensitive=false

true

HDFS

HDFS 通用高级参数

参数

描述

默认值

job.common.case_insensitive

JSON 内容解析时是否对字段 Key 大小写敏感。

true

job.common.support_json_path

是否支持带 . 的字段名。true为支持,false 为不支持。

false

job.common.json_serializer_features

DataSail 使用 fastjson 解析 JSON 内容,用户可以通过此参数设置 JSON 解析的 features,详情参考 SerializerFeature - fastjson 1.2.83 javadoc。多个 SerializerFeature 使用逗号分隔。

job.common.convert_error_column_as_null

是否将类型转化失败的字段默认置为 null。

false

job.common.host_ips_mapping

HDFS 数据源通过连接串形式配置时,需在高级参数中配置 ip_mapping 信息,将 hdfs 集群的节点域名与 ip 进行映射,示例如下:
Image

job.common.host_ips_mapping = {
 "master-1-1.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "master-1-3.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "core-1-1.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "master-1-1.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "core-1-1.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "core-1-2.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "master-1-2.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "master-1-3.emr-c7axxxxxxxxx9b":"xxx.xx.x.xx",
 "core-1-2.emr-c7axxxxxxxxx9b.cn-beijing.emr-volces.com":"xxx.xx.x.xx",
 "master-1-2.emr-c7axxxxxxxxx9b":"172.16.1.68"
 }

HDFS 批式读

参数

描述

默认值

job.reader.parse_partition_from_path

在需要读取并解析数据源地址路径下的分区字段场景中,可以添加此高级参数,在手动添加分区字段映射后,便可正常读取分区字段数据。
格式为:job.reader.parse_partition_from_path=true

false

job.reader.partition_num

读取分区字段数据场景中,如果是读取 Json 这类没有 Schema 定义的数据格式时,需添加此高级参数,来告知当前作业设置的路径中包含多少个分区字段数。
格式为:job.reader.partition_num=n,n 为分区字段个数。

HDFS 实时写

参数

描述

默认值

job.writer.rolling.max_part_size

文件切割大小,单位字节,默认 10G。

注意

这里是指未压缩读的数据大小, 而非 HDFS 最终文件大小。

10737418240

job.writer.hdfs.replication

HDFS 副本数

3

job.writer.hdfs.compression_codec

HDFS 压缩格式,支持

  • snappy
  • lz4
  • zstd
  • fourmc
  • fourmz
  • gzip
  • None(不压缩)

zstd

job.writer.dump.directory_frequency

写入 HDFS 文件夹的频率,支持以下参数:

  • 天级:dump.directory_frequency.day
  • 小时级:dump.directory_frequency.hour

dump.directory_frequency.day

HIVE

HIVE 通用高级参数

参数名称

参数说明

默认值

job.common.checkpoint_interval

设定 Checkpoint 刷新时间,默认 15 分钟,如果实时写入 Hive 时,写入 Hive 时间依据此参数。

900000

job.common.host_ips_mapping

Hive 数据源通过连接串方式接入自建集群时,需通过该高级参数配置 host 与真实 ip 之间的映射关系,示例如下:

{
  "core-1-1.emr-1": "1xx.xx.0.11",
  "core-1-2.emr-2": "1xx.xx.0.12",
  "master-1-1.emr-1": "1xx.xx.0.13"
}

HIVE 批式写

参数名称

参数说明

默认值

job.writer.partition_strategy

参数分区创建策略
参数值:partition_first、partition_last
适应场景:流任务写数据时设置为 partition_first 时,便可实时查看当前 Hive 分区数据。

partition_last

job.writer.partition.date_format

写入动态日期分区格式,可配置为:yyyyMMdd、yyyy-MM-DD

yyyyMMdd

job.writer.partition.hour_format

写入动态小时分区格式,可配置为:hh、HH

HH

job.writer.null_string_as_null

复杂类型中的 string 类型,默认会将 null 写为空字符串。如果需要配置默认写入 null,可以将此参数配置为 true。如脏数据中存在无法转换的列时,会自动转换为 null 值。

false

job.writer.case_insensitive

默认会将数据全部转换为小写。

true

job.writer.convert_error_column_as_null

将脏数据中无法转换的列自动转换为 null。

false

job.writer.dump.directory_frequency

写入 HDFS 文件夹的频率,支持以下参数:

  • 天级:dump.directory_frequency.day
  • 小时级:dump.directory_frequency.hour

dump.directory_frequency.day

Kafka

Kafka 流式读

参数名称

描述

默认值

job.reader.connector.startup-mode

默认消费起始位置参数指定:

  • earliest-offset 最老:job.reader.connector.startup-mode=earliest-offset,指定最早的消费起始位置;
  • latest-offset 最新:job.reader.connector.startup-mode=latest-offset,指定最新的消费起始位置;
  • specific-offsets:通过分区 offset 位点指定消费起始位置,job.reader.connector.connector.specific-offsets = [{"partition":0,"offset":1},{"partition":1,"offset":2}...]
  • specific-timestamp:通过时间戳位点来指定消费起始位置,job.reader.connector.connector.specific-timestamp = 1716292387000(某个时间的时间戳,单位为毫秒)

job.reader.metadata_columns

读取 Kafka 元数据相关信息,多个元数据可用英文逗号隔开。配置示例如下:job.reader.metadata_columns = timestamp,offset,key,value,partition,headers

Kafka 离线写

参数名称

描述

默认值

job.common.host_ips_mapping

Kafka 通过公网接入,kafka broker 设置为域名;需配置 ip 与域名映射,示例如下:
{
"ukxxxxxxxx-kafka1":"xxx.xx.xxx.xx",
"ukxxxxxxxx-kafka2":"xxx.xx.xxx.xx",
"uxxxxxxxxx-kafka3":"xxx.xx.xxx.xx",
}

job.common.skip_dump_parse

Kafka 数据源通过公网形式接入,开启 SASL_SSL 认证时,需设置该参数为 true。
Image

false

job.writer.properties

max.request.size 消息体大小;
buffer.memory 缓存大小。

说明

适用范围:DataSail 整库解决方案配置中,如果单个消息体比较大时,可以调整此参数。

{"max.request.size":1048576,"buffer.memory":33554432}

job.writer.compression_type

消息压缩格式,支持 none、snappy、gzip、lz4

说明

DataSail 整库解决方案配置中,可指定消息压缩格式。

snappy

Mongo

MongoDB 解决方案读

参数名

描述

默认值

solution.reader.ddl.external_schema_mode

由于 Mongo 灵活的 Schema 定义,且 Mongo Schema Fetcher 获取的字段是无序且不保证每次获取的结果一致,这对解决方案中的自动建表能力有一定的困难。
因此平台提供了该参数,用于指定需要在 Schema Fetcher 中获取的字段及顺序。取值说明如下:

  • Replace:
    solution.reader.ddl.external_schema_mode=Replace
    当对字段顺序存在要求,或者想要自定义来源字段时,可使用该取值。此时,Mongo 返回的字段及字段属性都会被丢弃。所以,您需要结合下方的solution.reader.ddl.external_schemas参数,自行定义每个字段的类型以及 isPrimaryKey 等关键字段,传递完整的 Schema 信息。
  • Merge:
    solution.reader.ddl.external_schema_mode=Merge
    当字段顺序无要求且场景中有增量字段时,可用该取值。结合下方 solution.reader.ddl.external_schemas 参数,并通过设置字段的 position 参数,来指定新增字段在 Schema 中插入的位置,插入顺序取值如下:
    • First:置于 Schema 首位。
    • Last:置于 Schema 末尾。
    • Before:须指定参数,如 Before(name) ,置于 name 字段之前。
    • After:须指定参数,如 After(name) ,置于 name 字段之后。

MERGE

solution.reader.ddl.external_schemas

自定义 Mongo Schema 字段信息,示例如下:

  • solution.reader.ddl.external_schema_mode=Replace,全列替换模式,示例如下:

    solution.reader.ddl.external_schemas={
            "test.dts_mongo_test": [{
                    "name": "_id",
                    "type": "objectid",
                    "isPrimaryKey": true
            }, {
                    "name": "address2",
                    "type": "string"
            }, {
                    "name": "price",
                    "type": "double"
            }]
    }
    

    Image

  • solution.reader.ddl.external_schema_mode=Merge,融合模式,示例如下:

    solution.reader.ddl.external_schemas={
            "test.dts_mongo_test": [{
                    "name": "_id",
                    "type": "objectid",
                    "position": "first"
            }, {
                    "name": "address2",
                    "type": "string",
                    "isPrimaryKey": true,
                    "position": "before(_id)"
            }, {
                    "name": "price",
                    "type": "double",
                    "position": "last"
            }]
    }
    

    Image

MongoDB 批示读

参数名

描述

默认值

reader_fetch_size

单批次读取文档 doc 的数量。

100000

filter

指定读取过滤条件,满足 MongoDB 语法,如读取 id = 1000 的数据,填写示例如下:
{id: {$eq: 1000}}

split_mode

分片模式支持两种:

  1. paginating:单个分片的数量为:totalRecords / batchSize
  2. parallelism:若 MongoDB 支持 splitVector 功能,则使用 MongoDB 内置切片功能,否则将根据文档数量/parallelism 平均划分,单个分片的数量为:totalRecords / parallelismpaginating:根据数据量/fetchsize精确分片

说明

若需关闭分片,可设置并发度为 1。

parallelism

MongoDB 批式写

参数名

描述

默认值

max_connection_per_host

连接池最大连接数。

100

connect_timeout_ms

连接超时时间。

10000

batch_size

单批次写入 MongoDB 的数据量。

100

write_mode

高级参数设置写入方式:

  • insert:直接写入数据,表示不覆盖,出现唯一键冲突时,任务会执行失败。
  • overwrite:写入时,不清除数据,唯一键相同,用新的数据覆盖旧数据,唯一键不同时,直接插入新数据。
  • insert_if_not_exists:写入时,不清除数据,唯一键相同,原数据保持不变,唯一键不同时,直接插入新数据。

MySQL

MySQL 批式读

批式读支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.reader.init_sql

读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境。

job.reader.reader_fetch_size

每次拉取的数据条数,只在准确分片中有效。

10000

job.reader.query_timeout_seconds

Jdbc 方式读取数据,设定读取超时时间,单位秒。

300

job.reader.shard_split_mode

Jdbc 连接分片模式,支持准确分片、并发分片、不分片三种模式:

  • 准确分片(默认):根据配置的分片键将数据拆分为不同的区间,除下最后一个区间外,每个区间精准的有 reader_fetch_size 条数,此模式速度较慢,但可以保证下游文件大小基本一致。
    • 拉取数据量很大的表或者分片键不是主键或者索引键时,该分片模式分片时间会比较长;
    • 该分片模式支持分片键为整型数据类型和字符串数据类型;
    • 配置方式:将该参数配置为 accurate。
  • 非精确分片:此模式速度较快,但无法保障下游文件大小一致。
    • 该分片模式仅支持分片键为整型数据类型;
    • 参数配置为 parallelism时:根据表的最大最小值,将所有的数据按照执行并发数进行区间分片;
    • 参数配置为quick时:根据 reader_fetch_size 进行模糊分片。
  • 不分片:不进行分片,适用于表数据量较小,或没有主键、索引键的表。
    • 配置方式:将该参数配置为 nosplit 或者不配置 split_pk 。

准确分片

job.reader.customized_sql

自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。
例如:需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。

说明

配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。

MySQL 批式写

批式写支持以下高级参数,您可根据实际情况进行配置:

参数名称

描述

默认值

job.writer.is_insert_ignore

insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突,false 为任务执行失败;true 为忽略冲突,任务正常执行。

false

job.writer.write_batch_interval

一次性批量提交的数据条数,该值可以减少与 MySQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。

100

job.writer.write_retry_times

MySQL 写入失败时重试次数。

3

job.writer.retry_interval_seconds

写入失败后两次重试的时间间隔,单位秒。

write_batch_interval / 10

job.writer.connection_parameters

Jdbc 连接的全部参数,可在默认值后追加补充:

  • tcpRcvBuf 读单行数据量大时使用:

tcpRcvBuf=1024000 (1MB)

  • allowMultiQueries 允许支持使用多条 sql,用分号隔开:

allowMultiQueries=true

  • connectTimeout 连接超时时间,毫秒:

connectTimeout=60000

autoReconnect=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull

StarRocks

StarRocks 批式写

参数名称

参数说明

参数默认值

job.writer.sink_flush_interval_ms

写入 buffer 刷新时间,默认 60000 毫秒

60000

job.writer.sink_buffer_size

写入 buffer 数据大小,默认 10485760 (10MB)

10485760

job.writer.sink_buffer_count

写入buffer 记录条数,默认 8192

8192

job.writer.stream_load_properties

数据写入方式参数指定。
示例:主键冲突策略

  • 默认整行更新
  • {"partial_update":true} 部分更新,只更新有列映射的字段,仅支持主键表
  • {"insert_ignore":"ignore"} 忽略更新,按主键忽略新数据,仅支持主键表

整行更新

job.writer.request_read_timeouts

写入等待获取结果时间,默认 60000 毫秒
适用场景:如数据量比较大,写入较慢,会触发 timeout,调大此参数

60000

job.writer.request_connect_timeouts

写入连接超时时间,默认 60000 毫秒

60000

job.writer.sink_enable_2PC

写入时任务分两阶段提交,默认 false
true 适用场景:流式集成写入,两阶段提交,保证数据 exactly once

false

Paimon

Paimon 批式读

高级参数 Key

高级参数 Value

job.reader.properties

运行时动态设置 Paimon 表的属性(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串

job.reader.limit

限制要读取的行数,通常用于数据抽样或测试

Paimon 批式写

高级参数 Key

高级参数 Value

job.writer.properties

运行时动态设置 Paimon 表的属性(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串

job.writer.overwrite_partition

覆盖写特定的分区,当且仅当“写入模式”为“覆盖”时生效,格式为 Map<String, String> 类型的 JSON 字符串

job.common.checkpoint_interval

流作业 Flink 快照生成周期(单位为毫秒),默认为 300s(5 分钟)。
当流式写 Paimon 时,每次快照完成后才会进行数据提交(Commit)操作,因此较短的快照周期可以让新数据更快地被下游查到。
但是过短的快照周期会产生大量的小文件,导致性能和查询效率受到严重影响,请合理设置该值。

Paimon 解决方案自动建表高级参数

在解决方案的刷新目标表映射界面,我们可以填入一些高级参数来控制建表行为,详见 实时整库同步

高级参数 Key

高级参数 Value

solution.writer.common.ddl.buckets_num

目标表的分桶(bucket)数。如果不设置,默认为 -1(动态 bucket)。

solution.writer.paimon.ddl.bucket_keys

目标表的分桶键(bucket key)。如果有多个字段,可用半角逗号分隔(例如 id,name)。
如果不设置,默认 bucket key 等同于主键;如果表不含主键,则默认会使用所有字段作为 bucket key。

solution.writer.paimon.ddl.options

建表时的各类可选参数(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串

PostgreSQL

PostgreSQL 批式读

参数名称

描述

默认值

job.reader.init_sql

读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境

job.reader.reader_fetch_size

每次拉取的数据条数,只在准确分片中有效。

10000

job.reader.shard_split_mode

分片模式,支持准确分片、并发分片、不分片三种模式:

  • 准确分片(默认):根据配置的分片键将数据拆分为不同的区间,除下最后一个区间外,每个区间精准的有 reader_fetch_size 条数。
    • 拉取数据量很大的表或者分片键不是主键或者索引键时,该分片模式分片时间会比较长;
    • 该分片模式支持分片键为整型数据类型和字符串数据类型;
    • 配置方式:将该参数配置为 accurate。
  • 并发分片:根据表的最大最小值,将所有的数据按照并发数进行区间分片。
    • 该分片模式仅支持分片键为整型数据类型;
    • 配置方式:将该参数配置为 parallelism。
  • 不分片:不进行分片,适用于没有主键、索引键的表。
    • 配置方式:将该参数配置为 nosplit 或者不配置 split_pk。

准确分片

job.reader.customized_sql

自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。
例如:需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。

说明

配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。

PostgreSQL 批式写

参数名

描述

默认值

job.writer.is_insert_ignore

insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突

false

job.writer.write_batch_interval

一次性批量提交的数据条数,该值可以减少与 PostgreSQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。

100

job.writer.write_retry_times

PostgreSQL 写入失败时重试次数。

3

job.writer.retry_interval_seconds

写入失败后两次重试的时间间隔,单位秒

write_batch_interval / 10

Redis

Redis 离线读

参数名称

描述

默认值

job.reader.key_pattens

读取 Redis keys 的匹配策略,默认为*,即读取所有的 key。
支持精确匹配和模糊匹配:

  • 精确匹配:比如只同步user_a和user_b的 key,填写user_a,user_b即可;
  • 模糊匹配:比如只同步前缀是user_info的 key,填写 user_info*即可。

*

job.reader.db_index

Redis 逻辑库索引号,默认为 0。如果您的数据位于 Redis 的其他 DB,比如在 DB 6 中,则填写6即可。

0

job.reader.reader_parallelism_num

读取 Redis 分片数,Redis 服务为单线程模型,推荐设置为 1,默认为 1。

1

job.reader.client_timeout_ms

创建 Redis 连接的超时时间,单位为毫秒(ms)。

60000

job.reader.max_attempt_count

执行单次 Redis Command 失败的最大重试次数

3

Redis 离线写

参数名称

描述

默认值

job.writer.write_batch_interval

一次性批量提交的数据条数,该值可以减少与 Redis 网络的交互次数并提升整体吞吐量。但如果该值设置过大可能会导致数据同步进程 OOM。

50

job.writer.database

指定写入 Redis 中的 Database 信息,默认为0。

0