如何在Airflow中实现带依赖关系的定时任务调度?
如何在Airflow中实现带依赖关系的定时任务调度?
当然有地道的Airflow原生解法,完全不用搞外部变量那种hack操作~我给你两种常用的方案,都是行业里的常规做法:
方案一:单DAG+任务依赖+时间传感器
这个方案思路很直接,把两个任务放进同一个DAG里,利用任务依赖和时间传感器同时满足时间和依赖要求:
- 把DAG的调度时间设为每天5:30(
schedule_interval="30 5 * * *") - 给T08:30任务加一个时间传感器,等待3小时(从5:30到8:30)
- 用Airflow的任务依赖语法,让T05:30任务成功后才触发时间传感器,传感器完成后再触发T08:30任务
具体代码示例:
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.sensors.time_delta import TimeDeltaSensor default_args = { 'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=5) } with DAG( 'daily_dependent_tasks', default_args=default_args, description='单DAG实现带时间依赖的任务调度', schedule_interval='30 5 * * *', start_date=datetime(2024, 1, 1), catchup=False, # 禁止补跑历史任务,避免资源浪费 tags=['daily', 'dependency'] ) as dag: # 5:30执行的任务 task_t0530 = BashOperator( task_id='T0530', bash_command='echo "执行T05:30的业务逻辑"' ) # 等待3小时,到8:30再继续 wait_until_0830 = TimeDeltaSensor( task_id='wait_until_0830', delta=timedelta(hours=3), mode='poke' # 轮询模式检查时间是否达标 ) # 8:30执行的任务,依赖T0530成功且时间到点 task_t0830 = BashOperator( task_id='T0830', bash_command='echo "执行T08:30的业务逻辑"' ) # 设置任务依赖链 task_t0530 >> wait_until_0830 >> task_t0830
注意:如果T05:30任务的执行时间超过3小时,那T08:30会延后执行,这个方案适合对任务执行时长有把握的场景。
方案二:双DAG+ExternalTaskSensor
如果你希望T08:30的任务DAG能严格在8:30触发,而不是依赖T05:30的结束时间来倒计时,那用双DAG的方案更灵活:
- 一个DAG专门负责每天5:30执行T05:30任务
- 另一个DAG每天8:30触发,先通过
ExternalTaskSensor监听当天的T05:30任务是否成功,成功了才继续执行自己的T08:30任务
第一个DAG(仅执行T05:30)
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator default_args = { 'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=5) } with DAG( 'dag_t0530', default_args=default_args, schedule_interval='30 5 * * *', start_date=datetime(2024, 1, 1), catchup=False ) as dag: task_t0530 = BashOperator( task_id='T0530', bash_command='echo "执行T05:30的业务逻辑"' )
第二个DAG(执行T08:30,依赖T0530成功)
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator from airflow.sensors.external_task import ExternalTaskSensor default_args = { 'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=5) } def get_today_t0530_exec_date(execution_date): # 生成当天5:30的执行时间,用来匹配第一个DAG的任务实例 return execution_date.replace(hour=5, minute=30) with DAG( 'dag_t0830', default_args=default_args, schedule_interval='30 8 * * *', start_date=datetime(2024, 1, 1), catchup=False ) as dag: # 监听当天的T0530任务是否成功 wait_for_t0530 = ExternalTaskSensor( task_id='wait_for_t0530', external_dag_id='dag_t0530', external_task_id='T0530', execution_date_fn=get_today_t0530_exec_date, mode='poke', timeout=3600, # 超时1小时,防止无限等待 poke_interval=60 # 每分钟检查一次任务状态 ) task_t0830 = BashOperator( task_id='T0830', bash_command='echo "执行T08:30的业务逻辑"' ) wait_for_t0530 >> task_t0830
这个方案的好处是T08:30的DAG会准时在8:30触发,然后等着T05:30任务成功,就算T05:30跑慢了,也会一直等(直到超时),不会错过触发窗口。
两种方案都是Airflow原生支持的 idiomatic 解法,完全不用借助外部变量或其他hack手段,你可以根据自己对任务时间的严格程度来选~
备注:内容来源于stack exchange,提问作者J. Mini




