From 0af5662b4288cc5428c07d97be7963ebcc8205da Mon Sep 17 00:00:00 2001 From: Evgenii Prusov Date: Sat, 19 Jul 2025 14:08:21 +0200 Subject: [PATCH 1/4] Fix double prefix in task_id when unmapping MappedOperator in TaskGroup (#52334) --- .../airflow/sdk/definitions/mappedoperator.py | 14 +++++++++++- .../definitions/test_mappedoperator.py | 22 +++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index d8b0dc029d538..b8fc87e150378 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -736,7 +736,19 @@ def unmap(self, resolve: None | Mapping[str, Any]) -> BaseOperator: is_setup = kwargs.pop("is_setup", False) is_teardown = kwargs.pop("is_teardown", False) on_failure_fail_dagrun = kwargs.pop("on_failure_fail_dagrun", False) - kwargs["task_id"] = self.task_id + + # Fix task_id duplication issue: if task_group is present and has prefix enabled, + # strip the prefix from task_id to avoid double prefixing when BaseOperator.__init__ is called + if self.task_group and self.task_group.prefix_group_id: + # Strip the task group prefix to avoid double prefixing + prefix = f"{self.task_group.group_id}." + if self.task_id.startswith(prefix): + kwargs["task_id"] = self.task_id[len(prefix) :] + else: + kwargs["task_id"] = self.task_id + else: + kwargs["task_id"] = self.task_id + op = self.operator_class(**kwargs, _airflow_from_mapped=True) op.is_setup = is_setup op.is_teardown = is_teardown diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py index 42d218c2c640a..8cf8e145ee710 100644 --- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py +++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py @@ -732,3 +732,25 @@ def test_setters(setter_name: str, old_value: object, new_value: object) -> None assert getattr(op, setter_name) == old_value setattr(op, setter_name, new_value) assert getattr(op, setter_name) == new_value + + +def test_mapped_operator_in_task_group_no_duplicate_prefix(): + """Test that task_id doesn't get duplicated prefix when unmapping a mapped operator in a task group.""" + from airflow.sdk.definitions.taskgroup import TaskGroup + + with DAG("test-dag") as dag: + with TaskGroup(group_id="tg1") as tg1: + # Create a mapped task within the task group + mapped_task = MockOperator.partial(task_id="mapped_task", arg1="a").expand(arg2=["a", "b", "c"]) + + # Check the mapped operator has correct task_id + assert mapped_task.task_id == "tg1.mapped_task" + assert mapped_task.task_group == tg1 + assert mapped_task.task_group.group_id == "tg1" + + # Simulate what happens during execution - unmap the operator + # unmap expects resolved kwargs + unmapped = mapped_task.unmap({"arg2": "a"}) + + # The unmapped operator should have the same task_id, not a duplicate prefix + assert unmapped.task_id == "tg1.mapped_task", f"Expected 'tg1.mapped_task' but got '{unmapped.task_id}'" From a03516c2d916495e2d45d0364045529d6d407ad9 Mon Sep 17 00:00:00 2001 From: Evgenii Prusov Date: Sat, 19 Jul 2025 14:08:21 +0200 Subject: [PATCH 2/4] Fix task_id duplication in mapped operators within task groups (#52334) --- task-sdk/src/airflow/sdk/bases/operator.py | 4 +++- .../src/airflow/sdk/definitions/mappedoperator.py | 14 +------------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index c430f38982348..6ba75b0af84e6 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ b/task-sdk/src/airflow/sdk/bases/operator.py @@ -1022,7 +1022,9 @@ def __init__( ): # Note: Metaclass handles passing in the DAG/TaskGroup from active context manager, if any - self.task_id = task_group.child_id(task_id) if task_group else task_id + # Only apply task_group prefix if this operator was not created from a mapped operator + # Mapped operators already have the prefix applied during their creation + self.task_id = task_group.child_id(task_id) if task_group and not self.__from_mapped else task_id if not self.__from_mapped and task_group: task_group.add(self) diff --git a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py index b8fc87e150378..d8b0dc029d538 100644 --- a/task-sdk/src/airflow/sdk/definitions/mappedoperator.py +++ b/task-sdk/src/airflow/sdk/definitions/mappedoperator.py @@ -736,19 +736,7 @@ def unmap(self, resolve: None | Mapping[str, Any]) -> BaseOperator: is_setup = kwargs.pop("is_setup", False) is_teardown = kwargs.pop("is_teardown", False) on_failure_fail_dagrun = kwargs.pop("on_failure_fail_dagrun", False) - - # Fix task_id duplication issue: if task_group is present and has prefix enabled, - # strip the prefix from task_id to avoid double prefixing when BaseOperator.__init__ is called - if self.task_group and self.task_group.prefix_group_id: - # Strip the task group prefix to avoid double prefixing - prefix = f"{self.task_group.group_id}." - if self.task_id.startswith(prefix): - kwargs["task_id"] = self.task_id[len(prefix) :] - else: - kwargs["task_id"] = self.task_id - else: - kwargs["task_id"] = self.task_id - + kwargs["task_id"] = self.task_id op = self.operator_class(**kwargs, _airflow_from_mapped=True) op.is_setup = is_setup op.is_teardown = is_teardown From c7ebae5b5fd1e979c0ed865fb05a43a303ca2b0c Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Sun, 20 Jul 2025 12:14:02 +0200 Subject: [PATCH 3/4] style: fix ruff warning --- task-sdk/tests/task_sdk/definitions/test_mappedoperator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py index 8cf8e145ee710..30dec22e2554a 100644 --- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py +++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py @@ -738,7 +738,7 @@ def test_mapped_operator_in_task_group_no_duplicate_prefix(): """Test that task_id doesn't get duplicated prefix when unmapping a mapped operator in a task group.""" from airflow.sdk.definitions.taskgroup import TaskGroup - with DAG("test-dag") as dag: + with DAG("test-dag"): with TaskGroup(group_id="tg1") as tg1: # Create a mapped task within the task group mapped_task = MockOperator.partial(task_id="mapped_task", arg1="a").expand(arg2=["a", "b", "c"]) From d9193ba0311356a2eebe08e0c23cd8d7871c215b Mon Sep 17 00:00:00 2001 From: Evgenii Prusov <114025336+evgenii-prusov@users.noreply.github.com> Date: Mon, 21 Jul 2025 10:33:14 +0200 Subject: [PATCH 4/4] Update task-sdk/src/airflow/sdk/bases/operator.py Co-authored-by: Wei Lee --- task-sdk/src/airflow/sdk/bases/operator.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index 6ba75b0af84e6..6f0078eef535c 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ b/task-sdk/src/airflow/sdk/bases/operator.py @@ -1024,9 +1024,11 @@ def __init__( # Only apply task_group prefix if this operator was not created from a mapped operator # Mapped operators already have the prefix applied during their creation - self.task_id = task_group.child_id(task_id) if task_group and not self.__from_mapped else task_id - if not self.__from_mapped and task_group: + if task_group and not self.__from_mapped: + self.task_id = task_group.child_id(task_id) task_group.add(self) + else: + self.task_id = task_id super().__init__() self.task_group = task_group