目前在创建火山引擎 E-MapReduce(EMR)集群,并且安装Hudi服务后,EMR已经默认将Hudi相关依赖集成到Flink、Spark、Hive、Trino、Presto开源组件中。计算任务读写Hudi时,不需要额外再引入相关的Hudi依赖。不同的EMR版本使用了不同的Hudi版本,具体信息如下:
Hudi 版本 | EMR 版本 |
---|---|
Hudi 0.10.0 | EMR 1.3.1 |
Hudi 0.11.1 | EMR 3.0.1 ~ EMR 3.1.1 |
Hudi 0.12.2 | EMR 3.2.1 ~ EMR 3.8.1 |
Hudi 0.14.1 | EMR 3.9.1+ |
接下来将为您介绍 Hudi 的安装,并通过 Spark SQL 快速上手 Hudi 表和通过 Flink SQL 创建 Catalog/Table 相关内容。
Hudi 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Hudi 组件可见并且状态是正常的。详见创建集群。
如果在集群初始化时没有安装,也可以通过添加 Hudi 组件在已有集群上添加 Hudi。详见添加服务。
EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的 Hudi 表操作做一个说明,其他详细指南可以参考 Hudi高阶使用文档。
要快速上手 Hudi,可以启动一个 SparkSQL 的本地 session 快速读取 Hudi 表,降低使用门槛。
启动方式
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.2+
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
# For Spark versions: 3.0 - 3.1
# Spark 3.1
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# For Spark versions: 3.2+
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
# For Spark versions: 3.0 - 3.1
spark-sql \
-conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi;
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);
COW 表和 MOR 表拥有相同的 SparkSQL 语法
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
update hudi_ctas_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
select * from hudi_ctas_cow_pt_tbl;
drop table hudi_cow_nonpcf_tbl;
Hudi 0.12 版本已经支持通过 Flink Catalog 来管理表信息,目前支持以下两种 Catalog:DFS Catalog 和 Hive Catalog
CREATE CATALOG dfs_catalog WITH (
'type'='hudi',
'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive'
);
USE CATALOG dfs_catalog;
CREATE DATABASE hudi_dfs_db;
USE hudi_dfs_db;
CREATE TABLE `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl`(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'precombine.field' = 'ts'
);
CREATE CATALOG hms_catalog WITH (
'type'='hudi',
'catalog.path'='hdfs://emr-master-1:8020/warehouse/tablespace/managed/hive',
'hive.conf.dir'='/etc/emr/hive/conf/',
'mode'='hms'
);
USE CATALOG hms_catalog;
CREATE DATABASE hudi_hms_db;
-- MOR 表
-- hive_sync.enabled默认等于true,在写数据成功后会额外将rt和ro同步到HMS中
CREATE TABLE `hms_catalog`.`hudi_hms_db`.`flink_hudi_mor_tbl`(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'precombine.field' = 'ts',
'hive_sync.enabled' = 'true'
);