本文将为您介绍数据集成任务,在不同场景下所使用到的高级参数配置及其相关说明。
具体场景使用参数说明详见高级参数附录。
离线任务运行参数、数据源相关参数和资源使用参数均可配置到下图红框处。
数据同步解决方案高级参数设置,您可在方案的资源组高级配置或映射配置中的高级配置入口进行设置。
进入数据同步解决方案界面:
在方案编辑界面中,【基本配置 > 资源组高级配置】中,添加相应的离线、实时高级参数设置。
方案编辑界面,在映射配置步骤,目标库表映射配置 > 高级配置 > 高级参数配置中,添加相关高级参数。
参数名称 | 参数说明 | 默认值 |
|---|---|---|
job.common.flink_tm_slot_memory | Flink TM 单个 slot 的内存大小,单位为 MB,默认大小为 4096。配置示例:job.common.flink_tm_slot_memory=8192。 | 4096 |
job.reader.reader_fetch_size | 每次拉取的数据条数,只在准确分片中有效。 | 10000 |
job.reader.shard_split_mode | JDBC Connector 方式连接读取的数据源(如 MySQL、PostgreSQL、SQLServer 等),支持指定分片模式为 准确分片(accurate)、并发分片(quick)模式:
| accurate |
job.reader.use_sqoop_string_split_mode true | JDBC Connector 方式连接读取的数据源(如 MySQL、PostgreSQL、SQLServer 等),指定分片模式为 quick 模式,即 job.reader.shard_split_mode=quick 时,需结合 job.reader.use_sqoop_string_split_mode=true 参数使用,该参数将指定 quick 分片算法为 sqoop 算法。 | false |
studio.restart.attempts=N参数,即可实现任务失败自动重试,其中“N”为自动重试次数,您可按需进行设置。通过设置 Flink 运行参数,来实现任务失败自动重试的能力。
在数据同步解决方案任务编辑界面, 基本配置 > 资源组高级配置 > Flink 运行参数中,设置 studio.restart.attempts=N参数,即可实现实时任务失败自动重试,其中“N”为自动重试次数,您可按需进行设置。
参数配置方式详见解决方案参数配置说明-入口一。
解决方案高级参数示例如下:
参数 | 说明 | 默认值 |
|---|---|---|
solution.table_schema | 当源端/目标端是SQLServer、PostgreSQL、GaussDB、KingBase这类“库-Schema-表”三层结构的数据源时,若想要指定目标表的Schema信息时,您需在数据同步解决方案任务编辑界面,目标库表映射配置 > 高级配置 > 高级参数配置添加 | 无 |
job.reader.database_include_list | 当源端为 Oracle 数据源时,您可通过在解决方案任务编辑界面,目标库表映射配置 > 高级配置 > 高级参数配置中添加高级参数: | 无 |
job.writer.index.type | Hudi 数据库表中,若之前使用非 | bucket index |
参数配置方式详见解决方案参数配置说明-入口二。
job.writer.write_mode : copy集成任务场景中若需要使用 Kerberos 认证方式写入 HBase 时,可在高级参数中添加以下参数:job.writer.platform_kerberos_enable=true
该参数开启后,任务将使用 Kerberos 认证方式进行 HBase 写入,同时在 HBase 数据源配置时也需添加相应的数据源高级参数,详见HBase。
在通道任务里,若源端为 MySQL、PostgreSQL、SQL Server 或者 Oracle,并且源端数据表中的字段类型是 Varchar,当 Varchar 字段的精度发生改变时,写入 Hive 目标端的 Varchar 字段数据可能会出现被截断的场景,在此场景下,添加以下高级参数后:
当以下指定范围内的数据库及字段类型发生长度加长时,且目标端的 Hive 表使用 varchar 字段类型,可自动为 Hive 表的 varchar 类型的长度自动增加:
说明
--添加以下高级参数: job.writer.table_operations={"createTable":"IGNORE","addTableColumn":"EXECUTE","renameTable":"IGNORE","truncateTable":"IGNORE","updateTableColumn":"EXECUTE","renameTableColumn":"IGNORE","dropTable":"IGNORE","dropTableColumn":"IGNORE"} job.common.interceptor_name=batch_catalog_v2
job.writer.key_fields:
离线集成任务写入 Kafka 时,支持通过该高级参数自定义指定写入 Kafka 数据记录的关键字段。可根据这些关键字段进行正确的分区、索引或其他数据相关的处理,例如,在一个订单系统中,以订单 ID 作为 Kafka message key,那属于同一个订单的所有消息都会被发送到同一个分区,方便后续对订单相关数据进行集中处理和查询。job.writer.key_fields:id,name,age 。
job.writer.write_map_null_value:
当离线集成的目标端是Kafka时,支持指定对源端空值(null)的处理方式。 支持输入true、false。
参数值 | 作用 |
|---|---|
false(默认值) | 源端的值为null时,Kafka中的那一条的消息中的JSON,其中的key不存在(即空数据的key不写入) |
true | 源端的值为null时,Kafka中的JSON中的key存在,value为null |
下面以MySQL -> Kafka为例:
-- MySQL的表结构 CREATE TABLE `demo` ( `customer_id` int(11), `name` varchar(50), `address` varchar(100), `date_of_birth` date, `create_time` timestamp, `modify_time` timestamp, `age` int(11), `salary` decimal(30,5), PRIMARY KEY (`customer_id`) );
MySQL中的数据
参数效果
参数值 | 作用 |
|---|---|
false(默认值) | |
true |
在普通通道任务里,若源端为 MySQL、PostgreSQL、SQL Server 或者 Oracle,以下两个场景中,可添加以下高级参数来实现:
说明
--添加以下高级参数: job.writer.table_operations={"createTable":"IGNORE","addTableColumn":"EXECUTE","renameTable":"IGNORE","truncateTable":"IGNORE","updateTableColumn":"EXECUTE","renameTableColumn":"IGNORE","dropTable":"IGNORE","dropTableColumn":"IGNORE"} job.common.interceptor_name=batch_catalog_v2