Skip to content

Skipped dynamically mapped taskgroup with a downstream task with trigger_rule = all done leads to state mismatch, fail to populate all mapping metadata error #52730

@chmnata

Description

@chmnata

Apache Airflow version

3.0.2

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When there are skipped dynamically mapped taskgroup with a downstream task with trigger_rule = all done, the downstream tasks does not run and will get stuck in a queued state with state match with no task logs.

Executor LocalExecutor(parallelism=64) reported that the task instance <TaskInstance: task_state_changed_externally.any_task manual__2025-07-02T17:26:56.340268+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally  

What you think should happen instead?

skipped upstream task should be considered "executed" therefore the downstream task with trigger_rule = all done should still run.

How to reproduce

A modified version from this closed issue, which resulted in the same error and were fixed in 3.0.2. However the same error persist with skipped upstream task and trigger_rule = all_done downstream task.

from airflow.sdk import dag, task_group, task
from datetime import datetime
from airflow.exceptions import AirflowSkipException, AirflowFailException

@dag(start_date=datetime(2025, 6, 28), schedule=None, catchup=False)
def task_state_changed_externally():
    
    @task
    def get_nums():
        raise AirflowSkipException("Skipping this one intentionally")

        return [1, 2]

    # creating a task group using the decorator with the dynamic input my_num
    @task_group(group_id="group1")
    def tg1(my_num):
        @task
        def print_num(num):
            raise AirflowSkipException("Skipping this one intentionally")
            return num

        @task
        def add_42(num):
            if num == 2: 
                raise AirflowSkipException("Skipping this one intentionally")
            return num + 42

        print_num(my_num) >> add_42(my_num)
        
    # a task downstream of dynamic task group
    @task(trigger_rule="all_done")
    def any_task():
        print("Test")

    nums = get_nums()
    dynamic_tg = tg1.expand(my_num = nums) #a dynamic task group mapped over an xcom
    dynamic_tg >> any_task() #any task downstream of a mapped task

task_state_changed_externally()

This is the error in api-server.log,

INFO:     127.0.0.1:54114 - "PATCH /airflow/execution/task-instances/0197cc2d-6471-7abe-b0db-f13c0242cdd9/run HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
  + Exception Group Traceback (most recent call last):
  |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_utils.py", line 76, in collapse_excgroups
  |     yield
  |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 181, in __call__
  |     recv_stream.close()
  |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
  |     raise BaseExceptionGroup(
  | exceptiongroup.ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    |     result = await app(  # type: ignore[func-returns-value]
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 60, in __call__
    |     return await self.app(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/applications.py", line 112, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 187, in __call__
    |     raise exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 165, in __call__
    |     await self.app(scope, receive, _send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py", line 29, in __call__
    |     await responder(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py", line 126, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/gzip.py", line 46, in __call__
    |     await self.app(scope, receive, self.send_with_compression)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/cors.py", line 85, in __call__
    |     await self.app(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 183, in __call__
    |     raise app_exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 141, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 714, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 734, in app
    |     await route.handle(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 460, in handle
    |     await self.app(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/applications.py", line 259, in __call__
    |     await self.__call__(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/applications.py", line 1054, in __call__
    |     await super().__call__(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/applications.py", line 112, in __call__
    |     await self.middleware_stack(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 187, in __call__
    |     raise exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/errors.py", line 165, in __call__
    |     await self.app(scope, receive, _send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 181, in __call__
    |     recv_stream.close()
    |   File "/usr/lib64/python3.9/contextlib.py", line 137, in __exit__
    |     self.gen.throw(typ, value, traceback)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_utils.py", line 82, in collapse_excgroups
    |     raise exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 178, in __call__
    |     response = await self.dispatch_func(request, call_next)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/middleware.py", line 124, in dispatch
    |     response = await call_next(request)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 156, in call_next
    |     raise app_exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/base.py", line 141, in coro
    |     await self.app(scope, receive_or_disconnect, send_no_error)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    |     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/routing.py", line 93, in __call__
    |     await self.process_request(scope=scope, receive=receive, send=send, routes=routes)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/routing.py", line 131, in process_request
    |     await route.handle(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 288, in handle
    |     await self.app(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 76, in app
    |     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    |     raise exc
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    |     await app(scope, receive, sender)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/routing.py", line 73, in app
    |     response = await f(request)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/routing.py", line 301, in app
    |     raw_response = await run_endpoint_function(
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/fastapi/routing.py", line 212, in run_endpoint_function
    |     return await dependant.call(**values)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/structure/versions.py", line 474, in decorator
    |     response = await self._convert_endpoint_response_to_version(
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/structure/versions.py", line 520, in _convert_endpoint_response_to_version
    |     response_or_response_body: Union[FastapiResponse, object] = await run_in_threadpool(
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/starlette/concurrency.py", line 37, in run_in_threadpool
    |     return await anyio.to_thread.run_sync(func)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/to_thread.py", line 56, in run_sync
    |     return await get_async_backend().run_sync_in_worker_thread(
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py", line 2470, in run_sync_in_worker_thread
    |     return await future
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    |     result = context.run(func, *args)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/cadwyn/schema_generation.py", line 511, in __call__
    |     return self._original_callable(*args, **kwargs)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 258, in ti_run
    |     upstream_map_indexes = dict(
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/api_fastapi/execution_api/routes/task_instances.py", line 311, in _get_upstream_map_
indexes
    |     mapped_ti_count = upstream_mapped_group._expand_input.get_total_map_length(
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/models/expandinput.py", line 110, in get_total_map_length
    |     lengths = self._get_map_lengths(run_id, session=session)
    |   File "/home/airflow/airflow_venv/lib64/python3.9/site-packages/airflow/models/expandinput.py", line 104, in _get_map_lengths
    |     raise NotFullyPopulated(set(self.value).difference(map_lengths))
    | airflow.sdk.definitions._internal.expandinput.NotFullyPopulated: Failed to populate all mapping metadata; missing: 'my_num'
    +------------------------------------

Operating System

Red Hat Enterprise Linux 8.10 (Ootpa)

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

This happens every time consistently. Initially I thought it would be fixed with 3.0.2 with this PR by @Lee-W .

Might also be related to #51320 , that's recently merged by #51701, which will be released in 3.0.3.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions