You need to enable JavaScript to run this app.
导航
EMR Serveless Spark 与 Lance 集成
最近更新时间:2025.07.10 15:48:57首次发布时间:2025.07.10 15:48:57
我的收藏
有用
有用
无用
无用

当前 Lance 已实现对 Spark 和 Ray 两大计算引擎的支持。其中,Ray 计算能力主要通过 pylance 模块进行功能封装与逻辑实现。为充分展示计算引擎与数据仓库的协同能力,以及 SQL 语言在复杂业务逻辑实现中的优势,本文将重点围绕 Spark 计算引擎,详细阐述其与 Hive 数仓表的集成流程及 SQL 应用场景。

前提条件

开通 EMR Serveless 队列

确认用户已开通 EMR Serveless 队列权限。
Image

开通元数据权限

确认用户已开通相应 Catalog 的 操作权限,详细操作请参考:权限管理

使用限制

  1. 单个 SQL 语句或者 Spark 任务访问的 lance 表,目前必须要处于同一个 las catalog 下。
  2. Lance Catalog 中的表可与 Hive Catalog 中的 Hive 表执行 join 操作。但需注意的是,当前该功能暂不支持跨多个 Lance Catalog 进行关联操作。
  3. 单个任务需要访问的Lance表必须且只能存放在一个固定的TOS桶中。
  4. 当前仅支持使用PySpark和自定义作业访问。
  5. 下面文档里面的镜像仅支持北京地区,其它地区请联系火山引擎技术同学提供自定义镜像。
  6. 目前Lance表的创建需通过LAS Catalog页面操作,暂不支持直接使用Serverless Spark进行建表。此外,若通过SparkSQL在Las Catalog中进行建表操作,将无法同步表的format(格式)及schema(模式)信息。
  7. 暂不支持对 Spark 计算组的访问操作,其中通过 Kyubbi 组件的访问方式亦在不支持范围内。
  8. 暂不支持 alter table add/drop column。

SparkSQL 操作指南

说明

在 Serverless Spark 中 ,使用时可以通过lance.test_catalog.db_name.mytable这样的四段式来直接引用。参数如下:

  • test_catalog:Lance 表存储 Catalog 名称。
  • db_name:Lance 表存储 DataBase 名称。
  • tbl_name:Lance 表存储 Table 名称。

EMR Serverless 控制台 > 队列 >作业提交 > SparkSQL 编辑器中,提交如下作业:

--{}部分需要用户根据实际情况填写。
set spark.sql.datasource.verifyWrittenFiles.enabled=false;

set spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog;
set spark.sql.catalog.lance.type=hive;
set spark.hadoop.hive.metastore.catalog.default={catalog_name};

set spark.sql.catalog.lance.access_key_id={your ak};
set spark.sql.catalog.lance.secret_access_key={your sk};
set spark.sql.catalog.lance.aws_region=cn-beijing;
set spark.sql.catalog.lance.aws_endpoint=https://{bucket}.tos-s3-cn-beijing.ivolces.com;
set spark.sql.catalog.lance.virtual_hosted_style_request=true;

set spark.sql.catalog.lance.with_row_id=true;

set spark.executor.extraJavaOptions=-Dio.netty.maxDirectMemory=6G;
set spark.sql.storeAssignmentPolicy=ANSI;
                                                  
select * from 
lance.lance_catalog.db_name.sample_table a 
join hive.gch.hive_join b 
on a.id = b.id

查询结果:

PySpark 操作指南

编写 PySpark 作业

EMR Serverless 控制台,您可以使用 SQL 命令的方式来提交 PySpark 任务。提交前,您需要把相关的文件上传至TOS。下面是一个 Python 代码 Demo(lance_join.py ):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("insert into lance.test_catalog.db_name.tbl_name select 2,'new data1'").show()
spark.sql("insert into lance.test_catalog.db_name.tbl_name select 8,'new data2'").show()
spark.stop()

说明

当需要访问普通 Hive 表且该表与当前操作环境不在同一 Catalog 下时,需通过hive.hive_db.hive_tbl三段式命名规则定位目标表。

  • hive:代表数据源类型为 Hive;
  • hive_db:指定 Hive 数据库名称;
  • hive_tbl:指向具体的表名。
select * from lance.twtest_lance.twtest_lance_db.sample_table a join hive.twtest.hive_table202504161 b on a.id = b.id

上传文件到 TOS 桶

提交作业前需将 Python 文件上传至 TOS 桶(TOS 桶须与 Serverless Spark 计算资源创建于同一地域)。

示例中的路径为:tos://hzw/ray/lance_join.py 。

控制台提交 PySpark 作业

EMR Serverless 支持通过 Set 参数的方式,提交一个 PySpark 作业,极简 Demo 如下:

  1. 进入提交作业页面。

Image

  1. 提交作业。
# {}部分需要用户根据实际情况填写。
set tqs.query.engine.type = sparkjar;
set spark.jar.resource ={tos地址};
set serverless.spark.access.key={your ak};
set serverless.spark.secret.key={your sk};
                                         
set spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog;
set spark.sql.catalog.lance.type=hive;
set spark.hadoop.hive.metastore.catalog.default={catalog_name};

set spark.sql.catalog.lance.access_key_id={your ak};
set spark.sql.catalog.lance.secret_access_key={your sk};
set spark.sql.catalog.lance.aws_region=cn-beijing;
set spark.sql.catalog.lance.aws_endpoint=https://{bucket}.tos-s3-cn-{region}.ivolces.com;
set spark.sql.catalog.lance.virtual_hosted_style_request=true;

set spark.executor.extraJavaOptions=-Dio.netty.maxDirectMemory=6G;
set spark.sql.storeAssignmentPolicy=ANSI;

其中变量参数的含义如下:

参数名

参数值

含义

serverless.spark.access.key

账户的 AccessKey

访问 TOS 所用的 AccessKey

serverless.spark.secret.key

账户的 SecretKey

访问 TOS 所用的 SecretKey

spark.jar.resource

tos://emr-autotest-shanghai/ziwen/neo_spark_poc_new.py

您编写的 PySpark 代码文件,所在的 TOS 路径

spark.sql.catalog.lance.access_key_id

账户访问 TOS 的 AccessKey

spark.sql.catalog.lance.secret_access_key

账户访问TOS 的 SecretKey

spark.sql.catalog.lance.metastore.catalog.default

要访问的 Catalog 名称

Catalog 中对应默认的 Catalog 的名字

bucket_name

上传 Python 文件的 TOS 桶名称

spark.sql.catalog.lance.aws_endpoint

TOS 的域名

运行中/完成后,点击查询日志可打开 Spark WebUI。
Python 代码中的输出(print/ df.show)会展示在 Driver 的 stdout 中。

其它地区配置模板

如果您的配置在华东地区,可以使用下面的配置模板,把 cn-beijing 替换为 cn-shanghai 即可。例如 SQL 模板更改如下。

--{}部分需要用户根据实际情况填写。
set spark.sql.datasource.verifyWrittenFiles.enabled=false;
set spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog;
set spark.sql.catalog.lance.type=hive;
set spark.hadoop.hive.metastore.catalog.default=lance_catalog;

set spark.sql.catalog.lance.access_key_id={your ak};
set spark.sql.catalog.lance.secret_access_key={your ak};
set spark.sql.catalog.lance.aws_region={region};
set spark.sql.catalog.lance.aws_endpoint={bucket}.tos-s3-cn-{region}.ivolces.com;;
set spark.sql.catalog.lance.virtual_hosted_style_request=true;
set spark.sql.caseSensitive=true;

set spark.sql.threePartIdentifier.catalogService.enabled=false;
set spark.sql.storeAssignmentPolicy=ANSI;

select * from lance.lance_catalog.db_name.tbl_name;