Skip to content

Conversation

@dheerajturaga
Copy link
Member

When clearing task instances with include_downstream or include_upstream enabled,
the clear operation would fail with a NotMapped exception if any task was specified
with a map_index tuple but wasn't actually a mapped task. This happened because the
code assumed all tasks specified as tuples were mapped tasks and called
get_mapped_ti_count() on them without handling the NotMapped exception.

The fix catches the NotMapped exception and treats such tasks as normal tasks instead.

related to: #57758

Error seen in api-server when I try to clear task instances

INFO:     172.18.0.1:48544 - "POST /api/v2/dags/example_bash_decorator/clearTaskInstances HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py", line 1134, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/applications.py", line 107, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 29, in __call__
    await responder(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 130, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 46, in __call__
    await self.app(scope, receive, self.send_with_compression)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 93, in __call__
    await self.simple_response(scope, receive, send, request_headers=headers)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 144, in simple_response
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 191, in __call__
    with recv_stream, send_stream, collapse_excgroups():
  File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/python/lib/python3.10/site-packages/starlette/_utils.py", line 85, in collapse_excgroups
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 193, in __call__
    response = await self.dispatch_func(request, call_next)
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py", line 51, in dispatch
    response = await call_next(request)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 168, in call_next
    raise app_exc from app_exc.__cause__ or app_exc.__context__
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 144, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 716, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 736, in app
    await route.handle(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 290, in handle
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 125, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 111, in app
    response = await f(request)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 391, in app
    raw_response = await run_endpoint_function(
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 292, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/python/lib/python3.10/site-packages/starlette/concurrency.py", line 32, in run_in_threadpool
    return await anyio.to_thread.run_sync(func)
  File "/usr/python/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
    return await future
  File "/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 976, in run
    result = context.run(func, *args)
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py", line 800, in post_clear_task_instances
    _collect_relatives(dag_run_id, "downstream")
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py", line 771, in _collect_relatives
    relevant_relatives = find_relevant_relatives(
  File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 2346, in find_relevant_relatives
    _visit_relevant_relatives_for_mapped(mapped_tasks)
  File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 2315, in _visit_relevant_relatives_for_mapped
    ti_count = get_mapped_ti_count(task, run_id, session=session)
  File "/usr/python/lib/python3.10/functools.py", line 889, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py", line 517, in _
    raise NotMapped()
airflow.exceptions.NotMapped
INFO:     172.18.0.1:48556 - "POST /api/v2/dags/example_bash_decorator/clearTaskInstances HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/python/lib/python3.10/site-packages/uvicorn/protocols/http/httptools_impl.py", line 409, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/python/lib/python3.10/site-packages/fastapi/applications.py", line 1134, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/applications.py", line 107, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 29, in __call__
    await responder(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 130, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/gzip.py", line 46, in __call__
    await self.app(scope, receive, self.send_with_compression)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 93, in __call__
    await self.simple_response(scope, receive, send, request_headers=headers)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/cors.py", line 144, in simple_response
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 191, in __call__
    with recv_stream, send_stream, collapse_excgroups():
  File "/usr/python/lib/python3.10/contextlib.py", line 153, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/usr/python/lib/python3.10/site-packages/starlette/_utils.py", line 85, in collapse_excgroups
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 193, in __call__
    response = await self.dispatch_func(request, call_next)
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/auth/middlewares/refresh_token.py", line 51, in dispatch
    response = await call_next(request)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 168, in call_next
    raise app_exc from app_exc.__cause__ or app_exc.__context__
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/base.py", line 144, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/usr/python/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 63, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 716, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 736, in app
    await route.handle(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/routing.py", line 290, in handle
    await self.app(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 125, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    raise exc
  File "/usr/python/lib/python3.10/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
    await app(scope, receive, sender)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 111, in app
    response = await f(request)
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 391, in app
    raw_response = await run_endpoint_function(
  File "/usr/python/lib/python3.10/site-packages/fastapi/routing.py", line 292, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
  File "/usr/python/lib/python3.10/site-packages/starlette/concurrency.py", line 32, in run_in_threadpool
    return await anyio.to_thread.run_sync(func)
  File "/usr/python/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
    return await future
  File "/usr/python/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 976, in run
    result = context.run(func, *args)
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py", line 800, in post_clear_task_instances
    _collect_relatives(dag_run_id, "downstream")
  File "/opt/airflow/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py", line 771, in _collect_relatives
    relevant_relatives = find_relevant_relatives(
  File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 2346, in find_relevant_relatives
    _visit_relevant_relatives_for_mapped(mapped_tasks)
  File "/opt/airflow/airflow-core/src/airflow/models/taskinstance.py", line 2315, in _visit_relevant_relatives_for_mapped
    ti_count = get_mapped_ti_count(task, run_id, session=session)
  File "/usr/python/lib/python3.10/functools.py", line 889, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
  File "/opt/airflow/airflow-core/src/airflow/models/mappedoperator.py", line 517, in _
    raise NotMapped()
airflow.exceptions.NotMapped

@potiuk
Copy link
Member

potiuk commented Dec 2, 2025

cc: @dheerajturaga -> can you rebase pls after latest checks and maybe add unit tests ? cc: @ephraimbuddy -> this is likely a good candidate to get to 3.1.4 if it will make it

   downstream/upstream

   When clearing task instances with include_downstream or include_upstream enabled,
   the clear operation would fail with a NotMapped exception if any task was specified
   with a map_index tuple but wasn't actually a mapped task. This happened because the
   code assumed all tasks specified as tuples were mapped tasks and called
   get_mapped_ti_count() on them without handling the NotMapped exception.

   The fix catches the NotMapped exception and treats such tasks as normal tasks instead.
@dheerajturaga dheerajturaga force-pushed the bugfix/clear-task-instances branch from 57a0ef1 to da75873 Compare December 3, 2025 03:19
@potiuk potiuk added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Dec 3, 2025
@potiuk potiuk merged commit 5aa90c0 into apache:main Dec 3, 2025
66 checks passed
@github-actions
Copy link

github-actions bot commented Dec 3, 2025

Backport failed to create: v3-1-test. View the failure log Run details

Status Branch Result
v3-1-test Commit Link

You can attempt to backport this manually by running:

cherry_picker 5aa90c0 v3-1-test

This should apply the commit to the v3-1-test branch and leave the commit in conflict state marking
the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

itayweb pushed a commit to itayweb/airflow that referenced this pull request Dec 6, 2025
…upstream (apache#58922)

* Fix NotMapped exception when clearing task instances with
   downstream/upstream

   When clearing task instances with include_downstream or include_upstream enabled,
   the clear operation would fail with a NotMapped exception if any task was specified
   with a map_index tuple but wasn't actually a mapped task. This happened because the
   code assumed all tasks specified as tuples were mapped tasks and called
   get_mapped_ti_count() on them without handling the NotMapped exception.

   The fix catches the NotMapped exception and treats such tasks as normal tasks instead.

* Add unit test
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants