You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Airflow DAG中模板变量{{ ds }}及宏未被解析的问题求助

Airflow DAG中模板变量{{ ds }}及宏未被解析的问题求助

我现在在开发Airflow DAG,想要动态创建SFTPToS3Operator任务,把SFTP服务器上的文件同步到Amazon S3。服务器上的文件名格式是xxx_date_hours_minutes_seconds.csv,所以需要借助Airflow的日期变量来匹配对应日期的文件,再为每个文件生成单独的同步任务。

我的DAG基本结构如下:

with DAG(...) as dag: 
    init_task = DummyOperator(task_id='dag_start')
    alertPerfOperatorsTask = list_files_to_transfer()
    init_task >> alertPerfOperatorsTask

list_files_to_transfer函数的作用是连接SFTP服务器,遍历目标路径下的文件,为每个文件生成一个SFTPToS3Operator任务,最后返回任务列表。函数代码如下:

def list_files_to_transfer():
    logger = logging.getLogger('airflow.task')
    yesterday_date = '{{ macros.ds_add(ds, -1) }}'
    logger.info("yesterday_date: " + yesterday_date)
    sftp_hook = SFTPHook(ssh_conn_id='SFTP__gas__PointBreakExporter')
    with sftp_hook.get_conn() as sftp_client:
        remote_path = 'out/IQUAL_DEV/'
        # 后续逻辑会用yesterday_date匹配对应日期的文件
        ...
    return alertPerfOperatorsTask  # 生成的SFTPToS3Operator任务列表

但运行后我发现日志里的输出完全不符合预期:

INFO - yesterday_date: {{ macros.ds_add(ds, -1) }}

我本来应该看到类似yesterday_date: 2025-03-19这样的实际日期字符串,结果模板宏根本没有被解析。用这个未解析的字符串去匹配文件,自然找不到任何符合条件的文件,任务完全无法正常执行。

我实在搞不懂为什么{{ macros.ds_add(ds, -1) }}没有被正确解析成实际日期?有没有朋友能帮我看看问题出在哪里?

备注:内容来源于stack exchange,提问作者maxime G

火山引擎 最新活动