Airflow scheduling question: can a DAG be scheduled to run three times a month on specific dates?
Hey there! Yes, you can set up this exact schedule for your Airflow DAG (the 5th, 14 days before month end, and the last day of the month). Here are two practical ways to implement it:
Approach 1: Daily Schedule + Date Check (Recommended)
Set your DAG to run daily, then use a BranchPythonOperator to only execute your core task on the three target dates. This is flexible and easy to adjust later.
Example Code
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from dateutil.relativedelta import relativedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def your_core_task(**context):
    # Replace this with your actual task logic
    print("Executing core task on target date!")


def check_target_date(**context):
    # 'ds' is the run's logical date (YYYY-MM-DD). With a daily schedule in
    # Airflow 2 the run normally starts the day after its logical date.
    today = datetime.strptime(context['ds'], '%Y-%m-%d').date()

    # Check if today is the 5th of the month
    if today.day == 5:
        return 'core_task'

    # Last day of the current month (day=31 is clamped to the month's length)
    last_day_of_month = today + relativedelta(day=31)

    # Check if today is the end of the month
    if today == last_day_of_month:
        return 'core_task'

    # Check if today is 14 days before the end of the month
    fourteen_days_before_end = last_day_of_month - timedelta(days=14)
    if today == fourteen_days_before_end:
        return 'core_task'

    # If none of the above, skip the core task
    return 'skip_task'


with DAG(
    'custom_monthly_schedule',
    default_args=default_args,
    schedule_interval='@daily',  # Run every day
    catchup=False,
) as dag:
    start = DummyOperator(task_id='start')

    # provide_context is not needed in Airflow 2; the context is passed automatically
    check_date = BranchPythonOperator(
        task_id='check_if_target_date',
        python_callable=check_target_date,
    )

    core_task = PythonOperator(
        task_id='core_task',
        python_callable=your_core_task,
    )

    skip_task = DummyOperator(
        task_id='skip_task',
        # Only matters if this later joins both branches; a skipped branch does not fail the DAG
        trigger_rule='none_failed_min_one_success',
    )

    start >> check_date >> [core_task, skip_task]
```
Key Notes
- Timezone: Make sure your Airflow instance is configured with the correct timezone (check `airflow.cfg` for `default_timezone`) to avoid date mismatches.
- Catchup: Set `catchup=False` if you don't want backdated runs for past target dates.
- Trigger Rule: `skip_task` uses `none_failed_min_one_success`. A skipped `core_task` does not fail the DAG run by itself; the rule matters if you later add a task downstream of both branches, which the default `all_success` rule would skip.
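Before deploying, it can help to check the date rule from `check_target_date` outside Airflow. Here's a minimal sketch using only the standard library; `target_dates` is a hypothetical helper name, not part of Airflow, and it assumes the three dates are the 5th, 14 days before the last day, and the last day of the month.

```python
import calendar
from datetime import date, timedelta


def target_dates(year: int, month: int) -> list[date]:
    """Return the three run dates for a month, in chronological order."""
    # monthrange returns (weekday of the 1st, number of days in the month)
    last_day = date(year, month, calendar.monthrange(year, month)[1])
    return [date(year, month, 5), last_day - timedelta(days=14), last_day]


for d in target_dates(2024, 2):
    print(d.isoformat())
# 2024-02-05
# 2024-02-15
# 2024-02-29
```

Running this for a few months (including a leap-year February) is a quick way to confirm the branch will pick the dates you expect.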
Approach 2: Split into Trigger DAGs
If you prefer separate triggers for each date, create a control DAG that checks each target date and triggers your core DAG when needed. This is useful if you want to track each run separately.
Quick Overview
- Create your core DAG (without a schedule, set `schedule_interval=None`).
- Create a control DAG with three date-checking branches that use `TriggerDagRunOperator` to start the core DAG when the date matches.
This is similar to Approach 1 but decouples the scheduling logic from the core task.
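A rough sketch of what the control DAG could look like, assuming Airflow 2 import paths. The DAG id `custom_monthly_control`, the core DAG id `core_dag`, the task ids, and the helper names `pick_trigger` and `build_control_dag` are all placeholders made up for this example. The Airflow imports sit inside the builder function only so the date helper can be tried without Airflow installed.

```python
import calendar
from datetime import datetime, timedelta

CORE_DAG_ID = "core_dag"  # placeholder: id of your unscheduled core DAG


def pick_trigger(ds: str) -> str:
    """Map a logical date string (YYYY-MM-DD) to the task_id that should run."""
    day = datetime.strptime(ds, "%Y-%m-%d").date()
    last_day = day.replace(day=calendar.monthrange(day.year, day.month)[1])
    if day.day == 5:
        return "trigger_on_5th"
    if day == last_day - timedelta(days=14):
        return "trigger_14_days_before_end"
    if day == last_day:
        return "trigger_on_month_end"
    return "no_trigger"


def build_control_dag():
    # Imported here so pick_trigger can be tested without Airflow installed
    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.python import BranchPythonOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    with DAG(
        "custom_monthly_control",
        start_date=datetime(2024, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        # Airflow 2 fills in `ds` from the context by parameter name
        check_date = BranchPythonOperator(
            task_id="check_date",
            python_callable=pick_trigger,
        )
        # One trigger task per target date, so each kind of run is visible separately
        branches = [
            TriggerDagRunOperator(task_id=task_id, trigger_dag_id=CORE_DAG_ID)
            for task_id in (
                "trigger_on_5th",
                "trigger_14_days_before_end",
                "trigger_on_month_end",
            )
        ]
        branches.append(DummyOperator(task_id="no_trigger"))
        check_date >> branches
    return dag


# In a real DAG file, build it at module level so the scheduler discovers it:
# dag = build_control_dag()
```

The core DAG itself lives in its own file with `schedule_interval=None`, so it only ever runs when one of the three trigger tasks fires.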
Either approach will get you the monthly three-run schedule you need. Feel free to tweak the code to fit your specific use case!
The question originally comes from Stack Exchange, asked by Infinite.




