LAS Spark 任务适用于定时执行 Spark 离线任务的场景,支持 Jar 包资源和 Python资源引用的方式。
LAS ByteLake 表的属性不支持并发写入数据,否则会出现写入冲突情况。通过 Spark 任务在向 ByteLake 表写入数据时,您可在 DataLeap 任务调度设置里,开启最大并发控制按钮,并将其设置为 1,以此避免因实例并发冲突致使任务执行失败。此外,您还需确保在其他业务中不存在并发写入同一个 ByteLake 表的情况。
注意
新建任务完成后,在任务配置界面完成以下参数配置。
语言类型支持 Java、Python。
说明
语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。
语言类型选择 Java 时,资源类型支持 Jar 资源包的形式,可以按以下方式选择资源:
语言类型选择 Python 时:
from pyspark import SparkFiles from pyspark.sql import SparkSession from pyspark.sql import SQLContext job_name='pyspark_test_on_las' spark = SparkSession.builder.appName(job_name).getOrCreate() spark.sql("select 1").show(10) spark.stop()
更多 LAS Spark 任务操作,详见:Spark JAR 开发。
注意
若语句中需要设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/
;
参数 | 说明 |
---|---|
Spark 参数 | |
Main Class/Py-files |
|
Conf参数 | 配置任务中需设置的一些 conf 参数,如常用的 Driver 内存分配参数
|
任务参数 | |
自定义参数 | 输入任务中需要额外传递的参数,如数据输入/输出路径参数、数据处理的业务时间参数、过滤条件参数等等。
|
任务产出数据登记,用于记录任务---数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过解析获取产出信息的任务,可手动登记其产出信息。
如果任务含有 LAS 表的写入操作,强烈建议填写。您填写的内容即为任务产出,支持填写多个。其他任务的依赖推荐会根据此处填写的 LAS 表信息进行推荐。 具体登记内容包括:
以下示例将为您演示如何通过 LAS Spark 任务中 Python 语言方式,来直接访问 LAS 表中的数据。
CREATE TABLE IF NOT EXISTS test_schema.student_demo ( id INT COMMENT 'id', name STRING COMMENT 'name', age INT COMMENT 'age' ) PARTITIONED BY (date STRING COMMENT 'date partition') stored as bytelake; INSERT INTO test_schema.student_demo PARTITION (date = '20230518') VALUES(1, 'TOM', 10);
说明
LAS ByteLake 2.0 和 Managed Hive 表类型,推荐使用以下 lasfs 示例;对于 ByteLake1.0 内表,需继续使用 Tunnel 方式。详见 Spark Jar 作业开发。
from pyspark import SparkConf from pyspark.context import SparkContext from pyspark.sql import SparkSession, SQLContext SparkContext._ensure_initialized() spark = SparkSession.builder \ .appName("pyspark_test_read_hive_x_new_1") \ .config("spark.sql.lasfs.enabled", "true") \ .config("spark.hadoop.hive.exec.scratchdir", "lasfs:/warehouse/tmp/hive") \ .config("spark.hadoop.tmp.dir", "lasfs:/warehouse/tmp/hadoop") \ .config("spark.hadoop.fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem") \ .config("spark.hadoop.fs.lasfs.endpoint", "100.96.4.84:80") \ .config("spark.hadoop.fs.lasfs.service.region", "cn-beijing") \ .config("spark.hadoop.fs.lasfs.access.key", "AKxxx") \ .config("spark.hadoop.fs.lasfs.secret.key", "SKxxx==") \ .config("spark.sql.analysis.hoodie.relation.conversion.skip.enabled", "false") \ .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext sql = spark.sql sqlContext = spark._wrapped sqlCtx = sqlContext df = sql("insert into test_schema.student_demo partition (date='20240619') values('1','TOM','10')") df.show() spark.stop()
替换参数说明:
参数 | 说明 |
---|---|
appName | 自定义 Spark 任务名称。 |
access.key | 可前往访问控制的密钥管理中查看 Access Key ID。相关操作说明请参见 Access Key(密钥)管理。 |
secret.key | 可前往访问控制的密钥管理中查看 Secret Access Key。相关操作说明请参见 Access Key(密钥)管理。 |
endpoint | 需按照您所在的 Region 信息进行配置,不同地域值,详见 Spark Jar 作业开发 - spark.hadoop.fs.lasfs.endpoint。 |
region | 按照您所在的 Region 信息,对应配置:
|
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。
注意
待任务执行成功后,您可进入火山引擎控制台:
调试任务成功,并查看日志校验数据情况无误后,返回数据开发界面,将任务提交发布到运维中心离线任务运维中执行。
单击上方操作栏中的保存和提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交。
后续任务运维操作详见:离线任务运维。