LAS Spark 任务适用于定时执行 Spark 离线任务的场景,支持 Jar 包资源和 Python资源引用的方式。
项目已绑定 湖仓一体分析服务(LAS)引擎,操作详见:新建项目。
在任务配置界面完成以下参数配置。
语言类型支持 Java、Python。
注意
语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。
from pyspark import SparkFiles from pyspark.sql import SparkSession from pyspark.sql import SQLContext job_name='pyspark_test_on_las' spark = SparkSession.builder.appName(job_name).getOrCreate() spark.sql("select 1").show(10) spark.stop()
注意
设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/
;
参数 | 说明 |
---|---|
Spark 参数 | |
Main Class/Py-files |
|
Conf参数 | 配置任务中需设置的一些 conf 参数,例如您可通过
您可通过以下两种方式来进行配置:
|
任务参数 | |
自定义参数 | 输入任务中已定义的参数,多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入。 |
任务产出数据登记,用于记录任务---数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过解析获取产出信息的任务,可手动登记其产出信息。
如果任务含有 LAS 表的写入操作,强烈建议填写。您填写的内容即为任务产出,支持填写多个。其他任务的依赖推荐会根据此处填写的 LAS 表信息进行推荐。 具体登记内容包括:
以下示例将为您演示如何通过 LAS Spark 任务中 Python 语言方式,来直接访问 LAS 表中的数据。
CREATE TABLE IF NOT EXISTS test_schema.student_demo ( id INT COMMENT 'id', name STRING COMMENT 'name', age INT COMMENT 'age' ) PARTITIONED BY (date STRING COMMENT 'date partition') stored as bytelake; INSERT INTO test_schema.student_demo PARTITION (date = '20230518') VALUES(1, 'TOM', 10);
说明
LAS ByteLake 2.0 和 Managed Hive 表类型,推荐使用以下 lasfs 示例;对于 ByteLake1.0 内表,需继续使用 Tunnel 方式。详见 Spark Jar 作业开发。
from pyspark import SparkConf from pyspark.context import SparkContext from pyspark.sql import SparkSession, SQLContext SparkContext._ensure_initialized() spark = SparkSession.builder \ .appName("pyspark_test_read_hive_x_new_1") \ .config("spark.sql.lasfs.enabled", "true") \ .config("spark.hadoop.hive.exec.scratchdir", "lasfs:/warehouse/tmp/hive") \ .config("spark.hadoop.tmp.dir", "lasfs:/warehouse/tmp/hadoop") \ .config("spark.hadoop.fs.lasfs.impl", "com.volcengine.las.fs.LasFileSystem") \ .config("spark.hadoop.fs.lasfs.endpoint", "100.96.4.84:80") \ .config("spark.hadoop.fs.lasfs.service.region", "cn-beijing") \ .config("spark.hadoop.fs.lasfs.access.key", "AKxxx") \ .config("spark.hadoop.fs.lasfs.secret.key", "SKxxx==") \ .config("spark.sql.analysis.hoodie.relation.conversion.skip.enabled", "false") \ .config("spark.sql.extensions", "org.apache.spark.sql.LASExtension") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext sql = spark.sql sqlContext = spark._wrapped sqlCtx = sqlContext df = sql("insert into test_schema.student_demo partition (date='20240619') values('1','TOM','10')") df.show() spark.stop()
替换参数说明:
参数 | 说明 |
---|---|
appName | 自定义 Spark 任务名称。 |
access.key | 可前往访问控制的密钥管理中查看 Access Key ID。相关操作说明请参见 Access Key(密钥)管理。 |
secret.key | 可前往访问控制的密钥管理中查看 Secret Access Key。相关操作说明请参见 Access Key(密钥)管理。 |
endpoint | 需按照您所在的 Region 信息进行配置,不同地域值,详见 Spark Jar 作业开发 - spark.hadoop.fs.lasfs.endpoint。 |
region | 按照您所在的 Region 信息,对应配置:
|
任务配置完成后,您可单击操作栏中的保存和调试按钮,进行任务调试。
注意
待任务执行成功后,您可进入火山引擎控制台:
调试任务成功,并查看日志校验数据情况无误后,返回数据开发界面,将任务提交发布到运维中心离线任务运维中执行。
单击上方操作栏中的保存和提交上线按钮,在提交上线对话框中,选择回溯数据、监控设置、提交设置等参数,最后单击确认按钮,完成作业提交。 提交上线说明详见:数据开发概述---离线任务提交。
后续任务运维操作详见:离线任务运维。