
Scheduler crashes when using Assets in the presence of zombie tasks #56763

@Ferdinanddb

Description


Apache Airflow version

3.1.0

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

I am using the Asset-aware scheduling feature to schedule some DAGs. For some reason, some tasks ended up as zombie tasks (marked as running in the api-server, but not actually running). These tasks belong to DAGs that are scheduled by Asset events, and they use a value from the extra dict of the triggering asset event, for example: {{ (triggering_asset_events.values() | first | last).extra['ds'] }}.
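To make the template concrete: it takes the first entry of the triggering_asset_events mapping (a list of events for one triggering asset), picks the last event in that list, and reads 'ds' from its extra dict. Below is a stdlib-only sketch of the same lookup. It assumes a simplified data model: AssetEventStub, resolve_ds and the sample values are hypothetical, not Airflow's actual types.

```python
from dataclasses import dataclass, field


@dataclass
class AssetEventStub:
    """Hypothetical stand-in for an asset event; only `extra` matters here."""
    extra: dict = field(default_factory=dict)


def resolve_ds(triggering_asset_events: dict) -> str:
    """Plain-Python equivalent of
    (triggering_asset_events.values() | first | last).extra['ds']
    """
    # `| first`: the event list of the first triggering asset (dict insertion order)
    events_for_first_asset = next(iter(triggering_asset_events.values()))
    # `| last`: the last event in that list
    latest_event = events_for_first_asset[-1]
    # `.extra['ds']`: the value the task actually uses
    return latest_event.extra["ds"]


# Hypothetical sample: one asset mapped to two events.
events = {
    "s3://bucket/daily": [
        AssetEventStub(extra={"ds": "2025-10-01"}),
        AssetEventStub(extra={"ds": "2025-10-02"}),
    ]
}
print(resolve_ds(events))  # prints 2025-10-02
```

In this sketch an empty mapping raises StopIteration and a missing 'ds' key raises KeyError, so the template implicitly assumes at least one event carrying that key.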

The scheduler then kept crashing repeatedly for a while. The logs mentioned the tasks with messages like "inheriting the following tasks from a dead scheduler", and then the scheduler crashed with an error like:

pydantic_core._pydantic_core.ValidationError: 2 validation errors for DagRun
consumed_asset_events.0.asset
  Error extracting attribute: DetachedInstanceError: Parent instance <AssetEvent at 0x7aebf77c1f70> is not bound to a Session; lazy load operation of attribute 'asset' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3) [type=get_attribute_error, input_value=<unprintable AssetEvent object>, input_type=AssetEvent]
    For further information visit https://errors.pydantic.dev/2.11/v/get_attribute_error
consumed_asset_events.0.source_aliases
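The DetachedInstanceError says that the AssetEvent objects were no longer attached to a SQLAlchemy session when pydantic tried to read their `asset` attribute, which is loaded lazily (fetched with a query on first access). The stdlib-only toy model below mimics that failure mode and the usual remedy of loading the relationship while the session is still open. It is an analogy, not SQLAlchemy's or Airflow's code, and every name in it (DetachedError, FakeSession, LazyEvent, serialize) is hypothetical.

```python
class DetachedError(Exception):
    """Hypothetical analogue of SQLAlchemy's DetachedInstanceError."""


class FakeSession:
    """Hypothetical session that can fetch related rows only while open."""

    def __init__(self, assets_by_id):
        self._assets_by_id = assets_by_id
        self.is_open = True

    def load_asset(self, asset_id):
        return self._assets_by_id[asset_id]

    def close(self):
        self.is_open = False


class LazyEvent:
    """Toy event whose `asset` attribute is fetched on first access."""

    def __init__(self, asset_id, session):
        self.asset_id = asset_id
        self._session = session
        self._asset = None  # not loaded yet

    @property
    def asset(self):
        if self._asset is None:
            # Lazy load needs a live session, like an ORM relationship.
            if self._session is None or not self._session.is_open:
                raise DetachedError(
                    "Parent instance is not bound to a Session; "
                    "lazy load of 'asset' cannot proceed"
                )
            self._asset = self._session.load_asset(self.asset_id)
        return self._asset


def serialize(event):
    """Stands in for pydantic reading attributes off the ORM object."""
    return {"asset": event.asset}


session = FakeSession({1: "s3://bucket/daily"})
lazy = LazyEvent(1, session)
eager = LazyEvent(1, session)
_ = eager.asset  # load the relationship while the session is still open
session.close()

print(serialize(eager))  # prints {'asset': 's3://bucket/daily'}
try:
    serialize(lazy)
except DetachedError as exc:
    print("failed:", exc)
```

In real SQLAlchemy code the comparable remedies are eager-loading options such as `selectinload` or `joinedload` on the query, or serializing before the session is closed. Whether either is the right fix for this scheduler code path is not established by this report.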

After a while, everything went back to normal. I destroyed the scheduler so that it would be re-created, but I don't think that is what fixed it. I think the zombie tasks were cleaned up once the task_instance_heartbeat_timeout=1600 was reached.
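For context on the timeout: a task instance still marked as running is treated as a zombie once its last heartbeat is older than task_instance_heartbeat_timeout (in seconds). With a value of 1600, cleanup would only start roughly 27 minutes after the tasks stopped heartbeating, which is consistent with the crashes lasting "a while". Below is a simplified sketch of that check. It assumes a plain "last heartbeat older than now minus timeout" comparison. The function name is hypothetical, and the real scheduler query may apply additional conditions.

```python
from datetime import datetime, timedelta, timezone

TASK_INSTANCE_HEARTBEAT_TIMEOUT = 1600  # seconds, the value from this report


def is_heartbeat_expired(state, last_heartbeat_at, now,
                         timeout_s=TASK_INSTANCE_HEARTBEAT_TIMEOUT):
    """Hypothetical check: a running task whose last heartbeat is older than the timeout."""
    limit = now - timedelta(seconds=timeout_s)
    return state == "running" and last_heartbeat_at < limit


now = datetime(2025, 10, 10, 12, 0, tzinfo=timezone.utc)
print(is_heartbeat_expired("running", now - timedelta(minutes=20), now))  # False: 1200 s < 1600 s
print(is_heartbeat_expired("running", now - timedelta(minutes=30), now))  # True: 1800 s > 1600 s
print(is_heartbeat_expired("success", now - timedelta(hours=2), now))     # False: not running
```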

What you think should happen instead?

The scheduler should not crash in such situation.

How to reproduce

One likely way to reproduce this is to get a task into a zombie state when that task depends on a value from an Asset event (for example, using {{ (triggering_asset_events.values() | first | last).extra['ds'] }}).

Operating System

Using the official Airflow image 3.1.0-python3.12

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
