You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Airflow含多动态SQLExecuteQueryOperator任务的DAG静默消失问题求助

Airflow含多动态SQLExecuteQueryOperator任务的DAG静默消失问题求助

我之前也碰到过Airflow DAG莫名从UI里消失的情况,结合你提供的代码和描述,大概率是DAG解析阶段的问题,咱们来拆解排查:

1. DAG解析时直接读取大CSV的隐患

你现在是在DAG定义的顶层代码里直接用pd.read_csv读取CSV并循环生成任务,而Airflow的调度器会定期(默认每分钟)重新解析所有DAG文件。如果你的Contacts.csv文件比较大,这个解析过程会占用大量CPU/内存,甚至超时,导致调度器无法正常加载这个DAG,直接把它排除在可用DAG列表之外。

换成DummyOperator后没问题,就是因为DummyOperator的定义逻辑很轻,不会触发这个资源瓶颈。

解决办法:把读取CSV、生成chunk参数的逻辑移到任务运行阶段,比如用Airflow 2.x推荐的动态任务映射(Dynamic Task Mapping),示例代码如下:

from datetime import datetime
from airflow import DAG
from airflow.decorators import dag, task
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.dummy import DummyOperator
import pandas as pd

default_args = { 
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "start_date": datetime(2025,1,1),
    "catchup": False,
}

@dag(
    default_args=default_args,
    dag_display_name="Load all Donations",
    schedule=None  # 把schedule从default_args移到这里,这才是正确的配置位置
)
def opportunity_load():
    start = DummyOperator(task_id='start')

    # 用@task封装CSV读取逻辑,仅在任务运行时执行
    @task
    def generate_contact_chunks():
        chunk_size = 100
        chunk_params = []
        for cont_chunk in pd.read_csv('/opt/airflow/data/Contacts.csv', chunksize=chunk_size):
            contacts = []
            for _, row in cont_chunk.iterrows():
                contacts.append({
                    "salesforce_id": row['Id'],
                    "first_name": row['FirstName'],
                    "last_name": row['LastName']
                })
            chunk_params.append({"affiliate": "uk", "contacts": contacts})
        return chunk_params

    # 获取所有chunk的参数列表
    contact_chunk_params = generate_contact_chunks()

    # 用partial+expand实现动态任务映射,自动生成对应chunk的SQL任务
    load_contacts_task = SQLExecuteQueryOperator.partial(
        task_id='load_contacts_chunk',
        sql='load_contacts.sql',  # 这里不要用字典,直接传文件路径字符串
        conn_id='datawarehouse'
    ).expand(params=contact_chunk_params)

    start >> load_contacts_task

opportunity_load()

2. SQL参数的小错误

你写的sql={'load_contacts.sql'}是个小问题:SQLExecuteQueryOperator的sql参数接受字符串路径,或者字典格式(用于区分不同环境的SQL,比如{'prod': 'xxx.sql', 'dev': 'yyy.sql'}),单独的字典{'load_contacts.sql'}是无效格式,可能会在模板渲染时触发隐藏错误,导致DAG解析失败。

把它改成sql='load_contacts.sql'就能避免这个问题。

3. 调度器日志排查

如果上面的修改还没解决问题,一定要去看Airflow调度器的日志(Docker环境下可以用docker logs <airflow-scheduler-container-id>查看),里面会明确记录为什么这个DAG没有被加载——比如模板渲染错误、代码抛出异常等,这是最直接的排查依据。

4. 额外的配置修正

你的default_args里放了"schedule": 'None',这是不正确的配置位置,schedule应该是@dag装饰器的参数,我在上面的示例代码里已经修正了这个问题。

备注:内容来源于stack exchange,提问作者JeanRemyDuboc

火山引擎 最新活动