You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Airflow的S3KeySensor中动态设置bucket_key值

如何在被触发的DAG中用传递的变量设置S3KeySensor的bucket_key

我明白你的需求——通过TriggerDagRunOperator触发DAG时传递变量,然后在被触发的DAG里用这个变量动态配置S3KeySensorbucket_key。其实这在Airflow里有两种常用的实现方式,我给你详细拆解:

方式一:直接用Jinja模板引用触发传递的conf

这是最简单的方法,因为S3KeySensorbucket_key字段本身支持Airflow的Jinja模板渲染,你可以直接从dag_run.conf中提取传递的变量。

1. 在触发端(dag_trigger)传递变量

首先确保你在触发DAG时,通过conf参数把需要的S3路径传过去:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_target_dag = TriggerDagRunOperator(
    task_id="trigger_dag_triggered",
    trigger_dag_id="dag_triggered",
    # 这里传入你需要的S3 key变量,比如来自上游任务的输出或者固定值
    conf={"target_s3_key": "data/input/20240520/file.parquet"},
    dag=dag_trigger,
)

2. 在被触发端(dag_triggered)配置S3KeySensor

直接在bucket_key中用Jinja模板引用dag_run.conf里的值,还可以加个默认值防止没有传参的情况:

from airflow.sensors.s3_key_sensor import S3KeySensor

wait_for_s3_file = S3KeySensor(
    task_id="wait_for_target_s3_file",
    bucket_name="your-s3-bucket-name",
    # 用Jinja模板从dag_run的conf里取变量,默认值可选
    bucket_key="{{ dag_run.conf.get('target_s3_key', 'default/path/to/file') }}",
    poke_interval=60,  # 每60秒检查一次
    timeout=3600,  # 超时时间1小时
    dag=dag_triggered,
)

方式二:先处理变量再通过XCom传递(适合复杂逻辑)

如果你的S3 key需要做一些预处理(比如拼接路径、格式转换、从数据库拉取补充信息等),可以先通过PythonOperator处理变量,再把结果推到XCom,最后在S3KeySensor里引用XCom的值。

1. 添加变量处理的Python任务

from airflow.operators.python import PythonOperator

def process_s3_key(**kwargs):
    # 从dag_run的conf中获取原始变量
    raw_s3_key = kwargs["dag_run"].conf.get("target_s3_key")
    # 这里可以加任意自定义逻辑,比如拼接前缀
    processed_key = f"processed/{raw_s3_key}"
    # 把处理后的结果推到XCom
    kwargs["ti"].xcom_push(key="final_s3_key", value=processed_key)

process_key_task = PythonOperator(
    task_id="process_target_s3_key",
    python_callable=process_s3_key,
    # Airflow 2.x+ 可以不用provide_context=True,直接用**kwargs接收上下文
    provide_context=True,
    dag=dag_triggered,
)

2. 配置S3KeySensor引用XCom的值

wait_for_s3_file = S3KeySensor(
    task_id="wait_for_processed_s3_file",
    bucket_name="your-s3-bucket-name",
    # 从XCom中拉取处理后的S3 key
    bucket_key="{{ ti.xcom_pull(task_ids='process_target_s3_key', key='final_s3_key') }}",
    poke_interval=60,
    dag=dag_triggered,
)

# 设置任务依赖
process_key_task >> wait_for_s3_file

注意事项

  • 确保被触发的DAG没有禁用conf接收(默认是允许的,不需要额外配置)。
  • 如果使用Jinja模板,一定要处理dag_run.conf为空的情况(比如加默认值),避免渲染失败。
  • Airflow 2.x版本中,PythonOperatorprovide_context=True可以省略,直接用**kwargs来获取上下文参数。

内容的提问来源于stack exchange,提问作者goRunToStack

火山引擎 最新活动