全域数据集成 DataSail Hudi 数据源为您提供读取和写入 EMR Hudi 的双向通道能力。本文为您介绍 DataSail 的 Hudi 数据同步的能力支持情况。
支持半托管火山引擎 E-MapReduce(EMR) Hudi 0.12.x 版本。
Hudi Reader 和 Writer 会通过 HiveServer2 拿到目标 Hudi 表的元信息,从而获取到各个字段的类型,自动完成任务的 Schema 配置,在配置任务字段映射时,您只需单击自动添加即可完成 Schema 配置。
支持的字段类型如下:
类型分类 | 数据集成 column 配置类型 |
---|---|
整数类 | tinyint、smallint、int、bigint |
浮点类 | float、double、decimal |
字符串类 | string、varchar |
时间类 | date、timestamp |
布尔类 | boolean |
数组类 | array |
字典类 | map |
二进制类型 | binary |
新建数据源操作详见配置数据源,以下为您介绍 EMR Hudi 数据源配置相关信息。
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
基本配置 | |
*数据源类型 | Hudi |
*接入方式 | EMR Hudi |
*数据源名称 | 数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。 |
参数配置 | |
*EMR 集群 ID | |
*数据库名 | 下拉选择集群环境中,已创建的数据库名称。 |
Hudi 版本号 | 依据已选择的 EMR Hadoop 集群,自动展现集群中包含的 Hudi 版本。 |
用户名 | 有权限访问数据库的用户名信息。 |
密码 | 输入用户名对应的密码信息。 |
Access Key ID | 如果您需要访问 Hudi on TOS,建议您填写 Access Key Id 和 Secret Access Key 信息。 说明
|
Secret Access Key | 与 Access Key ID 配套使用,类似登录密码,用于签名您的访问参数,以防被篡改。 |
扩展配置 | 您可输入 HDFS 配置的可选扩展,例如 Hadoop HA 的配置信息。 |
Hudi 数据源测试连通性成功后,进入到数据开发界面,开始新建 Hudi 相关通道任务。新建任务方式详见离线数据同步。
任务创建成功后,您可根据实际场景,配置 Hudi 离线读或 Hudi 离线写等通道任务。
数据来源选择 Hudi,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*数据源类型 | 下拉选择 Hudi 数据源类型。 |
*数据源名称 | 已在数据源管理中注册成功的 Hudi 数据源,下拉可选。 |
*数据表 | 选择需要采集的数据表名称信息,目前单个任务只支持将单表的数据采集到一个目标表中。
|
分区设置 | 会根据所选数据库表,获取 Hudi 表中分区信息,指定读取的分区。分区内容可通过时间变量参数方式进行设置,详见平台时间变量与常量说明。 说明 读取 Hudi 表为非分区表时,不需要设置分区。 |
数据目标类型选择 Hudi,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*目标类型 | 下拉选择 Hudi 数据源类型。 |
*数据源名称 | 已在数据源管理中注册成功的 Hudi 数据源,下拉可选。 |
*数据表 | 选择需要写入数据的 Hudi 表名称信息,下拉可选。 说明 您需预先创建好 Hudi 表,参考下文4.3.3 创建 Hudi 表一节。 |
*写入方式 | 选择目标数据写入方式,支持以下两种写入方式:
|
*唯一键 | Hudi 数据表选择完成后,可下拉选择表中对应的唯一键字段。 |
数据目标类型选择 Hudi,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*目标类型 | 下拉选择 Hudi 数据源类型。 |
*数据源名称 | 已在数据源管理中注册成功的 Hudi 数据源,下拉可选。 |
*数据表 | 选择需要写入数据的 Hudi 表名称信息,下拉可选。 说明 您需预先创建好 Hudi 表,参考下文4.3.4 创建 Hudi 表一节。 |
*写入方式 | 选择目标数据写入方式,支持以下两种写入方式:
|
如果您需要写入的 Hudi 表不存在时,您需要预先进行创建,您可以通过 Hive Shell 创建。详见 Hudi 基础使用。
创建 COW 表
注意
CREATE EXTERNAL TABLE `your_cow_hudi_table`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `your_business_field_1` bigint, `your_business_field_2` bigint, ...) PARTITIONED BY ( `your_partition_field_1` bigint, `your_partition_field_2` bigint, ...) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs|tos://path/to/your/table' TBLPROPERTIES ( 'path' = 'hdfs|tos://path/to/your/table', 'hoodie.datasource.write.recordkey.field'='your_pk_1,your_pk2,...', 'connector'='hudi' );
创建 MOR 表
说明
MOR 表与 COW 相比区别在于 INPUTFORMAT 不同,其他均相同。
CREATE EXTERNAL TABLE `your_mor_hudi_table`( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `your_business_field_1` bigint, `your_business_field_2` bigint, ...) PARTITIONED BY ( `your_partition_field_1` bigint, `your_partition_field_2` bigint, ...) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs|tos://path/to/your/table' TBLPROPERTIES ( 'path' = 'hdfs|tos://path/to/your/table', 'hoodie.datasource.write.recordkey.field'='your_pk_1,your_pk_2,...', 'connector'='hudi' );
数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
字段映射支持选择基础模式和转换模式配置映射:
注意
基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。
转换模式:
字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先通过数据转换后,以指定格式输入到目标端数据库中。
转换模式详细操作说明详见4.1 转换模式
在转换模式中,你可依次配置:来源节点、数据转换、目标节点信息:
配置节点 | 说明 |
---|---|
来源节点 | 配置数据来源 Source 节点信息:
配置完成后,单击确认按钮,完成来源节点配置。 |
数据转换 | 单击数据转换右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则:
配置完成后,单击确认按钮,完成数据转换节点配置。SQL 脚本示例详见4.1.2 添加转换节点。 |
目标节点 | 配置目标节点 Sink 信息:
配置完成后,单击确认按钮,完成目标节点配置。 |
基础模式:
您可通过以下三种方式操作字段映射关系:
对于可视化通道任务,高级参数可在任务开发界面:任务运行参数 > 自定义参数设置中填写,读参数需要加上 job.reader.
,写参数需要加上job.writer.
前缀,如图所示:
参数 | 类型 | 默认值 | 含义 |
---|---|---|---|
job.reader.hoodie.datasource.query.type | String | snapshot | 可选值:
更多详见 Hudi 官网文档。 |
job.reader.read.start-commit | String | 无 | 用于增量试图查询,指定起始 commit。取值为 earliest 或者 yyyyMMddHHmmss 格式的时间。 |
job.reader.read.end-commit | String | 无 | 用于增量试图查询,指定结束 commit,取值为 yyyyMMddHHmmss 格式的时间。 |
说明
更多 Hudi 高级参数,可参考 Hudi 官网文档 Flink Options,需在对应的参数前添加 job.reader.
前缀。
参数 | 类型 | 默认值 | 含义 |
---|---|---|---|
job.writer.table.type | String | COPY_ON_WRITE | 指定 Hudi 表类型,可选值:
更多详见 Hudi 官网文档 |
job.writer.hoodie.datasource.write.partitionpath.field | String | ""(空字符串) | 分区表使用,用于指定分区字段,多个分区字段以逗号分隔,例如 |
job.writer.index.type | String | bucket | 指定索引类型。DataSail 默认使用 bucket 索引类型。 |
job.writer.precombine.field | String | ts | 预合并字段,用于在实际写入之前对具有相同键值的记录进行预合并。当两个记录具有相同的键值时,将选择预合并字段值较大的记录。取值为 hudi 表字段。特殊值 |
参数 | 类型 | 默认值 | 含义 |
---|---|---|---|
job.writer.primary_key | Array | 无 | 写入方式选择为 upsert 时,需指定主键信息,参数值格式参考如:["unique_key","app_id","event_date","hash_uid"] |
job.writer.hoodie.datasource.write.partitionpath.field | String | 无 | 写入分区键设置,格式如:app_id,event_date |
说明
更多 Hudi 高级参数参考 Hudi 官网文档 Flink Options,需在对应的参数前添加 job.writer.
前缀。
DataSail 支持 MySQL 到 Hudi 的离线整库和实时整库解决方案,并支持自动建表。
注意
目前解决方案中目标表为 Hudi 时,自动建表暂不支持 MySQL 的 bigint unsigned
、decimal
类型,如果您需要同步数据源 MySQL 表有这些字段,请参考上文4.3.3 创建 Hudi 表一节预先建表,进行规避。
MySQL 字段类型 | Hudi 字段(Hive Shell 建表) |
---|---|
Bigint unsigned | decimal(20, 0) |
decimal | decimal |
解决方案中,集成高级参数支持上文5.2 Hudi 批式写中的高级参数,对同步的所有表生效。如果您需要对某些 Hudi 表进行特殊设置,您可以通过高级参数 job.writer.table-properties
进行设置,其值是一个 json 串,示例如下:
[{"table_pattern":"db_hudi.*.user_.*","table.type":"MERGE_ON_READ","hoodie.datasource.write.partitionpath.field":"item_id"}]
示例参数说明:
[{"table_pattern": "db.table1.*",...}, {"table_pattern": "db.table2.*",...}]
以上示例配置的含义:对于匹配 "db_hudi.*.user_.*"
的 Hudi 表,指定表类型 table.type 为 MERGE_ON_READ,分区字段 hoodie.datasource.write.partitionpath.field 为 item_id
。
说明
解决方案中离线全量同步和实时增量同步的集成高级参数设置是独立的,在同一个解决方案里,一般配置相同的 Hudi 高级参数。
更多 Hudi 高级参数可参考 Hudi 官网文档 Flink Options,您只需额外在对应的参数前添加 job.writer.
前缀即可。