Beginner question: how to run a spark-submit job with parameters in Apache Airflow
Hey there! As someone new to both Spark and Airflow, I get exactly what you're trying to do: turn that familiar spark-submit command into an Airflow task that accepts parameters and passes them through. Here are two straightforward approaches that fit your needs.
Approach 1: BashOperator with Jinja templating. Since you're already comfortable with the spark-submit syntax, this is the most intuitive way to start: use Airflow's BashOperator and let Jinja2 templating inject the parameters into the command.
Here's a complete DAG example:
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Basic settings; adjust as needed
default_args = {
    'owner': 'your_username',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='spark_daily_job',
    default_args=default_args,
    schedule_interval=None,  # manual trigger only; or a schedule such as '@daily'
    catchup=False,
) as dag:
    # Your spark-submit command as a Jinja-templated string.
    # Replace /path/to/your/spark/script.py with the real path to your Spark job.
    # Keep comments out of the command string: a '#' inside it starts a shell
    # comment and detaches the lines after it from spark-submit.
    spark_submit_template = """
    EXECUTORS_MEM={{ params.executors_mem }} EXECUTORS_NUM={{ params.executors_num }} \
    STARTDAY={{ params.startday }} ENDDAY={{ params.endday }} QUEUE={{ params.queue }} jobname={{ params.jobname }} \
    /home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client \
        --num-executors {{ params.executors_num }} --executor-memory {{ params.executors_mem }} \
        --executor-cores 1 --driver-memory 2G \
        --queue {{ params.queue }} \
        /path/to/your/spark/script.py --startday {{ params.startday }} --endday {{ params.endday }}
    """

    run_spark_task = BashOperator(
        task_id='execute_spark_job',
        bash_command=spark_submit_template,
        # Default params; they can be overridden when the DAG is triggered
        params={
            'executors_mem': '4G',
            'executors_num': 300,
            'startday': '20180401',
            'endday': '20180401',
            'queue': 'm',
            'jobname': 'x',
        },
    )
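Because the whole command lives in one templated string, it can help to preview what it expands to. The sketch below is a hypothetical helper (not part of Airflow) that assembles the spark-submit arguments for the command above from the same params dict, passing the YARN queue with --queue and the dates as arguments to the script. The Spark and script paths are the placeholders from the DAG.

```python
import shlex
from typing import Dict, List

# Paths copied from the DAG; the script path is still a placeholder
SPARK_SUBMIT = "/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit"
SCRIPT = "/path/to/your/spark/script.py"


def build_spark_submit(params: Dict[str, object]) -> List[str]:
    """Hypothetical helper: spark-submit argv for a params dict.

    jobname travels as an environment variable, so it is not part of argv.
    """
    return [
        SPARK_SUBMIT, "--verbose", "--master", "yarn", "--deploy-mode", "client",
        "--num-executors", str(params["executors_num"]),
        "--executor-memory", str(params["executors_mem"]),
        "--executor-cores", "1", "--driver-memory", "2G",
        # --queue sends the job to the YARN queue given in params
        "--queue", str(params["queue"]),
        SCRIPT,
        # Everything after the script path is passed to the script itself
        "--startday", str(params["startday"]),
        "--endday", str(params["endday"]),
    ]


if __name__ == "__main__":
    defaults = {"executors_mem": "4G", "executors_num": 300, "startday": "20180401",
                "endday": "20180401", "queue": "m", "jobname": "x"}
    # shlex.join quotes each argument for a POSIX shell (Python 3.8+)
    print(shlex.join(build_spark_submit(defaults)))
```

Printing the joined command is a quick way to catch template mistakes, such as a missing --queue or a script path that got separated from spark-submit, before the first real run.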
How to use it:
When you trigger the DAG from the Airflow UI, choose "Trigger DAG w/ config" and pass a JSON config such as {"startday": "20180402", "endday": "20180405"}. With Airflow 2's default setting [core] dag_run_conf_overrides_params = True, keys in that config override the matching params, so you can run different date ranges without touching the code; the {{ params.xxx }} placeholders are then rendered with the overridden values.
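The override behaviour amounts to a dictionary update. Below is a simplified model of it (an illustration, not Airflow's own code), assuming dag_run_conf_overrides_params is left at True; the function name effective_params is hypothetical. It also prints the JSON you would paste into the trigger dialog or pass to the CLI.

```python
import json
from typing import Optional

# Task defaults, copied from the BashOperator example
DEFAULT_PARAMS = {
    "executors_mem": "4G",
    "executors_num": 300,
    "startday": "20180401",
    "endday": "20180401",
    "queue": "m",
    "jobname": "x",
}


def effective_params(defaults: dict, conf: Optional[dict]) -> dict:
    """Simplified model: keys present in the trigger config replace the defaults."""
    merged = dict(defaults)
    merged.update(conf or {})
    return merged


if __name__ == "__main__":
    conf = {"startday": "20180402", "endday": "20180405"}
    # Paste this JSON into "Trigger DAG w/ config", or pass it on the CLI:
    #   airflow dags trigger spark_daily_job --conf '<json>'
    print(json.dumps(conf))
    print(effective_params(DEFAULT_PARAMS, conf))
```

Keys you leave out of the config keep their defaults, so a run for a new date range only needs startday and endday.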
Approach 2: SparkSubmitOperator. If you want a cleaner, more Airflow-native approach, use SparkSubmitOperator: it wraps spark-submit in a dedicated operator with explicit arguments, so you don't have to write the full command string.
First, make sure you have the Apache Spark provider installed:
pip install apache-airflow-providers-apache-spark
Then here's the DAG example:
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'your_username',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG(
    dag_id='spark_daily_job_native',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:
    run_spark_task = SparkSubmitOperator(
        task_id='execute_spark_job',
        # Path to your Spark job script
        application='/path/to/your/spark/script.py',
        # Configure spark_default in the Airflow UI first. Its Host ("yarn")
        # determines --master; the operator has no master= argument.
        conn_id='spark_default',
        verbose=True,
        # Spark resources
        num_executors=300,
        executor_memory='4G',
        executor_cores=1,
        driver_memory='2G',
        # YARN queue and deploy mode. Do not use queue= here: that is
        # BaseOperator's Celery queue for the Airflow task, not the YARN queue.
        # yarn_queue/deploy_mode exist only in recent provider versions; on older
        # ones put {"queue": "m", "deploy-mode": "client"} in the connection's Extra.
        yarn_queue='m',
        deploy_mode='client',
        # Command-line arguments for your Spark script (STARTDAY/ENDDAY)
        application_args=['--startday', '{{ params.startday }}', '--endday', '{{ params.endday }}'],
        # Environment variables (e.g. jobname)
        env_vars={'jobname': '{{ params.jobname }}'},
        # Default params; these can also be overridden at trigger time
        params={
            'startday': '20180401',
            'endday': '20180401',
            'jobname': 'x',
        },
    )
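SparkSubmitOperator takes the master (and, optionally, the YARN queue and deploy mode) from the connection named in conn_id. If you would rather not configure it through the UI, Airflow 2.3 and later can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds JSON. The sketch below only builds that export line with the standard library; the helper name spark_conn_env is hypothetical, and the queue value m comes from the question.

```python
import json
import shlex


def spark_conn_env(queue: str, deploy_mode: str = "client") -> str:
    """Build a shell export line defining spark_default as JSON.

    Hypothetical helper; assumes Airflow >= 2.3 (JSON-format AIRFLOW_CONN_* values).
    """
    conn = {
        "conn_type": "spark",
        # Host "yarn" makes the hook pass --master yarn; the ResourceManager
        # address comes from HADOOP_CONF_DIR / YARN_CONF_DIR on the worker.
        "host": "yarn",
        "extra": {"queue": queue, "deploy-mode": deploy_mode},
    }
    # shlex.quote wraps the JSON so the shell passes it through unchanged
    return "export AIRFLOW_CONN_SPARK_DEFAULT=" + shlex.quote(json.dumps(conn))


if __name__ == "__main__":
    print(spark_conn_env("m"))
```

The export has to be present in the environment of the scheduler and the workers. Connections defined this way do not show up in the UI's connection list, and Airflow checks environment variables before the metadata database.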
Notes:
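- You'll need to configure the spark_default connection: in the Airflow UI go to Admin > Connections, edit spark_default (connection type Spark) and set Host to yarn. Don't put a ResourceManager URL such as yarn://your-resource-manager:8032 there; spark-submit only accepts --master yarn and finds the ResourceManager through the Hadoop/YARN configuration, so HADOOP_CONF_DIR (or YARN_CONF_DIR) must point to your cluster's client config directory on the Airflow worker. The YARN queue and deploy mode can also be set in the connection's Extra field, e.g. {"queue": "m", "deploy-mode": "client"}.
- Your Spark script must parse command-line arguments (for example with argparse in Python) to handle the --startday and --endday values passed via application_args.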
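For the --startday/--endday arguments, a minimal argument-parsing sketch for the Spark script could look like this. It assumes the script is Python, treats endday as inclusive, and reads jobname from the environment as set by env_vars; the helper names are hypothetical, and the SparkSession part is left as a comment so the parsing can be tried without Spark installed.

```python
import argparse
import os
from datetime import date, datetime, timedelta


def yyyymmdd(value: str) -> date:
    """argparse type: parse YYYYMMDD; a ValueError becomes a usage error."""
    return datetime.strptime(value, "%Y%m%d").date()


def parse_args(argv=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Daily Spark job")
    parser.add_argument("--startday", type=yyyymmdd, required=True, help="first day, YYYYMMDD")
    parser.add_argument("--endday", type=yyyymmdd, required=True, help="last day (inclusive), YYYYMMDD")
    args = parser.parse_args(argv)
    if args.endday < args.startday:
        parser.error("--endday must not be earlier than --startday")
    return args


def days_between(start: date, end: date):
    """Yield each day from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def main(argv=None) -> None:
    args = parse_args(argv)
    # jobname arrives as an environment variable (env_vars in the DAG)
    jobname = os.environ.get("jobname", "unnamed")
    # In the real job, create the SparkSession here, e.g.
    # spark = SparkSession.builder.appName(jobname).getOrCreate()
    for day in days_between(args.startday, args.endday):
        print(f"{jobname}: processing {day:%Y%m%d}")


if __name__ == "__main__":
    main()
```

Using a type= converter keeps the validation inside argparse, so a malformed date fails with a normal usage message instead of a traceback deep inside the job.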
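- Dynamic date parameters: instead of hardcoding dates, you can use Airflow's built-in macros such as {{ ds_nodash }} (the run's logical date formatted as YYYYMMDD), e.g. STARTDAY={{ ds_nodash }} ENDDAY={{ ds_nodash }}. For a scheduled DAG the logical date is the start of the data interval, so a daily run processes the previous day.
- Airflow Variables: for parameters that rarely change (such as executor memory or executor count), you can store them as Airflow Variables (Admin > Variables) and reference them with {{ var.value.executors_mem }} instead of hardcoding them in params. Jinja is only rendered in an operator's template fields: bash_command in the BashOperator example is one, but for SparkSubmitOperator check its template_fields before relying on this for a given argument.
- Worker access: make sure every Airflow worker can reach the Spark installation and your Spark script, either through shared storage mounted at the same path on all workers or a local copy on each one. SparkSubmitOperator calls spark-submit from the worker's PATH by default, so put /home/spark/spark-2.1.0-bin-hadoop2.6/bin on that PATH.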
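To make the date semantics concrete, here is a small standard-library model (an illustration, not Airflow code) of what {{ ds_nodash }} resolves to for an '@daily' schedule, plus the Python equivalent of the template expression {{ dag_run.conf.get('startday', ds_nodash) }}, which uses a date from the trigger config when one is given and falls back to the run's logical date otherwise. Both function names are hypothetical.

```python
from datetime import datetime, timedelta


def daily_ds_nodash(interval_end: datetime) -> str:
    """Model of {{ ds_nodash }} for an '@daily' DAG.

    A daily run is scheduled at the END of its data interval, but its
    logical date is the START of that interval, i.e. one day earlier.
    """
    interval_start = interval_end - timedelta(days=1)
    return interval_start.strftime("%Y%m%d")


def resolve_day(conf: dict, ds_nodash: str, key: str) -> str:
    """Python equivalent of {{ dag_run.conf.get(key, ds_nodash) }}."""
    return conf.get(key, ds_nodash)


if __name__ == "__main__":
    ds = daily_ds_nodash(datetime(2024, 1, 2))  # run scheduled for 2024-01-02 00:00
    print(ds)  # 20240101
    print(resolve_day({}, ds, "startday"))  # 20240101 (scheduled run, no config)
    print(resolve_day({"startday": "20180401"}, ds, "startday"))  # 20180401
```

In the BashOperator template this would read STARTDAY={{ dag_run.conf.get('startday', ds_nodash) }}, which covers both scheduled runs and manual backfills of a specific day with one expression.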
This question was originally asked on Stack Exchange by Nick.




