如何在Apache Airflow中按每日三个指定时间运行DAG?
实现Airflow DAG每日多不同时间点调度的方案
当然有可行的方法!要让你的Airflow DAG每天在三个完全独立的时间点(比如下午3:50、早上7:15、晚上11:59)运行,根据你的Airflow版本,有几种实用的方案:
方案1:多Cron表达式列表(推荐,Airflow 2.2+)
从Airflow 2.2版本开始,schedule_interval支持传入一个Cron表达式列表,每个表达式对应一个触发时间点,Airflow会自动在每个时间点触发DAG。这是最直接且易维护的方式:
from airflow import DAG from datetime import datetime with DAG( dag_id="daily_multi_time_dag", # 对应:15:50(下午3:50)、07:15(早上7:15)、23:59(晚上11:59) schedule_interval=["50 15 * * *", "15 07 * * *", "59 23 * * *"], start_date=datetime(2024, 1, 1), catchup=False, # 关闭回溯执行,避免历史时间点触发 timezone="Asia/Shanghai" # 根据你的时区调整 ) as dag: # 在这里定义你的任务(比如PythonOperator、BashOperator等) pass
方案2:合并为单个Cron表达式(兼容旧版本)
如果你的Airflow版本低于2.2,不支持列表形式的schedule_interval,可以将三个时间点合并成一个Cron表达式(利用逗号实现多值匹配,注意精确对应时间组合):
# 精确匹配15:50、07:15、23:59三个时间点 schedule_interval="50 15 * * *,15 07 * * *,59 23 * * *"
不过要注意,部分旧版本Airflow对这种多表达式合并的支持可能有限,测试后再投入使用。
方案3:自定义Timetable(灵活适配复杂场景)
如果需要更灵活的调度逻辑(比如动态调整时间点),可以自定义Timetable类来实现精准触发:
from airflow import DAG from airflow.timetables.base import DagRunInfo, DataInterval, Timetable from datetime import datetime, time, timedelta class MultiTimeTimetable(Timetable): def __init__(self, target_times): self.target_times = [time.fromisoformat(t) for t in target_times] def next_dagrun_info( self, last_automated_dagrun: datetime, restriction ) -> DagRunInfo: # 获取下一个符合条件的时间点 next_run = last_automated_dagrun + timedelta(days=1) for target_time in self.target_times: candidate = next_run.replace(hour=target_time.hour, minute=target_time.minute, second=0) if candidate > restriction.earliest: return DagRunInfo( run_date=candidate, data_interval=DataInterval(start=candidate - timedelta(days=1), end=candidate) ) # 如果当天的时间点都过了,取第二天第一个时间点 first_target = self.target_times[0] next_day = next_run + timedelta(days=1) return DagRunInfo( run_date=next_day.replace(hour=first_target.hour, minute=first_target.minute, second=0), data_interval=DataInterval(start=next_day - timedelta(days=1), end=next_day) ) # 使用自定义Timetable with DAG( dag_id="custom_multi_time_dag", timetable=MultiTimeTimetable(target_times=["15:50", "07:15", "23:59"]), start_date=datetime(2024, 1, 1), catchup=False, timezone="Asia/Shanghai" ) as dag: # 任务定义 pass
总结
- 优先选择方案1,实现简单、维护成本低,适合Airflow 2.2及以上版本;
- 旧版本可以尝试方案2,如果合并表达式不生效,再考虑方案3的自定义Timetable;
- 也可以创建一个“调度器DAG”,通过
TriggerDagRunOperator在指定时间触发目标DAG,但这种方式会增加额外的DAG维护量,一般不推荐。
内容的提问来源于stack exchange,提问作者Md Sirajus Salayhin




