在创建 LAS 集群并且安装 Hudi 服务后,已经默认将 Hudi 相关依赖集成到 Flink、Spark、Presto 等开源组件中。计算任务读写 Hudi 时,不需要额外再引入相关的 Hudi 依赖。
接下来带您通过 Spark SQL 快速上手 Hudi 表和通过 Flink SQL 创建 Catalog/Table 相关内容。
LAS SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的 Hudi 表操作做一个说明,其他详细指南可以参考高阶使用。
要快速上手 Hudi,可以启动一个 SparkSQL 的本地 session 快速读取 Hudi 表,降低使用门槛。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
/usr/lib/emr/current/kyuubi/bin/beeline -n hive -p KtVMyIFnqavBc1HS -u "jdbc:hive2://las-master-1:10009"
create table hudi_cow_nonpcf_tbl ( uuid int, name string, price double ) using hudi;
create table hudi_ctas_cow_pt_tbl using hudi tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts') partitioned by (dt) asselect 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
create table hudi_mor_tbl ( id int, name string, price double, ts bigint ) using hudi tblproperties ( type = 'mor', primaryKey = 'id', preCombineField = 'ts' );
COW 表和 MOR 表拥有相同的 SparkSQL 语法
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
update hudi_ctas_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
select * from hudi_ctas_cow_pt_tbl;
drop table hudi_cow_nonpcf_tbl;
Hudi 已经支持通过 Flink Catalog 来管理表信息,目前支持以下两种 Catalog:DFS Catalog 和 Hive Catalog
sudo su - emr -- 设置环境变量 source /etc/emr/flink/flink-env.sh /usr/lib/emr/current/flink/bin/yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flink-test -d
-- 设置环境变量 source /etc/emr/flink/flink-env.sh /usr/lib/emr/current/flink/bin/sql-client.sh embedded -s yarn-session
CREATE CATALOG dfs_catalog WITH ( 'type'='hudi', 'catalog.path'='hdfs://las-master-1:8020/user/hive' ); USE CATALOG dfs_catalog; CREATE DATABASE hudi_dfs_db; USE hudi_dfs_db; CREATE TABLE `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl`( uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED, name VARCHAR(10), age INT, ts TIMESTAMP(3), `partition` VARCHAR(20) ) PARTITIONED BY (`partition`) WITH ( 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'precombine.field' = 'ts' );