Bolt是火山引擎 E-MapReduce(EMR)团队自研的统一湖仓分析加速C++引擎,使用了向量化计算、Codegen等加速技术。Spark on Bolt采用Gluten+Bolt的方式来加速Spark。
Spark on Bolt的整体架构如下:
其核心包含两部分:
执行计划的转换和传递。通过 Substrait 来实现物理执行计划在 JVM 和 Native 的传递,Substrait 可以理解为是基于 google protobuf 的面向关系对象的跨语言序列化 SDK,Bolt 也支持基于 Substrait 作为执行计划输入
数据的传递。SparkSQL 的执行是 Row-based,算子操作的对象是 Row,而在向量化框架里,算子操作的对象是 ColumnarBatch,一个 ColumnarBatch 可以包含多行数据,ColumnarBatch 里每个字段的数据结构是 Column Vector
执行计划的转换发生在 物理执行计划 阶段。这意味着 Plan 相关的优化规则对于Spark on Bolt依然生效。
Spark通过Unified Memory Manager来管理内存,以及协调不同算子之间的Spill 和 内存释放。Spark on Bolt 也接入并复用了这个能力。Bolt 提供了一个 Memory Pool 来管理 Native 算子在运行时对内存的申请和释放,并且通过注册Listerner的方式调用 JNI 回调给 JVM 的 Task Memory Consumer,从而接入 Spark的Unified Memory Manager。
对于一个没有算子Fallback的 Spark on Bolt任务,它的 OnHeap 使用应该是非常少的,所有和数据有关的算子都是用的 OffHeap。这也降低了在大内存场景下 GC 导致的性能损失。
从Runtime的视角看Spark on Bolt 如下
可以看到,在每个Spark Task粒度上,通过JNI调用Bolt来执行单个Task的计算逻辑。我们依然可以复用Spark健壮的调度框架,推测执行,task/stage 不同粒度的失败重试,以及丰富的生态,比如 yarn/k8s等。
目前Spark on Bolt可以通过增加配置的方式启用。
# bolt set spark.plugins=io.glutenproject.GlutenPlugin; set spark.shuffle.manager=org.apache.spark.shuffle.gluten.celeborn.CelebornShuffleManager; set spark.gluten.sql.columnar.numaBinding=false; set spark.gluten.sql.columnar.coreRange=0-35,72-107|36-71,108-143; set spark.gluten.sql.columnar.shuffleSplitDefaultSize=8192; set spark.gluten.sql.columnar.forceShuffledHashJoin=true; set spark.sql.bucket.union.enable=false; set spark.sql.optimizer.dynamicDataPruning.enabled=false; set spark.sql.optimizer.dynamicDataPruning.datasource.enabled=false; set spark.sql.mergeGetMapValue.enabled=false; set spark.sql.collapseGetJsonObject.enabled=false; set spark.sql.CollapseExpensiveExprMode=None; set spark.sql.execution.useObjectHashAggregateExec=false; set spark.sql.optimizer.partialAggregationOptimization.enabled=false; set spark.sql.aggregate.adaptivePartialAggregationThreshold=0; set spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct=80; set spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows=100000; set spark.gluten.sql.columnar.maxBatchSize=32768; set spark.gluten.sql.columnar.maxBatchByteSize=41943040; set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAggregates,org.apache.spark.sql.hive.auth.GeminiSparkAuthorization; set spark.sql.gluten.customHadoopConfEnforce.enabled=false; set spark.kubernetes.driverEnv.CPP_HDFS_CONF=/opt/tiger/hadoop_conf/core-site.xml; set spark.executorEnv.CPP_HDFS_CONF=/opt/tiger/hadoop_conf/core-site.xml; set spark.kubernetes.driverEnv.LD_PRELOAD=/opt/tiger/spark_deploy/spark-stable/native_lib/libjemalloc.so; set spark.executorEnv.LD_PRELOAD=/opt/tiger/spark_deploy/spark-stable/native_lib/libjemalloc.so; set spark.kubernetes.driverEnv.HADOOP_CONF_DIR=/opt/tiger/hadoop_conf; set spark.executorEnv.HADOOP_CONF_DIR=/opt/tiger/hadoop_conf;
Bolt计算使用非堆内存,所以在启用Bolt的时候对应要修改Spark内存使用,默认情况下我们会配置以下2个参数:
// 堆内内存 spark.executor.memory 12G // 预留非堆内存 spark.executor.memoryOverhead 4096
启用Bolt后,建议设置参数如下:
// 堆内内存,建议保留原来堆内内存的25% set spark.executor.memory=3G; // 预留非堆内存,建议不变 set spark.executor.memoryOverhead=4096; // 启用offHeap内存 set spark.memory.offHeap.enabled=true; // offHeap内存大小,建议设置为原来堆内内存的75% set spark.memory.offHeap.size=9G; 注:启用Bolt后 spark.executor.memory+spark.memory.offHeap.size=原spark.executor.memory
注意
由于SQL专用资源队列会预热拉起,不支持重新配置内存,如果需要更改内存配置,请使用公共队列或者具有通用资源的队列,并设置如下参数:
set tqs.query.engine.type = sparkcli;
如果的确希望在SQL专用资源更改内存配置,请 工单联系我们。