本文将为您介绍数据集成任务,在不同场景下所使用到的高级参数配置及其相关说明。
按需已创建 离线集成任务、流式集成任务、解决方案任务等任务类型。
离线任务运行参数、数据源相关参数和资源使用参数均配置到下图红框处。
DataSail 解决方案高级参数设置,您可在方案的资源组高级配置中进行设置。
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.common.dirty_record_skip_enabled | 是否跳过脏数据。 | true |
| 读写限速参数,每秒读写条数限制,默认值 -1,代表不限制;配置大于 0 时,就开启条数限制。 | -1 |
| 读写限速参数,每秒读写 bytes 限制,默认值 -1,代表不限制;配置大于 0 时,就开启限制。 | -1 |
| 连接器的读并发和写并发,只适用于离线任务。 | 无默认值,系统根据数据量大小自动推算并发数。 |
job.writer.case_insensitive | 大小写不敏感。 | true |
job.writer.pre_sql_list | 写入数据源前置处理 SQL List ,格式是 json 数组,如: | 空 |
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.common.global_parallelism_num | 连接器的全局并发数,只适用于流任务。
| MQ Partition 个数 / 4 |
job.common.checkpoint_interval | 每次 Checkpoint 时间间隔,单位为毫秒,默认 900000 毫秒(15分钟)进行一次 Checkpoint,只适用于流任务。 | 900000 |
job.common.checkpoint_timeout | Checkpoint 超时时间,单位为毫秒,只适用于流任务。 | 600000 |
实时整库中离线全量同步参数:
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.common.is_use_batch_mode | 是否 batch 模式:
| true |
job.reader.enable_string_compatible | PostgreSQL2Hudi 实时分库分表同步解决方案中,若源端存在当前不能识别的 postgre 数据类型时,您可根据实际业务情况添加该参数,来判断是否将其转换为 string 类型。
说明 该参数仅适用于 PostgreSQL2Hudi 实时分库分表同步解决方案。 | false |
实时整库中实时增量同步参数
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.common.checkpoint_interval | 设定 Checkpoint 刷新时间。 | 900000 |
job.reader.poll_interval_ms | 设置读 binlog 的刷新时间,默认 500 毫秒。 | 500 |
job.reader.debezium |
| - |
Flink 运行参数表:
参数名称 | 参数说明 | 默认值 |
---|---|---|
taskmanager.memory.managed.size | 每个 Task Manager 的托管内存占总内存大小。 | - |
taskmanager.memory.network.fraction | 每个 Task Manager 的网络内存的占比。 | - |
说明
资源使用参数只适用于离线集成任务。
流式集成任务的资源参数,可按照上方【流式集成任务配置说明】章节中的介绍,直接在页面中选配即可。
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.common.flink_tm_vcores | 每个 Task Manager 使用的 CPU 核数。
| 1.0 |
job.common.slots_per_tm | 每个 Task Manager 默认 slot 的数量。 | 2 |
job.common.flink_tm_slot_memory | 每个 Task Manager 中的各个 slot 的内存大小,单位为 MB。
| 2048 |
job.common.flink_tm_task_off_heap_memory | 每个 Task Manager 的堆外内存占总内存的比例。 | 0.125 |
job.common.flink_tm_managed_memory_ratio | 每个 Task Manager 的托管内存占总内存的比例。 | 0.2 |
job.common.flink_tm_network_max | 每个 Task Manager 的网络内存的最大值,单位为 GB。
| 2g |
job.common.flink_jm_vcores | Flink Job Manager 的 CPU 核数。
| 1.0 |
job.common.flink_jm_memory | Flink Job Manager 的总内存大小,单位为 MB。
| 4096 |
job.common.flink_jm_off_heap_memory | Flink Job Manager 的堆外内存占总内存的比例。 | 0.125 |
读取 ByteHouse_CDW 时,支持以下高级参数,您可根据实际情况进行配置:
参数名称 | 描述 | 默认值 |
---|---|---|
job.reader.split_config | 设置任务分片数量配置参数。 | 分片数是和并发数相同 |
job.reader.string_split_size | 设置分片大小。
| 1000000 |
job.reader.customized_connection_properties | 读取 ByteHouse 超时设置参数。 |
批式写入 ByteHouse_CDW/CE 时,支持以下高级参数,您可根据实际情况进行配置:
参数名称 | 描述 | 默认值 |
---|---|---|
job.writer.flush_interval | 写入 buffer 的刷新时间,默认 60000 毫秒 | 60000 |
job.writer.buffer_count | 写入 buffer 记录条数,默认 8192 | 8192 |
job.writer.query_timeout | 设定查询超时退出时间,默认 30000 毫秒 | 30000 |
job.writer.skip_delete_task | 写入 ByteHouse CDW 是通过导入任务方式,默认任务执行完,再删除导入任务,通过此参数,您可设置是否跳过删除导入任务 ; | false |
job.writer.cfs_write_batch_size |
说明 该高级参数,需配合 CFS 写入方式进行使用。 | 4096/8192 |
job.writer.cfs_vw_id | 当选择 CFS 方式写入 ByteHouse 云数仓版时,建议在自定义参数设置中添加该高级参数,指定数据导入服务所使用的计算组信息。格式如下: | 无 |
job.writer.bh_ce_partition_type | 显示的指定 ByteHouse 的分区字段为 string 类型,当分区字段使用函数时,需要使用此高级参数函数。 | string |
job.writer.loading_mode | 高级参数 job.writer.loading_mode 可设置为 FULL_REFRESH 或 INCREMENTAL,默认为 INCREMENTAL:
说明
| INCREMENTAL |
job.writer.extraProperties | 添加此参数,数据导入 ByteHouse CDW 时,支持原子性导入数据。 | 无 |
job.writer.bh_connection_properties | Map<String,String> 类型。 说明 该高级参数,需配合 JDBC 写入方式进行使用。 | 无 |
job.writer.session_properties | Map<String,String> 类型。 | 无 |
job.writer.pre_sql_list | 写入 ByteHouse CDW 数据源前置处理 SQL List ,格式是 json 数组,多条执行语句可用逗号分隔。如: 说明 该高级参数,需配合 JDBC 写入方式进行使用。 | 无 |
参数名 | 描述 | 默认值 |
---|---|---|
job.reader.case_insensitive | 读取数据时字段大小写是否需要敏感。 | true |
参数 | 描述 | 默认值 |
---|---|---|
job.common.case_insensitive | JSON 内容解析时是否对字段 Key 大小写敏感。 | true |
job.common.support_json_path | 是否支持带 | false |
job.common.json_serializer_features | DataSail 使用 fastjson 解析 JSON 内容,用户可以通过此参数设置 JSON 解析的 features,详情参考 SerializerFeature - fastjson 1.2.83 javadoc。多个 SerializerFeature 使用逗号分隔。 | 无 |
job.common.convert_error_column_as_null | 是否将类型转化失败的字段默认置为 null。 | false |
job.common.host_ips_mapping | HDFS 数据源通过连接串形式配置时,需在高级参数中配置 ip_mapping 信息,将 hdfs 集群的节点域名与 ip 进行映射,示例如下:
| 无 |
参数 | 描述 | 默认值 |
---|---|---|
job.reader.parse_partition_from_path | 在需要读取并解析数据源地址路径下的分区字段场景中,可以添加此高级参数,在手动添加分区字段映射后,便可正常读取分区字段数据。 | false |
job.reader.partition_num | 读取分区字段数据场景中,如果是读取 Json 这类没有 Schema 定义的数据格式时,需添加此高级参数,来告知当前作业设置的路径中包含多少个分区字段数。 | 无 |
参数 | 描述 | 默认值 |
---|---|---|
job.writer.rolling.max_part_size | 文件切割大小,单位字节,默认 10G。 注意 这里是指未压缩读的数据大小, 而非 HDFS 最终文件大小。 | 10737418240 |
job.writer.hdfs.replication | HDFS 副本数 | 3 |
job.writer.hdfs.compression_codec | HDFS 压缩格式,支持
| zstd |
job.writer.dump.directory_frequency | 写入 HDFS 文件夹的频率,支持以下参数:
| dump.directory_frequency.day |
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.common.checkpoint_interval | 设定 Checkpoint 刷新时间,默认 15 分钟,如果实时写入 Hive 时,写入 Hive 时间依据此参数。 | 900000 |
job.common.host_ips_mapping | Hive 数据源通过连接串方式接入自建集群时,需通过该高级参数配置 host 与真实 ip 之间的映射关系,示例如下:
| 无 |
参数名称 | 参数说明 | 默认值 |
---|---|---|
job.writer.partition_strategy | 参数分区创建策略 | partition_last |
job.writer.partition.date_format | 写入动态日期分区格式,可配置为:yyyyMMdd、yyyy-MM-DD | yyyyMMdd |
job.writer.partition.hour_format | 写入动态小时分区格式,可配置为:hh、HH | HH |
job.writer.null_string_as_null | 复杂类型中的 string 类型,默认会将 null 写为空字符串。如果需要配置默认写入 null,可以将此参数配置为 true。如脏数据中存在无法转换的列时,会自动转换为 null 值。 | false |
job.writer.case_insensitive | 默认会将数据全部转换为小写。 | true |
job.writer.convert_error_column_as_null | 将脏数据中无法转换的列自动转换为 null。 | false |
job.writer.dump.directory_frequency | 写入 HDFS 文件夹的频率,支持以下参数:
| dump.directory_frequency.day |
参数名称 | 描述 | 默认值 |
---|---|---|
job.reader.connector.startup-mode | 默认消费起始位置参数指定:
| |
job.reader.metadata_columns | 读取 Kafka 元数据相关信息,多个元数据可用英文逗号隔开。配置示例如下:job.reader.metadata_columns = timestamp,offset,key,value,partition,headers |
参数名称 | 描述 | 默认值 |
---|---|---|
job.common.host_ips_mapping | Kafka 通过公网接入,kafka broker 设置为域名;需配置 ip 与域名映射,示例如下: | |
job.common.skip_dump_parse | Kafka 数据源通过公网形式接入,开启 SASL_SSL 认证时,需设置该参数为 true。 | false |
job.writer.properties | max.request.size 消息体大小; 说明 适用范围:DataSail 整库解决方案配置中,如果单个消息体比较大时,可以调整此参数。 | {"max.request.size":1048576,"buffer.memory":33554432} |
job.writer.compression_type | 消息压缩格式,支持 none、snappy、gzip、lz4 说明 DataSail 整库解决方案配置中,可指定消息压缩格式。 | snappy |
参数名 | 描述 | 默认值 |
---|---|---|
solution.reader.ddl.external_schema_mode | 由于 Mongo 灵活的 Schema 定义,且 Mongo Schema Fetcher 获取的字段是无序且不保证每次获取的结果一致,这对解决方案中的自动建表能力有一定的困难。
| MERGE |
solution.reader.ddl.external_schemas | 自定义 Mongo Schema 字段信息,示例如下:
| 无 |
参数名 | 描述 | 默认值 |
---|---|---|
reader_fetch_size | 单批次读取文档 doc 的数量。 | 100000 |
filter | 指定读取过滤条件,满足 MongoDB 语法,如读取 id = 1000 的数据,填写示例如下: | 无 |
split_mode | 分片模式支持两种:
说明 若需关闭分片,可设置并发度为 1。 | parallelism |
参数名 | 描述 | 默认值 |
---|---|---|
max_connection_per_host | 连接池最大连接数。 | 100 |
connect_timeout_ms | 连接超时时间。 | 10000 |
batch_size | 单批次写入 MongoDB 的数据量。 | 100 |
write_mode | 高级参数设置写入方式:
| 无 |
批式读支持以下高级参数,您可根据实际情况进行配置:
参数名称 | 描述 | 默认值 |
---|---|---|
job.reader.init_sql | 读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境。 | 无 |
job.reader.reader_fetch_size | 每次拉取的数据条数,只在准确分片中有效。 | 10000 |
job.reader.query_timeout_seconds | Jdbc 方式读取数据,设定读取超时时间,单位秒。 | 300 |
job.reader.shard_split_mode | Jdbc 连接分片模式,支持准确分片、并发分片、不分片三种模式:
| 准确分片 |
job.reader.customized_sql | 自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。 说明 配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。 | 无 |
批式写支持以下高级参数,您可根据实际情况进行配置:
参数名称 | 描述 | 默认值 |
---|---|---|
job.writer.is_insert_ignore | insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突,false 为任务执行失败;true 为忽略冲突,任务正常执行。 | false |
job.writer.write_batch_interval | 一次性批量提交的数据条数,该值可以减少与 MySQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。 | 100 |
job.writer.write_retry_times | MySQL 写入失败时重试次数。 | 3 |
job.writer.retry_interval_seconds | 写入失败后两次重试的时间间隔,单位秒。 | write_batch_interval / 10 |
job.writer.connection_parameters | Jdbc 连接的全部参数,可在默认值后追加补充:
tcpRcvBuf=1024000 (1MB)
allowMultiQueries=true
connectTimeout=60000 | autoReconnect=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull |
参数名称 | 参数说明 | 参数默认值 |
---|---|---|
job.writer.sink_flush_interval_ms | 写入 buffer 刷新时间,默认 60000 毫秒 | 60000 |
job.writer.sink_buffer_size | 写入 buffer 数据大小,默认 10485760 (10MB) | 10485760 |
job.writer.sink_buffer_count | 写入buffer 记录条数,默认 8192 | 8192 |
job.writer.stream_load_properties | 数据写入方式参数指定。
| 整行更新 |
job.writer.request_read_timeouts | 写入等待获取结果时间,默认 60000 毫秒 | 60000 |
job.writer.request_connect_timeouts | 写入连接超时时间,默认 60000 毫秒 | 60000 |
job.writer.sink_enable_2PC | 写入时任务分两阶段提交,默认 false | false |
高级参数 Key | 高级参数 Value |
---|---|
job.reader.properties | 运行时动态设置 Paimon 表的属性(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串 |
job.reader.limit | 限制要读取的行数,通常用于数据抽样或测试 |
高级参数 Key | 高级参数 Value |
---|---|
job.writer.properties | 运行时动态设置 Paimon 表的属性(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串 |
job.writer.overwrite_partition | 覆盖写特定的分区,当且仅当“写入模式”为“覆盖”时生效,格式为 Map<String, String> 类型的 JSON 字符串 |
job.common.checkpoint_interval | 流作业 Flink 快照生成周期(单位为毫秒),默认为 300s(5 分钟)。 |
在解决方案的刷新目标表映射界面,我们可以填入一些高级参数来控制建表行为,详见 实时整库同步。
高级参数 Key | 高级参数 Value |
---|---|
solution.writer.common.ddl.buckets_num | 目标表的分桶(bucket)数。如果不设置,默认为 -1(动态 bucket)。 |
solution.writer.paimon.ddl.bucket_keys | 目标表的分桶键(bucket key)。如果有多个字段,可用半角逗号分隔(例如 id,name)。 |
solution.writer.paimon.ddl.options | 建表时的各类可选参数(参见 https://paimon.apache.org/docs/0.8/maintenance/configurations/),格式为 Map<String, String> 类型的 JSON 字符串 |
参数名称 | 描述 | 默认值 |
---|---|---|
job.reader.init_sql | 读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境 | 无 |
job.reader.reader_fetch_size | 每次拉取的数据条数,只在准确分片中有效。 | 10000 |
job.reader.shard_split_mode | 分片模式,支持准确分片、并发分片、不分片三种模式:
| 准确分片 |
job.reader.customized_sql | 自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。 说明 配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。 | 无 |
参数名 | 描述 | 默认值 |
---|---|---|
job.writer.is_insert_ignore | insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突 | false |
job.writer.write_batch_interval | 一次性批量提交的数据条数,该值可以减少与 PostgreSQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。 | 100 |
job.writer.write_retry_times | PostgreSQL 写入失败时重试次数。 | 3 |
job.writer.retry_interval_seconds | 写入失败后两次重试的时间间隔,单位秒 | write_batch_interval / 10 |
参数名称 | 描述 | 默认值 |
---|---|---|
job.reader.key_pattens | 读取 Redis keys 的匹配策略,默认为*,即读取所有的 key。
| * |
job.reader.db_index | Redis 逻辑库索引号,默认为 0。如果您的数据位于 Redis 的其他 DB,比如在 DB 6 中,则填写6即可。 | 0 |
job.reader.reader_parallelism_num | 读取 Redis 分片数,Redis 服务为单线程模型,推荐设置为 1,默认为 1。 | 1 |
job.reader.client_timeout_ms | 创建 Redis 连接的超时时间,单位为毫秒(ms)。 | 60000 |
job.reader.max_attempt_count | 执行单次 Redis Command 失败的最大重试次数 | 3 |
参数名称 | 描述 | 默认值 |
---|---|---|
job.writer.write_batch_interval | 一次性批量提交的数据条数,该值可以减少与 Redis 网络的交互次数并提升整体吞吐量。但如果该值设置过大可能会导致数据同步进程 OOM。 | 50 |
job.writer.database | 指定写入 Redis 中的 Database 信息,默认为0。 | 0 |