LAS Spark任务适用于定时执行Spark离线任务的场景,为满足您定制化的数据查询分析,DataLeap支持Jar包资源和Python资源引用的方式,来进行定制化的数据分析工作。
任务新建步骤如下:
任务新建完成后,在任务配置界面完成以下参数配置。
语言类型支持 Java、Python。
注意
语言类型暂不支持互相转换,切换语言类型会清空当前配置,需谨慎切换。
资源类型默认选择 Python 类型,且需选择资源所关联的 LAS Schema 库信息。
在编辑器中输入 Python 语句,执行引擎只支持 Python3.7。若需使用 python2.7 时,请在代码顶行加上 # -- coding: utf-8 --以表示文件编码。
示例脚本如下:
from pyspark import SparkConf from pyspark.context import SparkContext from pyspark.sql import SparkSession, SQLContext SparkContext._ensure_initialized() spark = SparkSession.builder \ .appName("pyspark_test_read_hive") \ .config("spark.hive.client.minibase.username", "axxxxxn") \ .config("spark.hive.client.minibase.password", "xxxxxxxxx") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext sql = spark.sql sqlContext = spark._wrapped sqlCtx = sqlContext df = sql("INSERT INTO TABLE db.las_table_name PARTITION (date = '20221213') VALUES ('test1', 'test2')") df.show() spark.stop()
注意
设置系统环境变量时,避免直接覆盖系统环境变量,请按照追加方式指定,例如PATH=$PATH:/home/lihua/apps/bin/;
参数 | 说明 |
|---|---|
Spark 参数 | |
Main Class | 语言类型为 Java 时,需填写Main Class主类信息,如org.apache.spark.examples.JavaSparkPi。 |
Py-files | 语言类型为 Python 时,可添加Py-files资源文件,您可下拉选择已上传的 LAS 资源,或 新建资源,详见资源库。 |
Conf参数 | Spark配置信息,支持单行编辑模式和脚本编辑模式。相关配置信息说明,可参见下方“Spark优化参数说明表”。 |
任务参数 | |
自定义参数 | 输入任务中已定义的参数,多个参数以空格形式进行分隔,例如 param1 param2 param3,参数最终将以字符串形式传入。 |
任务产出数据登记,用于记录任务---数据血缘信息,并不会对代码逻辑造成影响。对于系统无法通过解析获取产出信息的任务,可手动登记其产出信息。
如果任务含有 LAS 表的写入操作,强烈建议填写。您填写的内容即为任务产出,支持填写多个。其他任务的依赖推荐会根据此处填写的 LAS 表信息进行推荐。 具体登记内容包括:
任务配置完成后,您可单击右侧侧边栏的调度设置,配置调度设置参数。调度设置说明详见“调度设置”。
Spark insert overwrite 使用示例df.write.insertInto("aaa_bbb_doc_html_detail", True)
用户自定义参数较多时,且存在K-V参数时,可参考如下配置:
Spark程序中,如果自定义参数较多,可直接在自定义参数输入框中配置,使用空格分隔。数据开发运行时,会将自定义参数拼接到Main Class之后,配置输入参数格式需符合spark main class程序解析方式,如下图所示。
如需使用时间变量,也可以直接在自定义参数中配置${date}/${DATE}等系统变量。
Spark优化参数
Spark优化参数说明表
参数 | 说明 | 默认值 |
|---|---|---|
spark.executor.instances | 静态资源下:executor数 | 2 |
spark.executor.cores | 每个executor和CPU数 | 4 |
spark.dynamicAllocation.enabled | 动态资源开关 | false |
spark.dynamicAllocation.maxExecutors | 动态资源下:executor的最大个数 | 500 |
spark.executor.memory | 每个executor的内存大小 | 8g |
spark.memory.fraction | executor用于计算的内存比例,剩余部分用于存储元数据和运行信息。对于executor内存开的较大的任务,可以适当提高这个值,让更多内存参与计算,但会增加OOM风险 | 0.6 |
spark.executor.memoryOverhead/ spark.yarn.executor.memoryOverhead | 每个executor的堆外内存大小,堆外内存主要用于数据IO,对于报堆外OOM的任务要适当调大,单位Mb,与之配合要调大executor JVM参数,例如: set spark.executor.memoryOverhead=3072 set spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2560m | 6144 |
spark.sql.adaptive.enabled | Adaptive execution开关,包含自动调整并行度,解决数据倾斜等优化 | true |
spark.sql.adaptive.minNumPostShufflePartitions | AE相关,动态最小的并行度 | 1 |
spark.sql.adaptive.maxNumPostShufflePartitions | AE相关,动态最大的并行度,对于shuffle量大的任务适当增大可以减少每个task的数据量,如1024 | 1000 |
spark.sql.adaptive.join.enabled | AE相关,开启后能够根据数据量自动判断能否将sortMergeJoin转换成broadcast join | true |
spark.sql.adaptiveBroadcastJoinThreshold | AE相关,spark.sql.adaptive.join.enabled设置为true后会判断join的数据量是否小于该参数值,如果小于则能将sortMergeJoin转换成broadcast join | spark.sql.autoBroadcastJoinThreshold |
spark.sql.adaptive.skewedJoin.enabled | AE相关,开启后能够自动处理join时的数据倾斜,对于数据量明显高于中位数的task拆分成多个小task | false |
spark.sql.adaptive.skewedPartitionFactor | AE相关,数据倾斜判定标准,当同一stage的某个task数据量超过中位数的N倍,将会判定为数据倾斜 | 5 |
spark.sql.adaptive.skewedPartitionMaxSplits | AE相关,被判定为数据倾斜后最多会被拆分成的份数 | 5 |
spark.shuffle.accurateBlockThreshold | AE相关,数据倾斜判定基于shuffle数据量统计,如果统计所有的block数据,消耗内存较大,因此设有阈值,当shuffle的单个数据块超过大小和行数阈值时,才会进入统计,这个参数即大小阈值 | 10010241024(100MB) |
spark.shuffle.accurateBlockRecordThreshold | AE相关,同上,行数阈值,如果设置了上面的数据倾斜处理开关,仍然倾斜,可能是因为这几个参数设得偏大,适当缩小 | 2 * 1024 * 1024 |
spark.sql.files.maxPartitionBytes | 默认一个task处理的数据大小,如果给的太小会造成最终任务task太多,太大会是输入环节计算较慢 | 1073741824 |
spark.vcore.boost.ratio | vcore,虚拟核数,设置大于1的数可以使一个核分配多个task,对于简单sql可以提升CPU利用率,对于复杂任务有OOM风险 | 1 |
spark.shuffle.hdfs.enabled(长任务推荐) | HDFS based Spark Shuffle开关,可以提高任务容错性。遇到org.apache.spark.shuffle.FetchFailedException报错需设置 | false |
set spark.shuffle.io.maxRetries=1; set spark.shuffle.io.retryWait=0s; | 一般在开启hdfs shuffle后还可以开启这两个参数,避免不必要的重试和等待 | |
spark.sql.crossJoin.enabled | 对于会产生笛卡尔积的sql,默认配置是限制不能跑的,在hive里可以配置set hive.mapred.mode=nonstrict跳过限制,相对应的在spark里可以配置set spark.sql.crossJoin.enabled=true起到同样的效果。 | false |
spark.sql.broadcastTimeout | broadcast joins时,广播数据最长等待时间,网络不稳定时,容易出现超时造成任务失败,可适当增大此参数。 | 300(单位:s) |
spark.sql.autoBroadcastJoinThreshold | 表能够使用broadcast join的最大阈值 | 10MB |
spark.network.timeout | 网络连接超时参数 | 120s |
spark.maxRemoteBlockSizeFetchToMem | reduce端获取的remote block存放到内存的阈值,超过该阈值后数据会写磁盘,当出现数据量比较大的block时,建议调小该参数(比如512MB)。 | Long.MaxValue |
spark.reducer.maxSizeInFlight | 控制从一个worker拉数据缓存的最大值 | 48m |
spark.merge.files.enabled | 合并输出文件,如果insert结果的输出文件数很多,希望合并,可以设为true,会多增加一个repartition stage合并文件,repartition的分区数由spark.merge.files.number控制 | false |
spark.merge.files.number | 控制合并输出文件的输出数量 | 512 |
spark.speculation | 推测执行开关。如果是原生任务很有可能没开这个参数,会出现个别task拖慢整个任务,可以开启这个参数。 | true |
spark.speculation.multiplier | 开启推测执行的时间倍数阈值:当某个任务运行时间/中位数时间大于该值,触发推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。 | 1.5 |
spark.speculation.quantile | 同一个stage中的task超过这个参数比例的task完成后,才会开启推测执行。对于因为推测执行而浪费较多资源的任务可以适当调高这个参数。 | 0.75 |
spark.default.parallelism | Spark Core默认并发度,原生spark程序并发度设置 | 200 |
spark.sql.shuffle.partitions | Spark SQL默认并发度,AE开启后被spark.sql.adaptive.maxNumPostShufflePartitions取代 | 200 |
spark.sql.sources.bucketing.enabled | 分桶表相关,当设置为false,会将分桶表当作普通表来处理。做为普通表会忽略分桶特性,部分情况性能会下降。但如果分桶表没有被正确生成(即表定义是分桶表,但数据未按分桶表生成)会报错RuntimeException: Invalid bucket file,避免这个错误,要将这个参数设为false | true |
spark.sql.partition.rownum.collect.enable | 统计生成固定分区表行数 | false |
spark.sql.dynamic.partition.rownum.collect.enable | 统计生成动态分区表行数 | false |
以下将为您演示如何通过LAS Spark任务,使用JAR和Python语言方式,来访问LAS表中的数据。
您可通过LAS管理员账户或添加的托管账号信息来访问LAS数据。
需要在 pom 里面加入以下依赖:
说明
使用provided的依赖,同时不要带入hive相关的依赖,避免与已有的数据中台冲突。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.12</artifactId> <version>3.0.1</version> <scope>provided</scope> </dependency>
您也可以根据实际情况添加需要使用的自定义connector。
import org.apache.spark.sql.SparkSession; import java.util.Arrays; public class SparkJarTest { SparkSession spark = SparkSession.builder() .appName("las-spark-test:spark_sdk_test") .enableHiveSupport() .config("spark.hive.client.minibase.username", ${username}) .config("spark.hive.client.minibase.password", ${password}) .getOrCreate(); String sql = "insert into ${schema.table} PARTITION(dt=2023) select * from ${schema.table} where dt=2022"; spark.sql(sql); spark.close(); }
在初始化SparkSession时,通过config传入相关配置:
参数 | 说明 |
|---|---|
spark.hive.client.minibase.username | 替换为LAS控制台中创建的username信息 |
spark.hive.client.minibase.password | 替换为LAS控制台账号对应的password信息 |

您可参考下方示例代码,通过Python语言方式,来访问LAS数据表信息。
新建LAS Spark任务,语言类型选择Python方式。创建任务操作详“新建任务”。
输入以下示例代码:
from pyspark.sql import SparkSession from pyspark.context import SparkFiles from pyspark.sql import SparkSession,SQLContext SparkContext._ensure_initialized() spark=SparkSession .builder.appName("las_test") .config("spark.hive.client.minibase.username","las控制台使用的username") .config("spark.hive.client.minibase.password","las控制台使用的password") .enableHiveSupport() .getOrCreate() sc = spark.sparkContext sql = spark.sql sqlContext = spark._wrapped sqlCtx = sqlContext #下面sql语句请替换成需要执行的sql语句 df = sql("select * from database.table_name where date='xxxx';") df.show() spark.stop()
参数说明:
参数 | 说明 |
|---|---|
spark.hive.client.minibase.username | 替换为LAS控制台中创建的username信息 |
spark.hive.client.minibase.password | 替换为LAS控制台账号对应的password信息 |