You need to enable JavaScript to run this app.
导航

Ksana for SparkSQL

最近更新时间2024.04.10 14:01:38

首次发布时间2022.08.19 11:39:52

1 权限管理

安装后默认已经预置了部分用户的权限,如已经预置 hive 用户的权限,如需添加新的用户和新的权限,可以在 Ranger Admin 界面添加新的权限 Policy,详细可以参考 Ranger 帮助文档下 Spark集成 章节。

2 Ksana for SparkSQL 高级配置

说明

  • 在 EMR-3.4.0 及以后的版本中,将下线 Ksana 组件相关功能;

  • 在 EMR-3.3.0 及之前的版本中,仍保留 Ksana 组件相关功能,您可创建 EMR-3.3.0 及之前的集群版本,来使用 Ksana 功能。

2.1 使用 Hudi

Hudi可通过创建连接的时候指定Hudi的参数,该方式针对当前连接生效:

beeline --hiveconf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension --hiveconf spark.serializer=org.apache.spark.serializer.KryoSerializer --hiveconf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog  -u "jdbc:hive2://emr-master-1:10005/default;auth=LDAP" -n <user> -p <password>

创建Hudi表

create table hudi_mor_tbl (
    id int,
    name string,
    price double,
    ts bigint

) using hudi

tblproperties (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'

);

插入数据

insert into hudi_mor_tbl_1(id, name, price, ts)values(1, 'test', 1, 1);

查询结果

select * from hudi_mor_tbl;

[pool-30-thread-5] INFO  com.bytedance.emr.midas.engine.spark.operation.SparkOperation  - Processing EXECUTE_STATEMENT  statement: EXECUTE_STATEMENT , time taken: 0.471 seconds
+----------------------+------------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------+--------+-----+
| _hoodie_commit_time  |  _hoodie_commit_seqno  | _hoodie_record_key  | _hoodie_partition_path  |                 _hoodie_file_name                  | id  |  name  | price  | ts  |
+----------------------+------------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------+--------+-----+
| 20220429103652951    | 20220429103652951_0_1  | id:1                |                         | a4cadbf7-973c-44c1-b54c-1b07222e6040-0_0-2.parquet | 1   | test  | 1.0    | 1   |
+----------------------+------------------------+---------------------+-------------------------+----------------------------------------------------+-----+--------+--------+-----+
1 row selected (2.077 seconds)

2.2 使用Iceberg

Iceberg可通过创建连接的时候指定Hudi的参数,该方式针对当前连接生效:

beeline --hiveconf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog --hiveconf spark.sql.catalog.iceberg.type=hive --hiveconf spark.sql.catalog.iceberg.uri=thrift://emr-master-1:9083,thrift://emr-master-2:9083,thrift://emr-master-3:9083 --hiveconf spark.sql.catalog.iceberg.warehouse=hdfs://emr-master-1:8020/user/hive/warehouse/iceberg/hive --hiveconf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -u "jdbc:hive2://emr-master-1:10005/default;auth=LDAP" -n <user> -p <password>

2.3 引擎默认参数

Ksana for SparkSQL 是提前在 Yarn 上提交 Spark 引擎,进行 SQL 的接收,返回执行结果,可以在提交作业的时候,指定参数的信息,也可以修改组件参数配置,设置默认资源,修改方式为:

  1. 集群详情 > 服务列表 > Spark > 服务参数

  2. 搜索如下参数,进行默认值的修改:

参数名称说明
spark.master作业的提交方式,默认为 yarn
spark.driver-memorydriver 端默认内存为 1g
spark.driver-coresdriver 默认 core 数量为 1
spark.executor-coresexecutor 默认 core 数量为 1
spark.num-executors默认 executor 数量为 1
spark.executor-memory默认 executor 内存为 1g
spark.queue作业提交队列,默认为 default

除了通过配置文件修改全局的默认参数外,也可在创建连接的时候动态的对某个参数进行覆盖,参数生效范围为本次连接,例如可通过:

./bin/beeline  --hiveconf spark.queue=队列名称

进行默认队列的修改,其余参数均可通过该方式进行设置,此外 Ksana for SparkSQL 兼容所有原生 Spark 参数,可以通过以下路径进行设置:

集群详情 > 服务列表 > Spark > 服务参数 > 自定义参数

关于 Spark 原生参数,请查看:https://spark.apache.org/docs/latest/configuration.html#overriding-configuration-directory

2.4 资源隔离级别

Ksana for SparkSQL 支持 Session,User,Open 三种不同的资源隔离级别,具体的定义如下:

  • Session:

    Session 级别下,Ksana for SparkSQL 会为每一次 connection 创建一个 Spark 引擎,该 connection 下的所有 SQL 作业均由该 Spark 引擎进行执行,connection 关闭后,该 Spark 引擎会被停止。

    对于资源隔离要求比较高,或者对资源独占有较高诉求的作业,可以选择此种模式。

  • User:

    User 级别下,Ksana for SparkSQL 会为每一个用户创建一个 Spark 引擎,该用户的所有 SQL 作业均会由该 Spark 引擎进行执行,connection 关闭后,该 Spark 引擎不会停止,在指定阈值周期内,没有新的用户作业提交,该引擎会自动停止。

    对于需要按照用户级别进行作业提交的场景,可以选择此种模式。

  • Open:

    Open 模式下,所有的用户的 SQL 作业会提交至同一个 Spark 引擎。

    对于没有严格的账号权限控制,或者希望共享资源信息的情况下,可以选择此种模式。

不同隔离级别的使用方式为:
建立连接的时候通过设置 midas.engine.isolation 为 SESSION/USER/OPEN,用来声明本次连接是什么隔离级别模式,例如:

./bin/beeline --hiveconf midas.engine.isolation=SESSION

以上命令启动一个 SESSION 模式的 Spark 引擎执行作业。由于 Ksana for SparkSQL 本质上是在 Yarn 上启动一个常驻的 Spark 作业,当资源隔离级别为 USER 或者 OPEN 的时候,如果该 Spark 作业在一定时间内没有接收到 SQL 作业,则该 Spark 作业会从 Yarn 上被停止,默认的空闲周期为1小时,用户可选择使用 ssh 登陆安装了 Ksana for SparkSQL 的机器,编辑如下文件/usr/lib/emr/current/midas-dist/config/midas-env.sh对如下值进行修改:

export KSANA_ENGINE_TIME_OUT=3600000
export KSANA_SESSION_TIME_OUT=3600000

该参数默认单位为毫秒,修改完成后,需要重启 Ksana for SparkSQL。

2.5 引擎预热

引擎预热为 Yarn 会长期保持多个活跃的 Spark 引擎,处于等待接收 SparkSQL 引擎的状态,降低作业第一次执行的时候冷启动带来的等待时间,引擎预热功能默认处于关闭状态,若要使用需要先打开引擎预热,打开方式为:

  1. 集群详情 > 服务列表 > Spark > 服务参数

  2. 修改 midas.prepare.engine 的值为 true,且对引擎参数进行配置,相关的参数如下:

参数名称说明
midas.prepare.engine引擎预热功能,默认为 false 不开启。
midas.prepare.engine.maximum最大引擎个数,默认为 3。
midas.prepare.engine.minimum最小引擎个数,默认为 1。
midas.prepare.engine.username提交 Spark 引擎的用户,默认为内置的 hive 用户。
midas.prepare.engine.level资源隔离级别,默认为 USER 级别。
midas.prepare.engine.queue引擎队列,默认为 default。
midas.prepare.engine.driverMemorydriver 内存资源,默认 1G。
midas.prepare.engine.driverCoresdriver core 数量,默认 1。
midas.prepare.engine.executorCoresexecutor core 数量,默认 1。
midas.prepare.engine.numExecutorexecutor 数量,默认 1。
midas.prepare.engine.executorMemoryexecutor 内存资源,默认 1G。

Ksana for SparkSQL 会基于当前集群资源,基于 midas.prepare.engine.maximum 和 midas.prepare.engine.minimum 的值计算出合适的作业个数,提前在 Yarn 上运行。

2.6 开启 Spark 动态资源

Ksana for SparkSQL 默认关闭 Spark 动态资源管理,若需要开启此功能,您可以按以下方法进行配置:

  1. 集群详情 > 服务列表 > Spark > 服务参数

  2. 修改如下参数:

参数名称说明
spark.dynamicAllocation.enabled动态资源开关,将其设置为 true,默认 false。
spark.shuffle.service.enabled动态资源开关,将其设置为 true,默认 false。
spark.dynamicAllocation.minExecutors最小资源数量,默认为 1。

即可开启 Spark 动态资源管理。

2.7 作业日志查看

Ksana for SparkSQL 本身为 Spark 引擎运行 SparkSQL 作业,因此可以在 Yarn 页面上查看对应的作业运行情况,同时也可在机器内查看到对应的日志,查看方式为:

  1. 集群管理 > 集群列表 > 具体集群名称, 进入 集群详情 界面.

  2. 导航栏中点击 服务列表,点击 Spark 服务并进入。

  3. 点击 emr 集群节点 (emr-master-1 主机名称)的 ECS ID,跳转进入到云服务器的实例界面,点击右上角的 远程连接 按钮,输入集群创建时的 root 密码,并进入远程终端。

进入 /usr/lib/emr/current/midas-dist/workers 目录,Ksana for SparkSQL 默认按照提交用户进行归类,可在此目录下查看具体 SQL 的执行日志。

2.8 日志周期配置

Ksana for SparkSQL 默认保留7天的日志,若需要保留更长时间的日志,进入:

  1. 集群详情 > 服务列表 > Spark > 服务参数

  2. 修改以下参数信息:

    参数名称说明
    midas.log.timeout.period日志保存周期,默认为 7 天,1000 * 60 * 60 * 24 * 7。
    midas.log.archive.size日志文件归档大小,默认 1GB。

可设置为业务需要的过期时间。

2.9 使用代码连接 Ksana for SparkSQL

Ksana for SparkSQL 兼容 Hive Driver,因此可以使用 Hive 驱动,修改连接端口即可执行 SQL 作业,以 Java 代码连接 Ksana for SparkSQL 为例:

  1. 引入相关驱动:

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>您的Hive版本</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>您的hadoop版本</version>
    </dependency>
    
  2. 使用如下代码执行SparkSQL作业:

    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            Properties properties = new Properties();
            properties.put("user", "您的用户名");
            properties.put("password", "您的密码");
            connection = DriverManager.getConnection("您的Ksana for SparkSQL连接地址", properties);
            HiveStatement hiveStatement = (HiveStatement) connection.createStatement();
            ResultSet rs = hiveStatement.executeQuery("SLQ作业");
            while (rs.next()) {
                //todo 输出结果
            }
            for (String log: hiveStatement.getQueryLog()) {
                //todo 可获取日志
            }
        } catch (SQLException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            assert connection != null;
            connection.close();
        }
    }
    

可在 properties 对象中,基于业务需要调整相关参数,例如队列,内存等。