How to Combine TaskGroup with Dynamic Task Mapping in Airflow

Combining Dynamic Task Mapping with TaskGroups in Airflow

Great question! Starting with Airflow 2.5, you can pair Dynamic Task Mapping (DTM) with task groups to create a grouped set of tasks for each item in your list. Each mapped task group instance gets its own value from the input list, and every task inside the group can use that value. Let's walk through fixing your code and how this works.

First, let's address the issues in your original code:

  • You used list as the mapped parameter name. That shadows Python's built-in list type, so a descriptive name such as item is safer.
  • Your two tasks inside the TaskGroup have the same task_id ("print_output"). Task IDs must be unique within a TaskGroup (Airflow prefixes them with the group ID, so the full ID must also be unique across the DAG).
  • You tried to reference the mapped value directly as list. The mapped value has to be received as a parameter of the mapped group and then passed explicitly into each task that needs it.
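The shadowing problem is easy to demonstrate in plain Python (no Airflow involved; both function names are hypothetical): once a parameter is named list, the built-in list() is unreachable inside that function.

```python
def split_chars_shadowed(list):
    # The parameter hides the built-in list type, so list(...) below
    # tries to call the argument itself instead of the built-in.
    return list("abc")


def split_chars(item):
    # A non-shadowing parameter name keeps the built-in list() available.
    return list(item)


print(split_chars("Hi"))  # ['H', 'i']
try:
    split_chars_shadowed(["Hello", "World"])
except TypeError as exc:
    print(f"TypeError: {exc}")  # TypeError: 'list' object is not callable
```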

Fixed Code (Using PythonOperator)

Here's a corrected version of your original code. It requires Airflow 2.5 or later, the first release that supports mapping over task groups:

from datetime import datetime

from airflow import DAG, XComArg
from airflow.decorators import task_group
from airflow.operators.python import PythonOperator

with DAG(
    'dtm_tg_test',
    schedule=None,  # Airflow 2.4+ name for schedule_interval
    start_date=datetime(2022, 1, 1),
    catchup=False  # Add this to avoid backfilling past runs
) as dag:
    def getList():
        return ["Hello", "World"]

    def printText(text):
        print(text)

    get_list = PythonOperator(
        task_id="get_list",
        python_callable=getList
    )

    # The classic TaskGroup class has no partial()/expand(); a mapped
    # group must be a @task_group-decorated function (Airflow 2.5+).
    # Use a custom parameter name (item) instead of the built-in 'list'.
    @task_group(group_id="task_group")
    def print_group(item):
        # Unique task IDs for each task in the group
        print_text = PythonOperator(
            task_id="print_output_first",
            python_callable=printText,
            # op_kwargs is a templated field, so Airflow resolves the
            # 'item' reference to this group instance's value at runtime
            op_kwargs={"text": item}
        )
        print_again = PythonOperator(
            task_id="print_output_second",
            python_callable=printText,
            op_kwargs={"text": item}
        )
        # Set task dependency inside the group
        print_text >> print_again

    # Create one task group instance per item returned by get_list
    mapped_groups = print_group.expand(item=XComArg(get_list))

    # Connect the upstream task to all mapped task group instances
    get_list >> mapped_groups

Key Details for This Implementation:

  • print_group.expand(item=XComArg(get_list)) tells Airflow to create one task group instance for each element of the list returned by get_list; the number of instances is decided at runtime.
  • Inside the group function, item is a reference rather than the actual string. Passing it through op_kwargs (a templated field of PythonOperator) lets Airflow substitute the right value for each instance when the task runs. A Jinja string such as "{{ item }}" would not work, because item is not part of the template context.
  • Each task in the group has a unique ID, so Airflow doesn't raise a DuplicateTaskIdFound error.
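As a rough mental model (plain Python, not Airflow code), expanding the group produces one set of task instances per list element. All instances of a given task share the same prefixed task ID (group ID, a dot, then the task ID, which is Airflow's default naming) and are told apart by map_index. The helper expand_group is hypothetical, and it raises ValueError where Airflow itself would raise DuplicateTaskIdFound.

```python
from typing import List, Tuple


def expand_group(
    group_id: str, task_ids: List[str], items: List[str]
) -> List[Tuple[str, int, str]]:
    """Model of a mapped task group: one group instance per input item.

    Returns (full_task_id, map_index, item) for every task instance.
    """
    if len(set(task_ids)) != len(task_ids):
        # Airflow rejects duplicate task IDs when the DAG is parsed
        raise ValueError("duplicate task_id inside the group")
    instances = []
    for map_index, item in enumerate(items):
        for task_id in task_ids:
            # Same prefixed task_id in every instance; map_index tells them apart
            instances.append((f"{group_id}.{task_id}", map_index, item))
    return instances


for ti in expand_group(
    "task_group", ["print_output_first", "print_output_second"], ["Hello", "World"]
):
    print(ti)
```

The listing order here is only for readability; the scheduler runs the real task instances according to their dependencies.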

A Cleaner Version (Using TaskFlow API)

If you're using Airflow 2.5 or later, the TaskFlow API makes this even more intuitive with decorators. Here's a streamlined version:

from airflow import DAG, XComArg
from datetime import datetime
from airflow.decorators import task, task_group
from airflow.operators.python import PythonOperator

with DAG(
    'dtm_tg_test_taskflow',
    schedule=None,  # Airflow 2.4+ name for schedule_interval
    start_date=datetime(2022, 1, 1),
    catchup=False
) as dag:
    def getList():
        return ["Hello", "World"]

    get_list = PythonOperator(
        task_id="get_list",
        python_callable=getList
    )

    # Define a reusable TaskGroup that accepts an 'item' parameter
    @task_group
    def process_item(item):
        @task
        def print_text(text):
            print(text)
        
        @task
        def print_again(text):
            print(text)
        
        # Assign tasks and set dependencies
        first_print = print_text(item)
        second_print = print_again(item)
        first_print >> second_print

    # Map the TaskGroup over the list from get_list
    mapped_task_groups = process_item.expand(item=XComArg(get_list))

    get_list >> mapped_task_groups

Why This Works Better:

  • The @task_group decorator lets you define a reusable group that directly accepts the item parameter.
  • You don't need op_kwargs or templating here: the item parameter is passed straight into your @task functions as a normal argument, which makes the code easier to read and maintain.
  • expand() is called on the decorated group function itself; the decorated function also offers partial() for arguments that should stay the same in every instance.
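To make the "reference, not value" idea concrete, here is a plain-Python model (not Airflow's implementation). The group body runs once while the DAG is built and only captures a placeholder; each task resolves that placeholder against its own instance's context when it runs. ItemRef, process_item_model, the context dict, and the returned strings are all hypothetical stand-ins.

```python
from typing import Any, Callable, Dict, List


class ItemRef:
    """Hypothetical placeholder for the value a mapped task group receives."""

    def __init__(self, key: str) -> None:
        self.key = key

    def resolve(self, context: Dict[str, Any]) -> Any:
        # Only called when a task runs, with that instance's context
        return context[self.key]


def process_item_model(item: ItemRef) -> List[Callable[[Dict[str, Any]], str]]:
    # Group-body code runs once while the DAG is being built, so 'item'
    # is still a placeholder here; branching on it would be meaningless.
    def print_text(context: Dict[str, Any]) -> str:
        return f"print_text got {item.resolve(context)}"

    def print_again(context: Dict[str, Any]) -> str:
        return f"print_again got {item.resolve(context)}"

    return [print_text, print_again]


tasks = process_item_model(ItemRef("item"))  # built once, not once per item
for map_index, value in enumerate(["Hello", "World"]):
    context = {"item": value}  # each mapped instance sees its own value
    for run in tasks:
        print(map_index, run(context))
```

This is why logic that depends on the actual value (conditions, loops) belongs inside a task rather than in the group function body.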

Final Notes:

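  • When mapping a task group, give the mapped parameter a descriptive name that doesn't shadow a Python built-in (list, str, etc.).
  • Keep all task IDs within a task group unique; Airflow raises a DuplicateTaskIdFound error when the DAG is parsed if you have duplicates.
  • Inside a mapped group function, the parameter is only a reference to the value, so don't branch or loop on it in the group body; put that logic in a task instead.
  • The TaskFlow API is recommended for newer Airflow versions because it simplifies parameter passing and task dependency management.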

The question originally comes from Stack Exchange; it was asked by Shahid Thaika.