为满足用户更加定制化的数据查询分析需求,LAS 提供了 Spark Jar 任务的查询方式。用户可以通过编写自己的 Spark 应用程序来进行定制化的数据分析工作,同时 LAS 会管控用户对数据集访问的权限与平台现有权限一致。
LAS Spark Jar 任务通过LASFS,使用户在使用原生的 Spark SQL API 无感知的情况下(即无需改变现有代码),完成对用户操作和访问数据集权限的控制。
在 Spark 中使用 LASFS 时,依赖社区 Spark 的 3.0.1 版本,Scope 为 provided。LASFS 的操作对用户是透明的,用户使用普通的 API 即可通过 LASFS 实现对表的读写操作。
LASFS 的相关依赖在运行时由 Spark 的镜像提供,您在开发代码时无需添加LASFS依赖。
通过 spark conf 的方式传入 LASFS 相关配置。
参数 | 是否必填 | 默认值 | 说明 |
---|---|---|---|
spark.sql.extensions | 是 | 需配置为 org.apache.spark.sql.LASExtension,若需要配置其他extesion则用逗号分隔。 | |
spark.sql.lasfs.enabled | 必填为 true | 默认为false | 是否启用 lasfs,需设置为 true 启用 lasfs |
spark.hadoop.fs.lasfs.access.key | 是 | 使用 lasfs 的 ak | |
spark.hadoop.fs.lasfs.secret.key | 是 | 使用 lasfs 的 sk | |
spark.hadoop.fs.lasfs.session.token | 否 | 以assume role方式访问lasfs时需要配置 | |
spark.hadoop.fs.lasfs.identity.id | 否 | 以assume role方式访问lasfs时需要配置 | |
spark.hadoop.fs.lasfs.identity.type | 否 | 以assume role方式访问lasfs时需要配置 | |
spark.hadoop.fs.lasfs.service.region | 是 | 需按照您所在的region配置为以下值 | |
spark.hadoop.fs.lasfs.endpoint | 是 | 需按照您所在的region配置为以下值 | |
spark.hadoop.fs.defaultFS | 是 | 固定配置 | |
spark.hadoop.hive.exec.scratchdir | 是 | 固定配置 | |
spark.hadoop.tmp.dir | 是 | 固定配置 | |
spark.sql.analysis.hoodie.relation.conversion.skip.enabled | 是 | 固定配置为false |
Spark LASFS 参数需要指定 LASFS 的 AK/SK,可通过以下方式进行获取。
进入火山引擎首页后,点击右上角 个人头像 - 密钥管理,进入密钥管理页面:
在密钥管理页可以进行查看或添加 AK/SK,填入对应的参数配置中即可。
import org.apache.spark.sql.SparkSession; public class SparkJarExmaple { public static void main(String[] args) { SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("SparkJarExmaple") .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension") //固定配置 .config("spark.sql.lasfs.enabled", "true") //使用LASFS方式访问库表 .config("spark.hadoop.fs.lasfs.access.key", "xxx") .config("spark.hadoop.fs.lasfs.secret.key", "xxx==") .config("spark.hadoop.fs.lasfs.service.region", "cn-beijing") //需按照您所在的region配置 .config("spark.hadoop.fs.lasfs.endpoint", "100.96.4.84:80") //需按照您所在的region配置 .config("spark.hadoop.fs.defaultFS", "lasfs:/") //固定配置 .config("spark.hadoop.hive.exec.scratchdir", "/warehouse/tmp/hive") //固定配置 .config("spark.hadoop.tmp.dir", "/warehouse/tmp/hadoop") //固定配置 .config("spark.sql.analysis.hoodie.relation.conversion.skip.enabled", "false") //固定配置 .getOrCreate(); // db.lasfs_test schema: // id bigint, // msg string // partition cols: // date string spark.sql("insert into db.lasfs_test partition (date='1') select id, id as msg from range(10000)") spark.sql("select * from db.lasfs_test where date='1'").show() spark.stop(); } }
由于 LAS 统一使用 CU 作为计量单位,目前 Spark 资源相关参数已被禁用。如需调整内存,需要使用 las.job.driver.cu
和 las.job.executor.cu
(默认值均为 4)这两个参数来控制 core 和 memory,计算规则如下:
内部计算规则如下:
spark.driver.cores = lasJobDriverCU spark.driver.memory = lasJobDriverCU * 4g spark.executor.cores = lasJobExecutorCU spark.executor.memory = lasJobExecutorCU * 4g
Spark Jar作业此前通过Tunnel通道读写数据,在后续版本中Tunnel组件会逐步下线,已经上线的存量作业,可以暂时保持使用Tunnel组件,新增作业建议使用LASFS方式访问库表。
另外,若您需要访问Bytelake1.0内表,则必须通过Tunnel组件访问。使用Tunnel的配置示例如下:
import org.apache.spark.sql.SparkSession; public class SparkJarExmaple { public static void main(String[] args) { SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("SparkJarExmaple") .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension") .config("spark.sql.tunnel.enabled", "true") .config("spark.sql.tunnel.access.key.id", "xxx") .config("spark.sql.tunnel.secret.access.key", "xxx==") .config("spark.sql.tunnel.endpoint", "las-tunnel.ivolces.com:80") .config("spark.sql.tunnel.service.region", "cn-beijing") .getOrCreate(); // db.lasfs_test schema: // id bigint, // msg string // partition cols: // date string spark.sql("insert into db.tunnel_test partition (date='1') select id, id as msg from range(10000)") spark.sql("select * from db.tunnel_test where date='1'").show() spark.stop(); } }
注意
若LASFS和Tunnel同时配置,则优先通过LASFS访问,对于Bytelake1.0内表会继续使用Tunnel。