You need to enable JavaScript to run this app.
导航
Spark Jar 作业开发
最近更新时间:2024.06.11 14:52:51首次发布时间:2021.12.10 19:32:08

1 概述

为满足用户更加定制化的数据查询分析需求,LAS 提供了 Spark Jar 任务的查询方式。用户可以通过编写自己的 Spark 应用程序来进行定制化的数据分析工作,同时 LAS 会管控用户对数据集访问的权限与平台现有权限一致。

2 Spark LASFS 使用方式

LAS Spark Jar 任务通过LASFS,使用户在使用原生的 Spark SQL API 无感知的情况下(即无需改变现有代码),完成对用户操作和访问数据集权限的控制。

2.1 依赖说明

在 Spark 中使用 LASFS 时,依赖社区 Spark 的 3.0.1 版本,Scope 为 provided。LASFS 的操作对用户是透明的,用户使用普通的 API 即可通过 LASFS 实现对表的读写操作。
LASFS 的相关依赖在运行时由 Spark 的镜像提供,您在开发代码时无需添加LASFS依赖。

2.2 参数说明

通过 spark conf 的方式传入 LASFS 相关配置。

参数

是否必填

默认值

说明

spark.sql.extensions

需配置为 org.apache.spark.sql.LASExtension,若需要配置其他extesion则用逗号分隔。

spark.sql.lasfs.enabled

必填为 true

默认为false

是否启用 lasfs,需设置为 true 启用 lasfs

spark.hadoop.fs.lasfs.access.key

使用 lasfs 的 ak

spark.hadoop.fs.lasfs.secret.key

使用 lasfs 的 sk

spark.hadoop.fs.lasfs.session.token

以assume role方式访问lasfs时需要配置

spark.hadoop.fs.lasfs.identity.id

以assume role方式访问lasfs时需要配置

spark.hadoop.fs.lasfs.identity.type

以assume role方式访问lasfs时需要配置

spark.hadoop.fs.lasfs.service.region

需按照您所在的region配置为以下值
华北region: cn-beijing
华东region: cn-shanghai
华南region: cn-grougzhou

spark.hadoop.fs.lasfs.endpoint

需按照您所在的region配置为以下值
华北region: 100.96.4.84:80
华东region: 100.96.4.33:80
华南region: 100.96.4.35:80

spark.hadoop.fs.defaultFS

固定配置
lasfs:/

spark.hadoop.hive.exec.scratchdir

固定配置
/warehouse/tmp/hive

spark.hadoop.tmp.dir

固定配置
/warehouse/tmp/hadoop

spark.sql.analysis.hoodie.relation.conversion.skip.enabled

固定配置为false

2.3 AK/SK 获取

Spark LASFS 参数需要指定 LASFS 的 AK/SK,可通过以下方式进行获取。
进入火山引擎首页后,点击右上角 个人头像 - 密钥管理,进入密钥管理页面:
图片
在密钥管理页可以进行查看或添加 AK/SK,填入对应的参数配置中即可。
图片

import org.apache.spark.sql.SparkSession;

public class SparkJarExmaple {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
    .enableHiveSupport()
    .appName("SparkJarExmaple")
    .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension") //固定配置
    .config("spark.sql.lasfs.enabled", "true") //使用LASFS方式访问库表
    .config("spark.hadoop.fs.lasfs.access.key", "xxx")
    .config("spark.hadoop.fs.lasfs.secret.key", "xxx==")
    .config("spark.hadoop.fs.lasfs.service.region", "cn-beijing") //需按照您所在的region配置
    .config("spark.hadoop.fs.lasfs.endpoint", "100.96.4.84:80") //需按照您所在的region配置
    .config("spark.hadoop.fs.defaultFS", "lasfs:/") //固定配置
    .config("spark.hadoop.hive.exec.scratchdir", "/warehouse/tmp/hive") //固定配置
    .config("spark.hadoop.tmp.dir", "/warehouse/tmp/hadoop") //固定配置
    .config("spark.sql.analysis.hoodie.relation.conversion.skip.enabled", "false") //固定配置
    .getOrCreate();
    // db.lasfs_test schema:
    // id bigint,
    // msg string
    // partition cols:
    // date string
    spark.sql("insert into db.lasfs_test partition (date='1') select id, id as msg from range(10000)")
    spark.sql("select * from db.lasfs_test where date='1'").show()
    spark.stop();
  }
}

3 参数配置

由于 LAS 统一使用 CU 作为计量单位,目前 Spark 资源相关参数已被禁用。如需调整内存,需要使用 las.job.driver.culas.job.executor.cu(默认值均为 4)这两个参数来控制 core 和 memory,计算规则如下:
内部计算规则如下:

spark.driver.cores = lasJobDriverCU
spark.driver.memory = lasJobDriverCU * 4g
spark.executor.cores = lasJobExecutorCU
spark.executor.memory = lasJobExecutorCU * 4g

4 关于Tunnel通道的说明

Spark Jar作业此前通过Tunnel通道读写数据,在后续版本中Tunnel组件会逐步下线,已经上线的存量作业,可以暂时保持使用Tunnel组件,新增作业建议使用LASFS方式访问库表。
另外,若您需要访问Bytelake1.0内表,则必须通过Tunnel组件访问。使用Tunnel的配置示例如下:

import org.apache.spark.sql.SparkSession;

public class SparkJarExmaple {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
    .enableHiveSupport()
    .appName("SparkJarExmaple")
    .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension")
    .config("spark.sql.tunnel.enabled", "true")
    .config("spark.sql.tunnel.access.key.id", "xxx")
    .config("spark.sql.tunnel.secret.access.key", "xxx==")
    .config("spark.sql.tunnel.endpoint", "las-tunnel.ivolces.com:80")
    .config("spark.sql.tunnel.service.region", "cn-beijing")
    .getOrCreate();
    // db.lasfs_test schema:
    // id bigint,
    // msg string
    // partition cols:
    // date string
    spark.sql("insert into db.tunnel_test partition (date='1') select id, id as msg from range(10000)")
    spark.sql("select * from db.tunnel_test where date='1'").show()
    spark.stop();
  }
}

注意

若LASFS和Tunnel同时配置,则优先通过LASFS访问,对于Bytelake1.0内表会继续使用Tunnel。