You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2025.04.01 20:13:42首次发布时间:2025.03.29 22:17:41
我的收藏
有用
有用
无用
无用

在创建 LAS 集群并且安装 Hudi 服务后,已经默认将 Hudi 相关依赖集成到 Flink、Spark、Presto 等开源组件中。计算任务读写 Hudi 时,不需要额外再引入相关的 Hudi 依赖。
接下来带您通过 Spark SQL 快速上手 Hudi 表和通过 Flink SQL 创建 Catalog/Table 相关内容。

通过 Spark SQL 快速上手 Hudi 表

LAS SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的 Hudi 表操作做一个说明,其他详细指南可以参考高阶使用
要快速上手 Hudi,可以启动一个 SparkSQL 的本地 session 快速读取 Hudi 表,降低使用门槛。

  • 启动方式
    • spark-sql
spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
  • kyuubi 代理
/usr/lib/emr/current/kyuubi/bin/beeline -n hive -p KtVMyIFnqavBc1HS   -u "jdbc:hive2://las-master-1:10009" 

创建 Hudi

创建非分区 COW

create table hudi_cow_nonpcf_tbl (
  uuid int,
  name string,
  price double
) using hudi;

创建分区 COW 表

create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
asselect 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

创建 Hudi MOR 表

create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

插入数据

COW 表和 MOR 表拥有相同的 SparkSQL 语法

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

更新数据

update hudi_ctas_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;

查询表数据

select * from hudi_ctas_cow_pt_tbl;

删除 Hudi

drop table hudi_cow_nonpcf_tbl;

Hudi 已经支持通过 Flink Catalog 来管理表信息,目前支持以下两种 Catalog:DFS Catalog 和 Hive Catalog

sudo su - emr
-- 设置环境变量
source /etc/emr/flink/flink-env.sh 
/usr/lib/emr/current/flink/bin/yarn-session.sh -s 2 -jm 1024 -tm 2048 -nm flink-test -d
  • sql-client
-- 设置环境变量
source /etc/emr/flink/flink-env.sh 
/usr/lib/emr/current/flink/bin/sql-client.sh embedded -s yarn-session

Catalog

CREATE CATALOG dfs_catalog WITH (
    'type'='hudi',
    'catalog.path'='hdfs://las-master-1:8020/user/hive'
);
USE CATALOG dfs_catalog;

CREATE DATABASE hudi_dfs_db;
USE hudi_dfs_db;

CREATE TABLE `dfs_catalog`.`hudi_dfs_db`.`flink_hudi_mor_tbl`(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'precombine.field' = 'ts'
);