You need to enable JavaScript to run this app.
导航
集成示例
最近更新时间:2023.02.23 17:04:35首次发布时间:2023.02.23 11:06:57

本文通过演示在火山引擎 E-MapReduce(EMR)的 DolphinScheduler 中运行 Spark、Flink 任务的示例,帮助您更好地理解 DolphinScheduler 的使用。

1 使用前提

  1. 已创建包含 DolphinScheduler 组件的 EMR 集群。详见创建集群

  2. DolphinScheduler 组件服务,需要为 emr-master-1 机器实例绑定弹性公网 IP,并配置服务端口(DolphinScheduler 默认的服务端口是 12345)才能访问 DolphinScheduler Web UI,详见访问链接

2 资源中心

在 DolphinScheduler 中,资源中心通常用于上传文件、UDF 函数和任务组管理。 EMR DolphinScheduler 中的资源中心基于同集群内的 Hadoop 集群,默认目录是:/dolphinscheduler,由配置项 resource_upload_path 定义。关于如何修改服务配置参数,请参阅管理服务配置参数

本示例中,我们会运行一个经典的 WordCount 程序,需要先将所需的 spark jar、flink jar 与 word 文本文件(见下文)上传到资源中心,然后在后续定义具体工作流时进行引用。

  • Spark jar:

    spark_test_jar.jar
    1.49MB

  • Flink jar:

    Flink_test_jar.jar
    14.32KB

  • Word 文本信息

    wordcount文件.txt
    1.15KB

上传操作如下:

  1. 登录 EMR 控制台

  2. 在左侧导航栏中,单击集群管理 > 集群列表 > DolphinScheduler 集群详情 > 访问链接 > DolphinScheduler UI 访问链接, 进入 Web UI 登录界面。

  3. 输入对应的用户名和密码信息,确认后进入 Web UI 界面。创建用户请参阅快速开始---创建用户

  4. 在上方导航栏中,单击资源中心按钮,进入资源文件夹管理界面。

  5. 文件管理界面,单击上传文件按钮,从本地选择对应文件,单击确定按钮,完成资源上传。
    alt

3 数据源中心

在运行 Spark SQL、Hive SQL 类型任务时,DolphinScheduler 要求在数据源中心中预先配置好数据源连接信息。这里以 EMR 3.x 版本的 Hadoop 类型集群中配置一个 Spark 数据源为例。

3.1 HA 集群

HA集群配置示例如下:

  1. 单击界面上方导航栏中的数据源中心按钮,进入数据源中心界面。

  2. 单击创建数据源,配置以下数据源信息:

    配置项示例说明
    数据源spark下拉选择数据源类型,此处选择 spark 数据源类型。
    数据源名称spark_source输入数据源名称信息。
    描述HA 集群配置输入该数据源的描述信息,方便后续管理。
    IP 主机名emr-master-1,emr-master-2,emr-master-3输入 spark 数据源的 master 名称。
    端口2181填写对应的端口号信息。
    用户名hiveSpark 数据源下对应的用户名信息
    密码******对应的密码信息。
    数据库名default数据源下对应的数据库名信息。
    Jdbc 链接参数{"serviceDiscoveryMode":"zooKeeper","zooKeeperNamespace":"midas/ha","auth":"LDAP"}输入 json 格式的连接参数,以 {"key1":"value1","key2":"value2"...} 格式输入,非必填。
  3. 以上使用的用户名密码可以从以下路径获取:

    1. 进入 EMR 控制台 > 集群管理 > 集群列表 > DolphinScheduler 集群详情 > 服务列表 > OpenLDAP > 服务参数界面。

    2. 获取 Hive 服务参数名称的管理员账号信息:

      • 账号:hive_admin 配置项内容;

        • 密码:hive_password配置项内容。

3.2 非 HA 集群

非 HA 集群配置示例如下:

配置项示例说明
数据源spark下拉选择数据源类型,此处选择 spark 数据源类型。
数据源名称spark_data输入数据源名称信息。
描述非 HA 集群配置输入该数据源的描述信息,方便后续管理。
IP 主机名emr-master-1输入 spark 数据源的 master 名称。
端口10005填写对应的端口号信息。
用户名hiveSpark 数据源下对应的用户名信息
密码******对应的密码信息。
数据库名default数据源下对应的数据库名信息。
Jdbc 链接参数输入 json 格式的连接参数,以 {"key1":"value1","key2":"value2"...} 格式输入,非必填。

4 工作流定义

4.1 使用前提

  1. 已创建相关 DolphinScheduler 项目、用户信息及队列信息。详见快速开始

4.2 操作步骤

  1. 单击界面上方项目管理按钮,进入项目列表界面。

  2. 单击详细项目名称,进入到项目概览界面。

  3. 在左侧导航栏中,单击工作流 > 工作流定义按钮,进入工作流定义页面。

  4. 单击创建工作流按钮,进入工作流 DAG 编辑页面,拖动相应的任务类型到右边栏中,新增对应任务。

4.3 Hive / Spark SQL

4.3.1 配置作业

拖动 SQL 任务类型到右边栏中,完成以下配置信息:

参数说明
节点名称输入任务类型的节点名称信息。
运行标志选择任务运行的状态,支持选择正常、禁止执行,若勾选“禁止执行”,运行工作流不会执行该任务。
描述为当前节点填写任务描述信息,方便后续区分。
任务优先级选择当前任务的优先级情况,从 HIGHEST 到 LOWSET 分为5个等级,您可根据实际情况进行选择,当 worker 线程数不足时,级别高的任务在执行队列中会优先执行,相同优先级的任务按照先进先出的顺序执行。
Worker 分组下拉选择已创建成功的 Worker 分组信息。
环境名称配置任务执行的环境,下拉选择已创建成功的环境名称。
任务组名称通过任务组来管理任务实例占用的资源,避免占用太多资源导致了集群其他组件受到资源上的影响。您可在资源中心 > 任务组管理中,创建任务组信息。详见任务组管理
组内优先级设置该任务,在当前任务组内的优先级,在同个任务组内优先获得资源。
失败重试次数任务失败重新提交的次数,可进行手动填充。
失败重试间隔任务失败重新提交任务的时间间隔,可以进行手动填充。
延时执行时间任务延迟执行的时间,以分为单位。
超时告警设置超时告警、超时失败。当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。
数据源类型下拉选择已创建成功的 Spark 数据源。
数据源实例选择的测试连通性成功的 Spark 数据源实例信息。

SQL 类型---查询

  • SQL 查询类型,您可以选择是否开启发送邮件告警:

    • 主题:告警邮件发送的主题信息。

    • 告警组:选择对应的告警组信息,用于邮件接收。配置详见快速开始

  • 日志显示:指定日志中展示的查询结果行数。

SQL 类型---非查询可执行多段SQL,并设置分段执行符号。
SQL 语句输入 SQL 执行语句。
UDF 函数对于 HIVE 类型的数据源,可以引用资源中心中创建的 UDF 函数,其他类型的数据源暂不支持UDF函数。
自定义参数SQL 任务类型,而存储过程是自定义参数顺序,给方法设置值自定义参数类型和数据类型,同存储过程任务类型一样。区别在于 SQL 任务类型自定义参数会替换 SQL 语句中${变量}。
前置 SQL 语句前置 SQL 在 SQL 语句之前执行。
后置 SQL 语句后置 SQL 在 SQL 语句之后执行。
前置任务设置当前任务的前置(上游)任务,形成上下游依赖关系。

Spark SQL 与 Hive 一致,只需要在数据源选择的时候选择正确的类型以及实例即可。
SQL 语句示例:

-- SQL语句:
create table if not exists dolphin_test_spark_x(id int);

4.3.2 运行作业

作业配置完成后,您可执行以下步骤,执行配置的作业:

  1. 在编辑界面,右上角单击保存按钮,保存工作流的配置。

  2. 回到工作流定义列表界面,单击操作列中的上线按钮,完成作业发布。

  3. 单击运行运行按钮,工作流生成相应的工作流实例,您可以进入工作流实例界面,进行查看具体实例执行情况。

    alt

  4. 待工作流实例运行完成后,登录到 DolphinScheduler 集群的实例节点,执行如下命令:

    登入方式详见创建租户-集群远程登录

spark-sql

show tables;

alt

Flink 类型任务中统一包含了 SQL 与 JAR 两种提交方式。

4.4.1 JAR 包提交

工作流主要参数配置如下:

参数示例值说明
程序类型JAVA支持选择 JAVA、SQL、SCALA、PYTHON 类型。
主函数的 Classorg.apache.flink.streaming.examples.wordcount.WordCount输入 Jar 包中的主函数 Class 信息。
主程序包Flink_test_jar.jar下拉选择已上传成功的 Jar 包资源。
部署方式pre-job/cluster支持 pre-job/cluster(Flink 版本大于 1.10 时可选)、application、local的部署方式
Flink 版本>=1.12支持选择 '<1.10'、'1.11'、'>=1.12' 三个 Flink 版本信息。

主程序参数

-input hdfs://emr-master-1:8020/dolphinscheduler/项目名称/resources/wordcount文件.txt --output hdfs:/emr-master-1:8020/tmp/test_x

根据实际的路径和文件名,填写主程序参数。

说明

  • ha集群用hdfs://emr-cluster/...

  • 本示例运行一个 WordCount 程序,该程序要求有一个输入的文本文件,并将结果输出到指定位置,这里我们通过主程序参数中的 --input--output 予以指定。

资源wordcount文件.txt下拉选择已上传成功的文件信息。

4.4.2 SQL

运行 Flink SQL 任务只需要将程序类型选择为 SQL,并在脚本输入框中录入需要执行的 SQL 即可。关于 Flink 不同部署方式在 EMR 中的支持情况,请参阅 Flink 基础使用

4.5 Spark-submit

与 Flink JAR 提交的方式相同,Spark-submit 运行同样依赖通过资源中心上传的程序包和配套资源文件。
工作流主要参数配置如下:

参数示例值说明
程序类型JAVA支持选择 JAVA、SQL、SCALA、PYTHON 类型。
Spark 版本SPARK2根据实际场景,选择 Spark 的版本信息,支持选择 Spark1、Spark2。
主函数的 Classorg.apache.spark.examples.JavaWordCount输入 Jar 包中的主函数 Class 信息。
主程序包spark_test_jar.jar下拉选择已上传成功的 Jar 包资源。
部署方式cluster支持 cluster、client、local 的部署方式。

主程序参数

/dolphinscheduelr/租户名/resources/资源中心文件夹名/文件名称

根据实际的租户名和文件名,填写主程序参数。

资源wordcount文件.txt下拉选择已上传成功的文件信息。

配置完成,并运行工作流任务成功后,您可登入到 YARN UI 界面,查看对应的执行结果:

  1. 进入集群管理 > 集群列表 > DolphinScheduler 集群详情 > 服务列表 > OpenLDAP > 服务参数界面。

  2. 获取登录 YARN UI 的用户名和密码信息。

  3. 进入DolphinScheduler 集群详情 > 访问链接, 单击 YARN ResourceManager UI 访问链接,并登录。

  4. 在对应的 spark 作业 ID 下,查看最终的执行日志结果。