Airflow scheduling feasibility question: can a DAG run three times a month on specific dates?

Custom Monthly Schedule (3 Runs: 5th, 14 Days Before End, End of Month) in Airflow

Hey there! Definitely possible to set up this exact schedule for your Airflow DAG. Below are two practical approaches to implement it:

Approach 1: Daily DAG with a Branch Check

Schedule the DAG to run daily, then use a BranchPythonOperator so the core task executes only on the three target dates. All of the date logic lives in one Python function, so it is easy to adjust later.

Example Code

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

def your_core_task(**context):
    # Replace this with your actual task logic
    print("Executing core task on target date!")

def check_target_date(**context):
    today = datetime.strptime(context['ds'], '%Y-%m-%d').date()
    
    # Check if today is the 5th of the month
    if today.day == 5:
        return 'core_task'
    
    # Calculate last day of the current month
    # (relativedelta clamps day=31 to the month's actual length)
    last_day_of_month = today + relativedelta(day=31)
    
    # Check if today is the end of the month
    if today == last_day_of_month:
        return 'core_task'
    
    # Check if today is 14 days before the end of the month
    fourteen_days_before_end = last_day_of_month - timedelta(days=14)
    if today == fourteen_days_before_end:
        return 'core_task'
    
    # If none of the above, skip the core task
    return 'skip_task'

with DAG(
    'custom_monthly_schedule',
    default_args=default_args,
    schedule_interval='@daily',  # Run every day
    catchup=False
) as dag:
    start = DummyOperator(task_id='start')
    
    check_date = BranchPythonOperator(
        task_id='check_if_target_date',
        python_callable=check_target_date  # Airflow 2 passes the context automatically
    )
    
    core_task = PythonOperator(
        task_id='core_task',
        python_callable=your_core_task
    )
    
    skip_task = DummyOperator(
        task_id='skip_task',
        trigger_rule='none_failed_min_one_success'  # Optional here; matters for tasks that join both branches
    )
    
    start >> check_date >> [core_task, skip_task]

Key Notes

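  • Timezone: Make sure your Airflow instance is configured with the correct timezone (default_timezone in the [core] section of airflow.cfg) to avoid date mismatches. Keep in mind that context['ds'] is the run's logical date, which in Airflow 2 is the start of the data interval: an @daily run stamped with the 5th starts just after the 5th ends.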
  • Catchup: Set catchup=False if you don't want backdated runs for past target dates.
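  • Trigger Rule: A skipped core_task does not fail the DAG, so the none_failed_min_one_success rule on skip_task is not strictly required. The rule matters on a join task placed downstream of both branches, which the default all_success rule would otherwise skip.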
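
To sanity-check which calendar days the date check selects, the sketch below recomputes the three target dates for a given month using only the standard library. The helper name target_dates is hypothetical, and calendar.monthrange stands in here for the relativedelta(day=31) trick used in the DAG code; both are illustration choices, not part of the original answer.

```python
import calendar
from datetime import date, timedelta


def target_dates(year: int, month: int) -> list[date]:
    """Return the 5th, the day 14 days before month end, and the month end."""
    # monthrange returns (weekday of the 1st, number of days in the month)
    last_day = date(year, month, calendar.monthrange(year, month)[1])
    return [date(year, month, 5), last_day - timedelta(days=14), last_day]


# Print the schedule for a 31-day month, a leap February, and a normal February
for year, month in [(2024, 1), (2024, 2), (2023, 2)]:
    print(year, month, [d.isoformat() for d in target_dates(year, month)])
# 2024 1 ['2024-01-05', '2024-01-17', '2024-01-31']
# 2024 2 ['2024-02-05', '2024-02-15', '2024-02-29']
# 2023 2 ['2023-02-05', '2023-02-14', '2023-02-28']
```

Running this for a few months, February in particular, is a quick way to confirm that the "14 days before the end" date lands where you expect before deploying the DAG.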

Approach 2: Split into Trigger DAGs

If you prefer separate triggers for each date, create a control DAG that checks each target date and triggers your core DAG when needed. This is useful if you want to track each run separately.

Quick Overview

  1. Create your core DAG (without a schedule, set schedule_interval=None).
  2. Create a control DAG with three date-checking branches that use TriggerDagRunOperator to start the core DAG when the date matches.

This is similar to Approach 1 but decouples the scheduling logic from the core task.
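
A minimal sketch of the control DAG might look like the following, assuming Airflow 2.x. Note the simplification: instead of three separate date-checking branches, it collapses the checks into a single ShortCircuitOperator gate in front of one TriggerDagRunOperator. The DAG ids monthly_schedule_control and core_dag, the helper names, and the build_control_dag factory are all hypothetical. The Airflow imports sit inside the factory only so that the date logic can be tested without Airflow installed.

```python
import calendar
from datetime import date, datetime, timedelta


def is_target_date(day: date) -> bool:
    """True on the 5th, 14 days before month end, and the last day of the month."""
    last_day = date(day.year, day.month, calendar.monthrange(day.year, day.month)[1])
    return day.day == 5 or day == last_day or day == last_day - timedelta(days=14)


def should_trigger(run_date: str) -> bool:
    """Gate callable: run_date is the rendered {{ ds }} string (YYYY-MM-DD)."""
    return is_target_date(datetime.strptime(run_date, "%Y-%m-%d").date())


def build_control_dag():
    # Assumption: Airflow 2.x import paths
    from airflow import DAG
    from airflow.operators.python import ShortCircuitOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    with DAG(
        "monthly_schedule_control",  # hypothetical DAG id
        start_date=datetime(2024, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        # Skips everything downstream when the callable returns False
        gate = ShortCircuitOperator(
            task_id="is_target_date",
            python_callable=should_trigger,
            op_kwargs={"run_date": "{{ ds }}"},  # op_kwargs is a templated field
        )
        trigger = TriggerDagRunOperator(
            task_id="trigger_core_dag",
            trigger_dag_id="core_dag",  # hypothetical id of the unscheduled core DAG
        )
        gate >> trigger
    return dag


# In a real DAG file, expose the DAG at module level so the scheduler finds it:
# dag = build_control_dag()
```

On non-target days the gate marks the trigger task as skipped, so the core DAG only accumulates runs for the three dates you care about.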

Either approach will get you the monthly three-run schedule you need. Feel free to tweak the code to fit your specific use case!

The question originates from Stack Exchange; asked by Infinite.
