在composer-1.17.1-airflow-2.1.2中，XComs用于在DAG的任务之间共享数据，这一点非常重要。如果您的XComs在XCom部分中不可见，请确保它们在DAG的任务之间正确传递：检查您的代码中是否正确使用了xcom_push和xcom_pull（PythonOperator的python_callable返回值会自动以return_value为键推送到XCom，因此下面的示例无需显式调用xcom_push）。也可以尝试在设置中使用AIRFLOW__CORE__LOAD_EXAMPLES配置项，并重新启动webserver和scheduler。这应该可以解决此问题。例如：
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# Arguments applied to every task of the DAG unless a task overrides them.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2022, 1, 1),
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,  # one retry per task...
    retry_delay=timedelta(minutes=5),  # ...attempted five minutes later
)
# DAG object that task1 and task2 attach to via ``dag=dag``; scheduled once
# per day.
# NOTE(review): ``catchup`` is not set here, so with a 2022 start_date the
# scheduler may backfill past runs depending on the environment's
# ``catchup_by_default`` setting -- confirm this is intended.
dag = DAG(
    dag_id='my_dag',
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)
def create_message():
    """Return the greeting string produced by ``task1``.

    This function is the ``python_callable`` of ``task1``. ``print_message``
    later reads the value with ``xcom_pull(task_ids='task1')``, which relies
    on the operator pushing the callable's return value to XCom.

    Returns:
        The literal string ``'Hello, world'``.
    """
    return 'Hello, world'
def print_message(**context):
    """Pull the XCom value published by ``task1`` and print it.

    Args:
        **context: Keyword arguments supplied to the callable. Must contain
            a ``'task_instance'`` entry whose ``xcom_pull`` method accepts a
            ``task_ids`` keyword.

    Raises:
        KeyError: If ``'task_instance'`` is missing from ``context``.
    """
    message = context['task_instance'].xcom_pull(task_ids='task1')
    print(message)
# Producer task: runs create_message; print_message later pulls this task's
# result by the task id 'task1'.
task1 = PythonOperator(
    dag=dag,
    task_id='task1',
    python_callable=create_message,
)
# Consumer task: runs print_message, which pulls task1's XCom and prints it.
# ``provide_context=True`` is intentionally not passed: in Airflow 2.x the
# task context is handed to callables that accept ``**kwargs`` automatically,
# and the flag is deprecated (it only triggers a DeprecationWarning).
task2 = PythonOperator(
    task_id='task2',
    python_callable=print_message,
    dag=dag,
)

# task1 must run before task2 so the value exists when task2 pulls it.
task1 >> task2