You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Apache Airflow中按每日三个指定时间运行DAG?

实现Airflow DAG每日多不同时间点调度的方案

当然有可行的方法!要让你的Airflow DAG每天在三个完全独立的时间点(比如下午3:50、早上7:15、晚上11:59)运行,根据你的Airflow版本,有几种实用的方案:

方案1:多Cron表达式列表(推荐,Airflow 2.2+)

从Airflow 2.2版本开始,schedule_interval支持传入一个Cron表达式列表,每个表达式对应一个触发时间点,Airflow会自动在每个时间点触发DAG。这是最直接且易维护的方式:

from airflow import DAG
from datetime import datetime

with DAG(
    dag_id="daily_multi_time_dag",
    # 对应:15:50(下午3:50)、07:15(早上7:15)、23:59(晚上11:59)
    schedule_interval=["50 15 * * *", "15 07 * * *", "59 23 * * *"],
    start_date=datetime(2024, 1, 1),
    catchup=False,  # 关闭回溯执行,避免历史时间点触发
    timezone="Asia/Shanghai"  # 根据你的时区调整
) as dag:
    # 在这里定义你的任务(比如PythonOperator、BashOperator等)
    pass

方案2:合并为单个Cron表达式(兼容旧版本)

如果你的Airflow版本低于2.2,不支持列表形式的schedule_interval,可以将三个时间点合并成一个Cron表达式(利用逗号实现多值匹配,注意精确对应时间组合):

# 精确匹配15:50、07:15、23:59三个时间点
schedule_interval="50 15 * * *,15 07 * * *,59 23 * * *"

不过要注意,部分旧版本Airflow对这种多表达式合并的支持可能有限,测试后再投入使用。

方案3:自定义Timetable(灵活适配复杂场景)

如果需要更灵活的调度逻辑(比如动态调整时间点),可以自定义Timetable类来实现精准触发:

from airflow import DAG
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from datetime import datetime, time, timedelta

class MultiTimeTimetable(Timetable):
    def __init__(self, target_times):
        self.target_times = [time.fromisoformat(t) for t in target_times]

    def next_dagrun_info(
        self,
        last_automated_dagrun: datetime,
        restriction
    ) -> DagRunInfo:
        # 获取下一个符合条件的时间点
        next_run = last_automated_dagrun + timedelta(days=1)
        for target_time in self.target_times:
            candidate = next_run.replace(hour=target_time.hour, minute=target_time.minute, second=0)
            if candidate > restriction.earliest:
                return DagRunInfo(
                    run_date=candidate,
                    data_interval=DataInterval(start=candidate - timedelta(days=1), end=candidate)
                )
        # 如果当天的时间点都过了,取第二天第一个时间点
        first_target = self.target_times[0]
        next_day = next_run + timedelta(days=1)
        return DagRunInfo(
            run_date=next_day.replace(hour=first_target.hour, minute=first_target.minute, second=0),
            data_interval=DataInterval(start=next_day - timedelta(days=1), end=next_day)
        )

# 使用自定义Timetable
with DAG(
    dag_id="custom_multi_time_dag",
    timetable=MultiTimeTimetable(target_times=["15:50", "07:15", "23:59"]),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    timezone="Asia/Shanghai"
) as dag:
    # 任务定义
    pass

总结

  • 优先选择方案1,实现简单、维护成本低,适合Airflow 2.2及以上版本;
  • 旧版本可以尝试方案2,如果合并表达式不生效,再考虑方案3的自定义Timetable;
  • 也可以创建一个“调度器DAG”,通过TriggerDagRunOperator在指定时间触发目标DAG,但这种方式会增加额外的DAG维护量,一般不推荐。

内容的提问来源于stack exchange,提问作者Md Sirajus Salayhin

火山引擎 最新活动