You need to enable JavaScript to run this app.
E-MapReduce

E-MapReduce

复制全文
高阶使用
Spark 对湖仓表使用说明
复制全文
Spark 对湖仓表使用说明

EMR SparkSQL 完全兼容开源 SparkSQL 语法,以下对基本的库表操作做一个说明,其他详细指南可以参考开源 SparkSQL 语法说明

使用说明

非Spark官网支持的SQL语法,需要额外通过设置下述参数来支持(例如Iceberg的procedures)。

set emr.serverless.spark.custom.parse.enabled = true;

基础操作

库操作

-- 创建数据库
create database db_demo;

-- 创建数据库,指定自定义TOS桶路径进行存储-- 注意要确保该TOS桶存在,并且当前用户有该桶路径的读写权限
create database db_demo location 'tos://您的tos bucket name/warehouse/';

--查看数据库信息
desc database db_demo;

-- 删除数据库
drop database db_demo;

表操作

use db_demo;
-- 创建表
create table tb_demo(id int, name string);

-- 创建表指定指定自定义TOS桶路径进行存储, 创建的表是外表
create table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo';

-- 创建外表,效果同上
create EXTERNAL table tb_demo(id int, name string) LOCATION 'tos://您的tos bucket name/xxx/tb_demo';
DESCRIBE EXTENDED  tb_demo;

-- 描述表信息
desc table tb_demo;

-- 查询建表语句
show create table tb_demo;

-- 删除表
drop table tb_demo;

-- 插入数据
insert into tb_demo select 1,'name1';

-- 查询表数据
select * from tb_demo;

UDF 操作

-- 上传UDF, 上传到TOS对应路径-- 创建udf,默认在hive catalog中
CREATE FUNCTION <schemaName>.<functionName> AS '<funcClassName>' using jar 'tos://您的tos bucket name/您的jar包地址';

-- Spark使用UDF
select dbname.udfname('aaabbB');
select hive.dbname.udfname('aaabbB');

-- presto 使用UDF
select hive.dbname.udfname('aaabbB');

Spark 访问 hive
create database if not exists test_hive_db;
create table if not EXISTS test_hive_db.test_tb(id int, name string);

insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hive_db.test_tb;

drop table if exists test_hive_db.test_tb;
drop database if exists test_hive_db;

Spark 访问 iceberg
-- 默认参数
set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
set spark.sql.catalog.spark_iceberg_catalog.type=hive;
set spark.sql.storeAssignmentPolicy=ansi;
use spark_iceberg_catalog;

create database if not exists test_iceberg_db;
create table if not EXISTS test_iceberg_db.test_tb(id int, name string);

insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_iceberg_db.test_tb;

drop table if exists test_iceberg_db.test_tb;
drop database if exists test_iceberg_db;

-- 访问iceberg元数据表,如history、snapshots等元数据表
set spark.sql.threePartIdentifier.catalogService.enabled=false;
select * from test_iceberg_db.test_tb.history;

Spark 访问 paimon
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
set spark.sql.catalog.spark_catalog.metastore=hive;
set spark.sql.storeAssignmentPolicy=ansi;
-- 自定义warehouse路径
set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;

create database if not exists test_paimon_db;
create table if not EXISTS test_paimon_db.test_tb(id int, name string);

insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_paimon_db.test_tb;

drop table if exists test_paimon_db.test_tb;
drop database if exists test_paimon_db;

Spark 访问 Hudi
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;

create database if not exists test_hudi_db;
create table if not EXISTS test_hudi_db.test_tb(id int, name string);

insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_hudi_db.test_tb;

drop table if exists test_hudi_db.test_tb;
drop database if exists test_hudi_db;

Spark 访问 Deltalake
-- 默认参数
set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;

create database if not exists test_del_db;
create table if not EXISTS test_del_db.test_tb(id int, name string);

insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
select * from test_del_db.test_tb;

drop table if exists test_del_db.test_tb;
drop database if exists test_del_db;

访问自定义LAS Catalog数据目录

说明

提示
若需要创建LAS Catalog数据目录可以跳转至LAS控制台操作。

Spark默认访问的是LAS Catalog中Hive数据目录下的库表,若您需要访问LAS Catalog下除了Hive以外的数据目录,可以通过以下方式访问:

  1. 对于DDL,可以通过参数设置需要访问的LAS Catalog数据目录,比如

    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;  -- 指定your_las_catalog为需要访问的LAS Catalog数据目录
    
    --DDL
    create database if not exists test_hive_db;
    create table if not EXISTS test_hive_db.test_tb(id int, name string);
    

    说明

    注意:spark.hadoop.hive.metastore.catalog.default参数是对当前整个会话生效,当前会话中所有的库表都应当存在于同一个LAS Catalog数据目录下。

  2. 对于DML、DQL,除了可以设置上述参数,也可以在表名上使用三段式的命名来指定自定义LAS Catalog数据目录下的库表,比如:

    insert into table your_las_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
    
    select * from your_las_catalog.test_hive_db.test_tb;
    

    说明

    注意
    DDL不支持使用三段式访问支持访问多个LAS Catalog数据目录。

针对每一种湖仓格式的完整示例可以参考如下。

Spark访问自定义LAS Catalog中的Hive表

  • DDL

    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --DDL
    create database if not exists test_hive_db;
    create table if not EXISTS test_hive_db.test_tb(id int, name string);
    
  • DML&DQL:

    insert into table your_las_catalog.test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
    
    select * from your_las_catalog.test_hive_db.test_tb;
    

    会话指定 Catalog:sql中仅支持访问一个catalog

    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --DML&DQL
    insert into table test_hive_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_hive_db.test_tb;
    

Spark访问自定义LAS Catalog中的Iceberg表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
    set spark.sql.catalog.spark_iceberg_catalog.type=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    use spark_iceberg_catalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_iceberg_db;
    create table if not EXISTS test_iceberg_db.test_tb(id int, name string);
    
  • DML&DQL

    -- 默认参数
    set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
    set spark.sql.catalog.spark_iceberg_catalog.type=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    
    -- 访问方式:自定义catalog.库.表
    use spark_iceberg_catalog;
    insert into table your_las_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_iceberg_db.test_tb;
    
    --访问方式:spark_iceberg_catalog.自定义catalog.库.表
    insert into table spark_iceberg_catalog.your_las_catalog.test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from spark_iceberg_catalog.your_las_catalog.test_iceberg_db.test_tb;
    

    会话指定Catalog:SQL 中仅支持访问一个 catalog

    -- 默认参数
    set spark.sql.catalog.spark_iceberg_catalog=org.apache.iceberg.spark.SparkCatalog;
    set spark.sql.catalog.spark_iceberg_catalog.type=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    use spark_iceberg_catalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_iceberg_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_iceberg_db.test_tb;
    
    -- 访问iceberg元数据表,如history、snapshots等元数据表
    set spark.sql.threePartIdentifier.catalogService.enabled=false;
    select * from test_iceberg_db.test_tb.history;
    

Spark访问自定义LAS Catalog中的Paimon表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
    set spark.sql.catalog.spark_catalog.metastore=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    -- 自定义warehouse路径
    set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_paimon_db;
    create table if not EXISTS test_paimon_db.test_tb(id int, name string);
    
  • DML&DQL:

    说明

    三段式访问支持访问多个 catalog,不支持 DDL。

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
    set spark.sql.catalog.spark_catalog.metastore=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    -- 自定义warehouse路径
    set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;
    
    -- 访问方式:自定义catalog.库.表
    insert into table your_las_catalog.test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_paimon_db.test_tb;
    

    会话指定 Catalog:SQL 中仅支持访问一个 catalog

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog;
    set spark.sql.catalog.spark_catalog.metastore=hive;
    set spark.sql.storeAssignmentPolicy=ansi;
    -- 自定义warehouse路径
    set spark.sql.catalog.spark_catalog.warehouse=tos://tos_bucket_name/warehouse;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_paimon_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_paimon_db.test_tb;
    

Spark访问自定义LAS Catalog中的Hudi表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_hudi_db;
    create table if not EXISTS test_hudi_db.test_tb(id int, name string);
    
  • DML&DQL

    说明

    三段式访问支持访问多个 catalog,不支持 DDL。

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;
    
    -- 访问方式:自定义catalog.库.表
    insert into table your_las_catalog.test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_hudi_db.test_tb;
    

    会话指定 catalog:SQL 中仅支持访问一个 catalog

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_hudi_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_hudi_db.test_tb;
    

Spark访问自定义LAS Catalog中的DeltaLake表

  • DDL

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    create database if not exists test_del_db;
    create table if not EXISTS test_del_db.test_tb(id int, name string);
    
  • DML&DQL

    说明

    三段式访问支持访问多个 catalog,不支持 DDL。

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
    
    -- 访问方式:自定义catalog.库.表
    insert into table your_las_catalog.test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from your_las_catalog.test_del_db.test_tb;
    

    会话指定catalog:SQL 中仅支持访问一个 Catalog

    -- 默认参数
    set spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
    
    -- 指定自定义Catalog,根据实际情况调整
    set spark.hadoop.hive.metastore.catalog.default=your_las_catalog;
    
    --访问方式:库.表
    insert into table test_del_db.test_tb values(1, 'hehe'),(2, 'haha');
    select * from test_del_db.test_tb;
    
最近更新时间:2026.02.10 16:43:56
这个页面对您有帮助吗?
有用
有用
无用
无用