最近更新时间:2023.03.30 14:04:29
首次发布时间:2023.03.30 14:04:29
目前火山引擎 E-MapReduce(EMR)集群中,Hudi 已支持两个版本,EMR 3.2.1 中使用 0.12.2,EMR 其余版本中,使用 0.11.1 版本。
本文将为您介绍 Hudi 的安装,并通过 Spark SQL 快速上手 Hudi 表和通过 Flink SQL 创建 Catalog/Table 相关内容。
Hudi 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Hudi 组件可见并且状态是正常的。详见创建集群。
如果在集群初始化时没有安装,也可以通过添加 Hudi 组件在已有集群上添加 Hudi。详见添加服务。
EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的 Hudi 表操作做一个说明,其他详细指南可以参考 Hudi高阶使用文档。
要快速上手 Hudi,可以启动一个 SparkSQL 的本地 session 快速读取 Hudi 表,降低使用门槛。
对于 EMR 1.3 版本, 需要增加配置 --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' 执行命令为:
export HADOOP_USER_NAME=hive spark-sql --master local --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
create table hudi_cow_nonpcf_tbl ( uuid int, name string, price double ) using hudi;
create table hudi_ctas_cow_pt_tbl using hudi tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts') partitioned by (dt) as select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
create table hudi_mor_tbl ( id int, name string, price double, ts bigint ) using hudi tblproperties ( type = 'mor', primaryKey = 'id', preCombineField = 'ts' );
COW 表和 MOR 表拥有相同的 SparkSQL 语法
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
update hudi_ctas_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
select * from hudi_ctas_cow_pt_tbl;
drop table hudi_cow_nonpcf_tbl;
Hudi 0.12 版本已经支持通过 Flink Catalog 来管理表信息,目前支持以下两种 Catalog:DFS Catalog 和 Hive Catalog
CREATE CATALOG dfs_catalog WITH ( 'type'='hudi', 'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive' ); USE CATALOG dfs_catalog; CREATE DATABASE hudi_dfs_db; USE hudi_dfs_db; CREATE TABLE `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl`( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'precombine.field' = 'ts' );
CREATE CATALOG hms_catalog WITH ( 'type'='hudi', 'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive', 'hive.conf.dir'='/etc/emr/hive/conf/', 'mode'='hms' ); USE CATALOG hms_catalog; CREATE DATABASE hudi_hms_db; -- MOR 表 -- hive_sync.enabled默认等于true,在写数据成功后会额外将rt和ro同步到HMS中 CREATE TABLE `hms_catalog`.`hudi_hms_db`.`flink_hudi_mor_tbl`( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'precombine.field' = 'ts', 'hive_sync.enabled' = 'true' );