Beginner question: How to run a parameterized spark-submit job in Apache Airflow

Hey there! Since you're new to both Spark and Airflow, here's how to turn that familiar spark-submit command into an Airflow task that accepts parameters and passes them through to the job. There are two straightforward approaches:


Method 1: Reuse your spark-submit command directly with BashOperator

Since you're already comfortable with the spark-submit syntax, this is the most intuitive way to start. We'll use Airflow's BashOperator and leverage Jinja2 templating to inject parameters dynamically.

Here's a complete DAG example:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Basic settings; adjust as needed
default_args = {
    'owner': 'your_username',
    'start_date': datetime(2024, 1, 1),
    'retries': 1
}

with DAG(
    dag_id='spark_daily_job',
    default_args=default_args,
    schedule_interval=None,  # Manual trigger only; or set a schedule such as '@daily'
    catchup=False
) as dag:
    # Your spark-submit command as a templated string. Keep comments out of the
    # string: a "#" line between backslash-continued lines would end the command
    # there and leave the script path as a separate, broken command.
    # Replace /path/to/your/spark/script.py with the real path to your Spark job.
    spark_submit_template = """
    EXECUTORS_MEM={{ params.executors_mem }} EXECUTORS_NUM={{ params.executors_num }} \
    STARTDAY={{ params.startday }} ENDDAY={{ params.endday }} QUEUE={{ params.queue }} jobname={{ params.jobname }} \
    /home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client \
    --num-executors {{ params.executors_num }} --executor-memory {{ params.executors_mem }} \
    --executor-cores 1 --driver-memory 2G \
    /path/to/your/spark/script.py
    """

    run_spark_task = BashOperator(
        task_id='execute_spark_job',
        bash_command=spark_submit_template,
        # Default parameter values; you can override them in the UI when triggering the DAG
        params={
            'executors_mem': '4G',
            'executors_num': 300,
            'startday': '20180401',
            'endday': '20180401',
            'queue': 'm',
            'jobname': 'x'
        }
    )

How to use it

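When you trigger the DAG from the Airflow UI, choose "Trigger DAG w/ config" and enter a JSON config such as {"startday": "20180402", "endday": "20180405"}. Keys you supply override the matching params values (this relies on the core setting dag_run_conf_overrides_params, which is enabled by default in Airflow 2), so you can run different date ranges without touching the code. When the task runs, Jinja replaces each {{ params.xxx }} with the resulting value.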
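If you'd rather start runs from a script than from the UI, the same config can be sent to Airflow's stable REST API. This is a minimal sketch assuming Airflow 2.x with the REST API enabled and the basic-auth backend configured (auth_backends = airflow.api.auth.backend.basic_auth). The webserver URL, the admin/admin credentials and the helper name build_trigger_request are hypothetical placeholders. It only builds the POST /api/v1/dags/{dag_id}/dagRuns request; sending it is left commented out.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, conf, username, password):
    """Build (but do not send) a POST that starts one run of dag_id with conf."""
    url = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns"
    # The "conf" object becomes dag_run.conf, which overrides matching params
    body = json.dumps({"conf": conf}).encode("utf-8")
    # HTTP basic auth header: base64("username:password")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


# Hypothetical webserver address and placeholder credentials
req = build_trigger_request(
    "http://localhost:8080",
    "spark_daily_job",
    {"startday": "20180402", "endday": "20180405"},
    "admin",
    "admin",
)

# Uncomment to actually send it; the JSON response describes the new DAG run
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["dag_run_id"])
```

From a machine with the Airflow CLI, airflow dags trigger spark_daily_job --conf '{"startday": "20180402", "endday": "20180405"}' does the same thing.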


Method 2: Use Airflow's native SparkSubmitOperator

If you want a cleaner, more Airflow-native approach, use the SparkSubmitOperator from the Apache Spark provider. It wraps spark-submit in a dedicated operator with built-in arguments, so you don't have to write the full command string.

First, make sure you have the Apache Spark provider installed:

pip install apache-airflow-providers-apache-spark

Then here's the DAG example:

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

default_args = {
    'owner': 'your_username',
    'start_date': datetime(2024, 1, 1),
    'retries': 1
}

with DAG(
    dag_id='spark_daily_job_native',
    default_args=default_args,
    schedule_interval=None,
    catchup=False
) as dag:
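    run_spark_task = SparkSubmitOperator(
        task_id='execute_spark_job',
        # Path to your Spark job script
        application='/path/to/your/spark/script.py',
        # Set up spark_default beforehand under Admin > Connections (Host: yarn).
        # The master comes from this connection; SparkSubmitOperator has no master argument.
        conn_id='spark_default',
        verbose=True,
        # Recent provider versions accept deploy_mode here; older ones read
        # "deploy-mode" from the connection's Extra field instead
        deploy_mode='client',
        # Spark resource settings
        num_executors=300,
        executor_memory='4G',
        executor_cores=1,
        driver_memory='2G',
        # YARN queue, passed as a Spark property. Don't use queue='m': queue is a
        # BaseOperator argument that selects the Airflow worker (e.g. Celery) queue.
        conf={'spark.yarn.queue': 'm'},
        # Command-line arguments passed to your Spark script (e.g. STARTDAY/ENDDAY)
        application_args=['--startday', '{{ params.startday }}', '--endday', '{{ params.endday }}'],
        # Environment variables passed to the job (e.g. jobname)
        env_vars={'jobname': '{{ params.jobname }}'},
        # Default parameter values; these can also be overridden at trigger time
        params={
            'startday': '20180401',
            'endday': '20180401',
            'jobname': 'x'
        }
    )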

Notes:

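  • You'll need a spark_default connection: in the Airflow UI go to Admin > Connections, create or edit spark_default with connection type Spark, and set Host to yarn (leave Port empty). Spark does not take the ResourceManager address as a master URL; it finds the ResourceManager through the Hadoop configuration that HADOOP_CONF_DIR or YARN_CONF_DIR points to on the worker. Recent provider versions also invoke spark-submit by name, so make sure it is on the worker's PATH (for example by adding /home/spark/spark-2.1.0-bin-hadoop2.6/bin).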
  • Your Spark script should be set up to parse command-line arguments (using argparse in Python, for example) to handle the --startday and --endday values passed via application_args.
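Here is a minimal sketch of the argument handling on the Spark side, assuming a Python (PySpark) script. The function names parse_args, days_in_range and main are hypothetical, and the actual Spark work is left as a comment. It reads --startday and --endday from application_args and jobname from the environment set through env_vars.

```python
import argparse
import os
from datetime import datetime, timedelta


def parse_args(argv=None):
    """Parse the arguments that SparkSubmitOperator passes via application_args."""
    parser = argparse.ArgumentParser(description="Spark job launched by Airflow")
    parser.add_argument("--startday", required=True, help="first day to process, YYYYMMDD")
    parser.add_argument("--endday", required=True, help="last day to process, YYYYMMDD")
    return parser.parse_args(argv)  # argv=None means sys.argv[1:]


def days_in_range(startday, endday):
    """Return every YYYYMMDD string from startday to endday, inclusive."""
    start = datetime.strptime(startday, "%Y%m%d").date()
    end = datetime.strptime(endday, "%Y%m%d").date()
    if end < start:
        raise ValueError(f"endday {endday} is before startday {startday}")
    return [(start + timedelta(days=i)).strftime("%Y%m%d") for i in range((end - start).days + 1)]


def main(argv=None):
    args = parse_args(argv)
    # jobname is set through env_vars in the DAG; fall back if it is missing
    job_name = os.environ.get("jobname", "unnamed")
    days = days_in_range(args.startday, args.endday)
    # Hypothetical: the real script would create a SparkSession here and
    # process each entry in days
    return job_name, days
```

In the real script you would end with if __name__ == "__main__": main(), so the values come from sys.argv. With Method 1, the dates arrive as the STARTDAY and ENDDAY environment variables instead (the driver inherits them in client mode), so you would read them with os.environ.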

Some practical tips
  • Dynamic Date Parameters: Instead of hardcoding dates, you can use Airflow's built-in macros such as {{ ds_nodash }} (the run's logical date as YYYYMMDD), e.g. STARTDAY={{ ds_nodash }} ENDDAY={{ ds_nodash }} in Method 1, or '{{ ds_nodash }}' inside application_args in Method 2.
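  • Airflow Variables: For parameters that rarely change (like executor memory and executor count), you can store them in Airflow Variables (Admin > Variables) and reference them with {{ var.value.executors_mem }} instead of hardcoding them in params. This works directly in bash_command in Method 1; in Method 2 only templated fields (such as conf, application_args and env_vars) are rendered, so one option is conf={'spark.executor.memory': '{{ var.value.executors_mem }}'}.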
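  • Worker Access: Make sure every Airflow worker that may run the task can reach the Spark installation (e.g. /home/spark/spark-2.1.0-bin-hadoop2.6) and your Spark script, either as local paths that exist on all workers or on a shared mount. The script itself can also live on shared storage such as HDFS if your Spark version and deploy mode accept a remote application path.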
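One way to act on the worker-access tip is a quick pre-flight check run on each worker, by hand or as an extra task before the Spark task. This is a hypothetical stdlib-only helper (check_worker_paths is not an Airflow API): it checks that the spark-submit binary exists and is executable, resolving a bare name such as spark-submit through PATH, and that the script file is readable.

```python
import os
import shutil


def check_worker_paths(spark_submit, script):
    """Return a list of problems found; an empty list means both paths look usable.

    spark_submit may be an absolute path or a bare name looked up on PATH.
    """
    problems = []
    # shutil.which returns None unless the file exists and is executable
    if shutil.which(spark_submit) is None:
        problems.append(f"spark-submit not found or not executable: {spark_submit}")
    if not os.path.isfile(script):
        problems.append(f"Spark script not found: {script}")
    elif not os.access(script, os.R_OK):
        problems.append(f"Spark script not readable: {script}")
    return problems
```

For example, check_worker_paths("/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit", "/path/to/your/spark/script.py") should return an empty list on every worker before you rely on either method.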

The question was originally posted on Stack Exchange by Nick.
