diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index c430f38982348..6f0078eef535c 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ b/task-sdk/src/airflow/sdk/bases/operator.py @@ -1022,9 +1022,13 @@ def __init__( ): # Note: Metaclass handles passing in the DAG/TaskGroup from active context manager, if any - self.task_id = task_group.child_id(task_id) if task_group else task_id - if not self.__from_mapped and task_group: + # Only apply task_group prefix if this operator was not created from a mapped operator + # Mapped operators already have the prefix applied during their creation + if task_group and not self.__from_mapped: + self.task_id = task_group.child_id(task_id) task_group.add(self) + else: + self.task_id = task_id super().__init__() self.task_group = task_group diff --git a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py index 42d218c2c640a..30dec22e2554a 100644 --- a/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py +++ b/task-sdk/tests/task_sdk/definitions/test_mappedoperator.py @@ -732,3 +732,25 @@ def test_setters(setter_name: str, old_value: object, new_value: object) -> None assert getattr(op, setter_name) == old_value setattr(op, setter_name, new_value) assert getattr(op, setter_name) == new_value + + +def test_mapped_operator_in_task_group_no_duplicate_prefix(): + """Test that task_id doesn't get duplicated prefix when unmapping a mapped operator in a task group.""" + from airflow.sdk.definitions.taskgroup import TaskGroup + + with DAG("test-dag"): + with TaskGroup(group_id="tg1") as tg1: + # Create a mapped task within the task group + mapped_task = MockOperator.partial(task_id="mapped_task", arg1="a").expand(arg2=["a", "b", "c"]) + + # Check the mapped operator has correct task_id + assert mapped_task.task_id == "tg1.mapped_task" + assert mapped_task.task_group == tg1 + assert mapped_task.task_group.group_id == "tg1" + + # Simulate what happens during execution - unmap the operator + # unmap expects resolved kwargs + unmapped = mapped_task.unmap({"arg2": "a"}) + + # The unmapped operator should have the same task_id, not a duplicate prefix + assert unmapped.task_id == "tg1.mapped_task", f"Expected 'tg1.mapped_task' but got '{unmapped.task_id}'"