You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Airflow中实现带依赖关系的定时任务调度?

如何在Airflow中实现带依赖关系的定时任务调度?

当然有地道的Airflow原生解法,完全不用搞外部变量那种hack操作~我给你两种常用的方案,都是行业里的常规做法:

方案一:单DAG+任务依赖+时间传感器

这个方案思路很直接,把两个任务放进同一个DAG里,利用任务依赖和时间传感器同时满足时间和依赖要求:

  • 把DAG的调度时间设为每天5:30(schedule_interval="30 5 * * *"
  • 给T08:30任务加一个时间传感器,等待3小时(从5:30到8:30)
  • 用Airflow的任务依赖语法,让T05:30任务成功后才触发时间传感器,传感器完成后再触发T08:30任务

具体代码示例:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.time_delta import TimeDeltaSensor

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'daily_dependent_tasks',
    default_args=default_args,
    description='单DAG实现带时间依赖的任务调度',
    schedule_interval='30 5 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,  # 禁止补跑历史任务,避免资源浪费
    tags=['daily', 'dependency']
) as dag:
    # 5:30执行的任务
    task_t0530 = BashOperator(
        task_id='T0530',
        bash_command='echo "执行T05:30的业务逻辑"'
    )

    # 等待3小时,到8:30再继续
    wait_until_0830 = TimeDeltaSensor(
        task_id='wait_until_0830',
        delta=timedelta(hours=3),
        mode='poke'  # 轮询模式检查时间是否达标
    )

    # 8:30执行的任务,依赖T0530成功且时间到点
    task_t0830 = BashOperator(
        task_id='T0830',
        bash_command='echo "执行T08:30的业务逻辑"'
    )

    # 设置任务依赖链
    task_t0530 >> wait_until_0830 >> task_t0830

注意:如果T05:30任务的执行时间超过3小时,那T08:30会延后执行,这个方案适合对任务执行时长有把握的场景。

方案二:双DAG+ExternalTaskSensor

如果你希望T08:30的任务DAG能严格在8:30触发,而不是依赖T05:30的结束时间来倒计时,那用双DAG的方案更灵活:

  1. 一个DAG专门负责每天5:30执行T05:30任务
  2. 另一个DAG每天8:30触发,先通过ExternalTaskSensor监听当天的T05:30任务是否成功,成功了才继续执行自己的T08:30任务

第一个DAG(仅执行T05:30)

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'dag_t0530',
    default_args=default_args,
    schedule_interval='30 5 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    task_t0530 = BashOperator(
        task_id='T0530',
        bash_command='echo "执行T05:30的业务逻辑"'
    )

第二个DAG(执行T08:30,依赖T0530成功)

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

def get_today_t0530_exec_date(execution_date):
    # 生成当天5:30的执行时间,用来匹配第一个DAG的任务实例
    return execution_date.replace(hour=5, minute=30)

with DAG(
    'dag_t0830',
    default_args=default_args,
    schedule_interval='30 8 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    # 监听当天的T0530任务是否成功
    wait_for_t0530 = ExternalTaskSensor(
        task_id='wait_for_t0530',
        external_dag_id='dag_t0530',
        external_task_id='T0530',
        execution_date_fn=get_today_t0530_exec_date,
        mode='poke',
        timeout=3600,  # 超时1小时,防止无限等待
        poke_interval=60  # 每分钟检查一次任务状态
    )

    task_t0830 = BashOperator(
        task_id='T0830',
        bash_command='echo "执行T08:30的业务逻辑"'
    )

    wait_for_t0530 >> task_t0830

这个方案的好处是T08:30的DAG会准时在8:30触发,然后等着T05:30任务成功,就算T05:30跑慢了,也会一直等(直到超时),不会错过触发窗口。

两种方案都是Airflow原生支持的 idiomatic 解法,完全不用借助外部变量或其他hack手段,你可以根据自己对任务时间的严格程度来选~

备注:内容来源于stack exchange,提问作者J. Mini

火山引擎 最新活动