当前 Lance 已实现对 Spark 和 Ray 两大计算引擎的支持。其中,Ray 计算能力主要通过 pylance 模块进行功能封装与逻辑实现。为充分展示计算引擎与数据仓库的协同能力,以及 SQL 语言在复杂业务逻辑实现中的优势,本文将重点围绕 Spark 计算引擎,详细阐述其与 Hive 数仓表的集成流程及 SQL 应用场景。
确认用户已开通 EMR Serveless 队列权限。
确认用户已开通相应 Catalog 的 操作权限,详细操作请参考:权限管理。
说明
在 Serverless Spark 中 ,使用时可以通过lance.test_catalog.db_name.mytable
这样的四段式来直接引用。参数如下:
在 EMR Serverless 控制台 > 队列 >作业提交 > SparkSQL 编辑器中,提交如下作业:
--{}部分需要用户根据实际情况填写。 set spark.sql.datasource.verifyWrittenFiles.enabled=false; set spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog; set spark.sql.catalog.lance.type=hive; set spark.hadoop.hive.metastore.catalog.default={catalog_name}; set spark.sql.catalog.lance.access_key_id={your ak}; set spark.sql.catalog.lance.secret_access_key={your sk}; set spark.sql.catalog.lance.aws_region=cn-beijing; set spark.sql.catalog.lance.aws_endpoint=https://{bucket}.tos-s3-cn-beijing.ivolces.com; set spark.sql.catalog.lance.virtual_hosted_style_request=true; set spark.sql.catalog.lance.with_row_id=true; set spark.executor.extraJavaOptions=-Dio.netty.maxDirectMemory=6G; set spark.sql.storeAssignmentPolicy=ANSI; select * from lance.lance_catalog.db_name.sample_table a join hive.gch.hive_join b on a.id = b.id
查询结果:
在 EMR Serverless 控制台,您可以使用 SQL 命令的方式来提交 PySpark 任务。提交前,您需要把相关的文件上传至TOS。下面是一个 Python 代码 Demo(lance_join.py ):
from pyspark.sql import SparkSession spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark.sql("insert into lance.test_catalog.db_name.tbl_name select 2,'new data1'").show() spark.sql("insert into lance.test_catalog.db_name.tbl_name select 8,'new data2'").show() spark.stop()
说明
当需要访问普通 Hive 表且该表与当前操作环境不在同一 Catalog 下时,需通过hive.hive_db.hive_tbl
三段式命名规则定位目标表。
select * from lance.twtest_lance.twtest_lance_db.sample_table a join hive.twtest.hive_table202504161 b on a.id = b.id
提交作业前需将 Python 文件上传至 TOS 桶(TOS 桶须与 Serverless Spark 计算资源创建于同一地域)。
示例中的路径为:tos://hzw/ray/lance_join.py 。
EMR Serverless 支持通过 Set 参数的方式,提交一个 PySpark 作业,极简 Demo 如下:
# {}部分需要用户根据实际情况填写。 set tqs.query.engine.type = sparkjar; set spark.jar.resource ={tos地址}; set serverless.spark.access.key={your ak}; set serverless.spark.secret.key={your sk}; set spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog; set spark.sql.catalog.lance.type=hive; set spark.hadoop.hive.metastore.catalog.default={catalog_name}; set spark.sql.catalog.lance.access_key_id={your ak}; set spark.sql.catalog.lance.secret_access_key={your sk}; set spark.sql.catalog.lance.aws_region=cn-beijing; set spark.sql.catalog.lance.aws_endpoint=https://{bucket}.tos-s3-cn-{region}.ivolces.com; set spark.sql.catalog.lance.virtual_hosted_style_request=true; set spark.executor.extraJavaOptions=-Dio.netty.maxDirectMemory=6G; set spark.sql.storeAssignmentPolicy=ANSI;
其中变量参数的含义如下:
参数名 | 参数值 | 含义 |
---|---|---|
serverless.spark.access.key | 账户的 AccessKey | 访问 TOS 所用的 AccessKey |
serverless.spark.secret.key | 账户的 SecretKey | 访问 TOS 所用的 SecretKey |
spark.jar.resource | tos://emr-autotest-shanghai/ziwen/neo_spark_poc_new.py | 您编写的 PySpark 代码文件,所在的 TOS 路径 |
spark.sql.catalog.lance.access_key_id | 账户访问 TOS 的 AccessKey | |
spark.sql.catalog.lance.secret_access_key | 账户访问TOS 的 SecretKey | |
spark.sql.catalog.lance.metastore.catalog.default | 要访问的 Catalog 名称 | Catalog 中对应默认的 Catalog 的名字 |
bucket_name | 上传 Python 文件的 TOS 桶名称 | |
spark.sql.catalog.lance.aws_endpoint | TOS 的域名 |
运行中/完成后,点击查询日志可打开 Spark WebUI。
Python 代码中的输出(print/ df.show)会展示在 Driver 的 stdout 中。
如果您的配置在华东地区,可以使用下面的配置模板,把 cn-beijing 替换为 cn-shanghai 即可。例如 SQL 模板更改如下。
--{}部分需要用户根据实际情况填写。 set spark.sql.datasource.verifyWrittenFiles.enabled=false; set spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog; set spark.sql.catalog.lance.type=hive; set spark.hadoop.hive.metastore.catalog.default=lance_catalog; set spark.sql.catalog.lance.access_key_id={your ak}; set spark.sql.catalog.lance.secret_access_key={your ak}; set spark.sql.catalog.lance.aws_region={region}; set spark.sql.catalog.lance.aws_endpoint={bucket}.tos-s3-cn-{region}.ivolces.com;; set spark.sql.catalog.lance.virtual_hosted_style_request=true; set spark.sql.caseSensitive=true; set spark.sql.threePartIdentifier.catalogService.enabled=false; set spark.sql.storeAssignmentPolicy=ANSI; select * from lance.lance_catalog.db_name.tbl_name;