Combining Dynamic Task Mapping with TaskGroups in Airflow
Great question! You absolutely can pair Dynamic Task Mapping (DTM) with TaskGroups to create grouped task sets for each item in your list—each mapped TaskGroup will get its own value from your input list, and every task inside the group can reference that value. Let's walk through fixing your code and breaking down how this works.
First, let's address the issues in your original code:
- You used `list` as the mapped parameter name. `list` is a Python built-in type, so it's better to use a custom name like `item` to avoid shadowing it.
- Your two tasks inside the TaskGroup have identical `task_id`s (`"print_output"`); Airflow requires unique task IDs within a DAG/TaskGroup.
- You tried to reference the mapped value directly with `list`, but you need either Jinja templating (for traditional operators) or direct parameter passing (for the TaskFlow API) to access the mapped item.
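As a quick aside, here is a minimal plain-Python sketch (no Airflow involved) of why naming a parameter `list` is risky: inside the function, the parameter shadows the built-in type.

```python
def print_values(list):
    # The parameter shadows the built-in 'list' type here,
    # so list(...) now calls the argument, not the type.
    return list(range(3))

try:
    print_values(["Hello", "World"])
except TypeError as exc:
    print(f"TypeError: {exc}")  # 'list' object is not callable
```

The same shadowing happens silently inside an Airflow callable, which is why a neutral name like `item` is safer.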
Fixed Code (Using PythonOperator)
Here's a corrected version of your original code that works as expected:
```python
from airflow import DAG, XComArg
from datetime import datetime
from airflow.utils.task_group import TaskGroup
from airflow.operators.python import PythonOperator

with DAG(
    'dtm_tg_test',
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False  # Add this to avoid backfilling past runs
) as dag:

    def getList():
        return ["Hello", "World"]

    def printText(text):
        print(text)

    get_list = PythonOperator(
        task_id="get_list",
        python_callable=getList
    )

    # Use a custom parameter name (item) instead of the built-in 'list'
    with TaskGroup.partial(
        group_id="task_group"
    ).expand(item=XComArg(get_list)) as task_group:
        # Unique task IDs for each task in the group
        print_text = PythonOperator(
            task_id="print_output_first",
            python_callable=printText,
            # Reference the mapped item using Jinja template syntax
            op_kwargs={"text": "{{ item }}"}
        )
        print_again = PythonOperator(
            task_id="print_output_second",
            python_callable=printText,
            op_kwargs={"text": "{{ item }}"}
        )
        # Set task dependency inside the group
        print_text >> print_again

    # Connect the upstream task to all mapped TaskGroups
    get_list >> task_group
```
Key Details for This Implementation:
- The `expand(item=XComArg(get_list))` line tells Airflow to create one TaskGroup instance for each item in the list returned by `get_list`.
- Inside the group, we use `{{ item }}` in the `op_kwargs` to reference the specific value assigned to that TaskGroup instance.
- Each task in the group has a unique ID, so Airflow doesn't throw a duplicate task error.
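To build intuition for what `expand()` does here, the following plain-Python sketch mimics how one group instance is stamped out per upstream item. This is an analogy, not Airflow's actual internals; the dictionary shape is invented for illustration.

```python
upstream_result = ["Hello", "World"]  # what get_list returns via XCom

# Conceptually, expand(item=...) creates one TaskGroup instance per
# element, each with its own map index and its own 'item' value.
mapped_instances = [
    {"group_id": f"task_group[{index}]", "item": item}
    for index, item in enumerate(upstream_result)
]

for instance in mapped_instances:
    print(instance["group_id"], "->", instance["item"])
```

With a two-element list, Airflow would show `task_group[0]` and `task_group[1]` in the Grid view, each containing its own `print_output_first` and `print_output_second` tasks.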
A Cleaner Version (Using Task Flow API)
If you're using Airflow 2.5+ (the release that added mapping over task groups), the TaskFlow API makes this even more intuitive with decorators. Here's a streamlined version:
```python
from airflow import DAG, XComArg
from datetime import datetime
from airflow.decorators import task, task_group
from airflow.operators.python import PythonOperator

with DAG(
    'dtm_tg_test_taskflow',
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False
) as dag:

    def getList():
        return ["Hello", "World"]

    get_list = PythonOperator(
        task_id="get_list",
        python_callable=getList
    )

    # Define a reusable TaskGroup that accepts an 'item' parameter
    @task_group
    def process_item(item):
        @task
        def print_text(text):
            print(text)

        @task
        def print_again(text):
            print(text)

        # Assign tasks and set dependencies
        first_print = print_text(item)
        second_print = print_again(item)
        first_print >> second_print

    # Map the TaskGroup over the list from get_list
    mapped_task_groups = process_item.expand(item=XComArg(get_list))
    get_list >> mapped_task_groups
```
Why This Works Better:
- The `@task_group` decorator lets you define a reusable group that directly accepts the `item` parameter.
- You don't need Jinja templating here; you can pass `item` directly to your `@task` functions, making the code easier to read and maintain.
- The `expand()` method is called directly on the TaskGroup object, which feels more natural than using `partial()`.
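The `partial()`/`expand()` split mirrors `functools.partial` in plain Python: constant arguments are pinned once, and the varying argument is supplied once per item. A rough analogy (illustrative only, not Airflow code):

```python
from functools import partial

def run_group(group_id, item):
    # 'group_id' is constant across instances; 'item' varies per mapping
    return f"{group_id}: {item}"

# partial() pins the constant arguments...
pinned = partial(run_group, "task_group")

# ...and "expanding" supplies one varying value per list element
results = [pinned(item) for item in ["Hello", "World"]]
print(results)  # ['task_group: Hello', 'task_group: World']
```

This is the same mental model behind Airflow's `.partial(...).expand(...)` pattern for mapped operators and mapped task groups.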
Final Notes:
- When mapping TaskGroups, always use a unique parameter name (avoid Python built-ins like `list`, `str`, etc.).
- Ensure all task IDs within a TaskGroup are unique; Airflow will throw an error if you have duplicates.
- The TaskFlow API is recommended for newer Airflow versions because it simplifies parameter passing and task dependency management.
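If you ever generate task IDs programmatically, a small pre-flight check like this can catch duplicates before the DAG is parsed. This is just plain Python illustrating the uniqueness invariant; Airflow performs its own validation at parse time.

```python
task_ids = ["print_output_first", "print_output_second"]

# Collect any ID that appears more than once within the group
duplicates = {tid for tid in task_ids if task_ids.count(tid) > 1}
if duplicates:
    raise ValueError(f"Duplicate task_ids in group: {duplicates}")
print("all task_ids unique")
```

Running this against the original code's IDs (`["print_output", "print_output"]`) would raise, which is exactly the error Airflow reports for the unfixed DAG.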
This question comes from Stack Exchange; asked by Shahid Thaika.