Airflow含多动态SQLExecuteQueryOperator任务的DAG静默消失问题求助
我之前也碰到过Airflow DAG莫名从UI里消失的情况,结合你提供的代码和描述,大概率是DAG解析阶段的问题,咱们来拆解排查:
1. DAG解析时直接读取大CSV的隐患
你现在是在DAG定义的顶层代码里直接用pd.read_csv读取CSV并循环生成任务,而Airflow的调度器会定期(默认每分钟)重新解析所有DAG文件。如果你的Contacts.csv文件比较大,这个解析过程会占用大量CPU/内存,甚至超时,导致调度器无法正常加载这个DAG,直接把它排除在可用DAG列表之外。
换成DummyOperator后没问题,就是因为DummyOperator的定义逻辑很轻,不会触发这个资源瓶颈。
解决办法:把读取CSV、生成chunk参数的逻辑移到任务运行阶段,比如用Airflow 2.x推荐的动态任务映射(Dynamic Task Mapping),示例代码如下:
from datetime import datetime from airflow import DAG from airflow.decorators import dag, task from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.operators.dummy import DummyOperator import pandas as pd default_args = { "email_on_failure": False, "email_on_retry": False, "retries": 0, "start_date": datetime(2025,1,1), "catchup": False, } @dag( default_args=default_args, dag_display_name="Load all Donations", schedule=None # 把schedule从default_args移到这里,这才是正确的配置位置 ) def opportunity_load(): start = DummyOperator(task_id='start') # 用@task封装CSV读取逻辑,仅在任务运行时执行 @task def generate_contact_chunks(): chunk_size = 100 chunk_params = [] for cont_chunk in pd.read_csv('/opt/airflow/data/Contacts.csv', chunksize=chunk_size): contacts = [] for _, row in cont_chunk.iterrows(): contacts.append({ "salesforce_id": row['Id'], "first_name": row['FirstName'], "last_name": row['LastName'] }) chunk_params.append({"affiliate": "uk", "contacts": contacts}) return chunk_params # 获取所有chunk的参数列表 contact_chunk_params = generate_contact_chunks() # 用partial+expand实现动态任务映射,自动生成对应chunk的SQL任务 load_contacts_task = SQLExecuteQueryOperator.partial( task_id='load_contacts_chunk', sql='load_contacts.sql', # 这里不要用字典,直接传文件路径字符串 conn_id='datawarehouse' ).expand(params=contact_chunk_params) start >> load_contacts_task opportunity_load()
2. SQL参数的小错误
你写的sql={'load_contacts.sql'}是个小问题:SQLExecuteQueryOperator的sql参数接受字符串路径,或者字典格式(用于区分不同环境的SQL,比如{'prod': 'xxx.sql', 'dev': 'yyy.sql'}),单独的字典{'load_contacts.sql'}是无效格式,可能会在模板渲染时触发隐藏错误,导致DAG解析失败。
把它改成sql='load_contacts.sql'就能避免这个问题。
3. 调度器日志排查
如果上面的修改还没解决问题,一定要去看Airflow调度器的日志(Docker环境下可以用docker logs <airflow-scheduler-container-id>查看),里面会明确记录为什么这个DAG没有被加载——比如模板渲染错误、代码抛出异常等,这是最直接的排查依据。
4. 额外的配置修正
你的default_args里放了"schedule": 'None',这是不正确的配置位置,schedule应该是@dag装饰器的参数,我在上面的示例代码里已经修正了这个问题。
备注:内容来源于stack exchange,提问作者JeanRemyDuboc




