You need to enable JavaScript to run this app.
文档中心
E-MapReduce

E-MapReduce

复制全文
高阶使用
Spark 对湖仓表使用说明
复制全文
Spark 对湖仓表使用说明

EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明

使用说明

非Spark官网支持的SQL语法,需要额外通过设置下述参数来支持(例如Iceberg的procedures)。

set emr.serverless.spark.custom.parse.enabled = true;

基础操作

库操作

-- 创建数据库
create database db_demo;

-- 创建数据库,指定自定义TOS桶路径进行存储-- 注意要确保该TOS桶存在,并且当前用户有该桶路径的读写权限
create database db_demo location 'tos://您的tos bucket name/warehouse/';

--查看数据库信息
desc database db_demo;

-- 删除数据库
drop database db_demo;

表操作

use db_demo;
-- 创建表
create table tb_demo(id int, name string);

-- 创建表指定指定自定义TOS桶路径进行存储, 创建的表是外表
create table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo';

-- 创建外表,效果同上
create EXTERNAL table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo';
DESCRIBE EXTENDED  tb_demo;

-- 描述表信息
desc table tb_demo;

-- 查询建表语句
show create table tb_demo;

-- 删除表
drop table tb_demo;

-- 插入数据
insert into tb_demo select 1,'name1';

-- 查询表数据
select * from tb_demo;

UDF 操作

-- 上传UDF, 上传到TOS对应路径-- 创建udf,默认在hive catalog中
CREATE FUNCTION <schemaName>.<functionName> AS '<funcClassName>' using jar 'tos://您的tos bucket name/您的jar包地址';

-- Spark使用UDF
select dbname.udfname('aaabbB');
select hive.dbname.udfname('aaabbB');

-- presto 使用UDF
select hive.dbname.udfname('aaabbB');

Spark 访问 hive
create database if not exists test_hive_db;
create table if not EXISTS test_hive_db.test_tb(id int, name string);

insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hive_db.test_tb;

drop table if exists test_hive_db.test_tb;
drop database if exists test_hive_db;

Spark 访问 iceberg
-- 默认参数
set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.spark_iceberg_catalog.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;
use spark_iceberg_catalog;

create database if not exists test_iceberg_db;
create table if not EXISTS test_iceberg_db.test_tb(id int, name string) using iceberg;

insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_iceberg_db.test_tb;

drop table if exists test_iceberg_db.test_tb;
drop database if exists test_iceberg_db;

-- 访问iceberg元数据表,如history、snapshots等元数据表
set spark.sql.threePartIdentifier.catalogService.enabled=false;
select * from test_iceberg_db.test_tb.history;

Spark 访问 paimon
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
set spark.sql.catalog.spark_catalog.metastore=hive;
set spark.sql.storeAssignmentPolicy=ansi;
-- 自定义warehouse路径
set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;

create database if not exists test_paimon_db;
create table if not EXISTS test_paimon_db.test_tb(id int, name string) using paimon;

insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_paimon_db.test_tb;

drop table if exists test_paimon_db.test_tb;
drop database if exists test_paimon_db;

Spark 访问 Hudi
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

create database if not exists test_hudi_db;
create table if not EXISTS test_hudi_db.test_tb(id int, name string) using hudi;

insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hudi_db.test_tb;

drop table if exists test_hudi_db.test_tb;
drop database if exists test_hudi_db;

Spark 访问 Deltalake
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;

create database if not exists test_del_db;
create table if not EXISTS test_del_db.test_tb(id int, name string) using delta;

insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_del_db.test_tb;

drop table if exists test_del_db.test_tb;
drop database if exists test_del_db;

访问自定义LAS Catalog数据目录

说明

提示
若需要创建LAS Catalog数据目录可以跳转至LAS控制台操作。

Spark默认访问的是LAS Catalog中Hive数据目录下的库表,若您需要访问LAS Catalog下除了Hive以外的数据目录,可以通过以下方式访问:

  1. 对于DDL,可以通过参数设置需要访问的LAS Catalog数据目录,比如

    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;  -- 指定your_las_catalog为需要访问的LAS Catalog数据目录
    
    --DDL
    create database if not exists test_hive_db;
    create table if not EXISTS test_hive_db.test_tb(id int, name string);
    

    说明

    注意:spark.hadoop.hive.metastore.catalog.default参数是对当前整个会话生效,当前会话中所有的库表都应当存在于同一个LAS Catalog数据目录下。

  2. 对于DML、DQL,除了可以设置上述参数,也可以在表名上使用三段式的命名来指定自定义LAS Catalog数据目录下的库表,比如:

    insert into table your_las_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
    
    select * from your_las_catalog.test_hive_db.test_tb;
    

    说明

    注意
    DDL不支持使用三段式访问支持访问多个LAS Catalog数据目录。

针对每一种湖仓格式的完整示例可以参考如下。

Spark访问自定义LAS Catalog中的Hive表

  • DDL

    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --DDL
    create database if not exists test_hive_db;
    create table if not EXISTS test_hive_db.test_tb(id int, name string);
    
  • DML&DQL:

    insert into table your_las_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
    
    select * from your_las_catalog.test_hive_db.test_tb;
    

    会话指定 Catalog:sql中仅支持访问一个catalog

    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --DML&DQL
    insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_hive_db.test_tb;
    

Spark访问自定义LAS Catalog中的Iceberg表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
    set spark.sql.catalog.spark_iceberg_catalog.type=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    use spark_iceberg_catalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_iceberg_db;
    create table if not EXISTS test_iceberg_db.test_tb(id int, name string) using iceberg;
    
  • DML&DQL

    -- 默认参数
    set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
    set spark.sql.catalog.spark_iceberg_catalog.type=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    
    -- 访问方式:自定义catalog.库.表
    use spark_iceberg_catalog;
    insert into table your_las_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_iceberg_db.test_tb;
    
    --访问方式:spark_iceberg_catalog.自定义catalog.库.表
    insert into table spark_iceberg_catalog.your_las_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from spark_iceberg_catalog.your_las_catalog.test_iceberg_db.test_tb;
    

    会话指定Catalog:SQL 中仅支持访问一个 catalog

    -- 默认参数
    set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
    set spark.sql.catalog.spark_iceberg_catalog.type=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    use spark_iceberg_catalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_iceberg_db.test_tb;
    
    -- 访问iceberg元数据表,如history、snapshots等元数据表
    set spark.sql.threePartIdentifier.catalogService.enabled=false;
    select * from test_iceberg_db.test_tb.history;
    

Spark访问自定义LAS Catalog中的Paimon表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
    set spark.sql.catalog.spark_catalog.metastore=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    -- 自定义warehouse路径
    set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_paimon_db;
    create table if not EXISTS test_paimon_db.test_tb(id int, name string) using paimon;
    
  • DML&DQL:

    说明

    三段式访问支持访问多个 catalog,不支持 DDL。

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
    set spark.sql.catalog.spark_catalog.metastore=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    -- 自定义warehouse路径
    set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;
    
    -- 访问方式:自定义catalog.库.表
    insert into table your_las_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_paimon_db.test_tb;
    

    会话指定 Catalog:SQL 中仅支持访问一个 catalog

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
    set spark.sql.catalog.spark_catalog.metastore=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    -- 自定义warehouse路径
    set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_paimon_db.test_tb;
    

Spark访问自定义LAS Catalog中的Hudi表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_hudi_db;
    create table if not EXISTS test_hudi_db.test_tb(id int, name string) using hudi;
    
  • DML&DQL

    说明

    三段式访问支持访问多个 catalog,不支持 DDL。

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;
    
    -- 访问方式:自定义catalog.库.表
    insert into table your_las_catalog.test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_hudi_db.test_tb;
    

    会话指定 catalog:SQL 中仅支持访问一个 catalog

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_hudi_db.test_tb;
    

Spark访问自定义LAS Catalog中的DeltaLake表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_del_db;
    create table if not EXISTS test_del_db.test_tb(id int, name string) using delta;
    
  • DML&DQL

    说明

    三段式访问支持访问多个 catalog,不支持 DDL。

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
    
    -- 访问方式:自定义catalog.库.表
    insert into table your_las_catalog.test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_del_db.test_tb;
    

    会话指定catalog:SQL 中仅支持访问一个 Catalog

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_del_db.test_tb;
    

配置自定义湖格式版本

EMR Serverless Spark内置湖格式版本为:

  • Spark:3.5.1
  • Paimon:0.8.1
  • Iceberg:1.5.2
  • Hudi:0.15.0
  • Delta:3.1.0

当内置的湖格式(如 Paimon、Iceberg 等)版本无法满足特定业务需求时,EMR Serverless Spark 支持用户通过引入自定义 Jar 包的方式来升级或切换版本。
以下以升级至 Paimon 1.3 版本为例,说明操作步骤:

  1. 准备并上传 Connector 包

请从对应湖格式官网下载适配 Spark 版本的 Connector 包。

  • 示例:Paimon 1.3 下载地址参考 Paimon 官网 Quick Start
  • 操作:将下载好的 Jar 包(如 paimon-spark-3.5-1.3.1.jar)上传至您的 TOS 路径。
  1. 设置引入依赖 Jar 包

根据任务类型,在计算组或任务配置中设置相关参数以引入自定义依赖:

  • 通用计算组:任务级别设置参数 las.spark.jar.depend.jars=[{"fileName":"tos://your_tos_path/paimon-spark-3.5-1.3.1.jar"}]
  • SparkSQL计算组:在计算组配置参数spark.jars=tos://your_tos_path/paimon-spark-3.5-1.3.1.jar
  1. 排除内置湖格式包

为避免版本冲突,必须通过参数排除serverless spark内置的湖格式包:

  • 配置参数:serverless.spark.excluded.extensions=paimon
  • 参数说明:支持移除的湖格式包为paimon,iceberg,hudi,delta,多个格式逗号分隔。

说明

注意

  1. Spark 版本兼容:自定义 JAR 包必须严格与 EMR Serverless Spark 的 Spark 版本(3.5.1)兼容。使用不匹配的版本可能导致类加载异常或任务运行时报错。
  2. 湖格式版本兼容:升级湖格式主版本(如 Paimon 0.8 → 1.3)时,请仔细阅读官方 Release Notes,确认新版本是否存在不兼容的 API 变更、配置项调整或底层数据格式变化,并提前验证旧表的可读性与数据迁移方案。
最近更新时间:2026.03.31 20:34:13
这个页面对您有帮助吗?
有用
有用
无用
无用