Skip to content

Conversation

@dabla
Copy link
Contributor

@dabla dabla commented Jun 4, 2025

AIP-88 allows triggerers to yield and have the scheduler/process handle multiple events in a single trigger run, enabling true streamable dynamic task mapping for async operators (e.g., HttpOperator/MSGraphAsyncOperator). This reduces repeated context switching and "ping-pong" between scheduler → worker → triggerer, enabling efficient in-trigger pagination and lazy task expansion.

To be able to complete this PR, I first need the the Fix rendering of template fields with start from trigger PR to be merged, once that one is merged I can finish this one as it heavily depends on the start_from_trigger mechanism which was broken and temporarily disabled.

Current implementation:
image

New improved mechanism:
image

https://www.youtube.com/watch?v=mWUAk-Gwmws


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

dheerajturaga and others added 22 commits October 12, 2025 08:02
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
* Add async support for Amazon SQS Notifier

* fix bad test

MESSAGE_ID is not the same is MESSAGE_BODY, that assertion was failing as it should have.  Removed the bad assertion
…56241)

* Prepare fab and amazon providers to release (September 2025)
* UI: Add Expand/Collapse all to XComs page

* Restored few changes

* Fixing lint issue

* Made suggested Changes
…pache#56266)

These imports ultimately aren't cheap - it results in ~80mb of memory
use, so lets skip it unless the plugin is enabled.
…of wait_for_completion (apache#56158)

* refactor: deprecate wait_policy in EmrCreateJobFlowOperator in favor of wait_for_completion

* added unit test for refactor

* resolved copilot comments

* resolved copilot comments

* fixed failing test

* fixed: refactor of wait_policy

* ensured backward compatibility

* removed "self.wait_policy = wait_policy"
Ensure removed/historical tasks from selected runs are visible in
Grid even if they no longer exist in the current DAG version.

We now:
- Include synthetic leaf nodes for task_ids present in TIs but
missing from the serialized DAG in both grid/structure and grid/ti_summaries.
- Aggregate TI states for these synthetic nodes

Add tests covering structure and TI summaries for removed tasks.
Co-authored-by: Brent Bovenzi <brent@astronomer.io>
As the Edeg API server is currently implemented we need to "pretend" to be  a
specific Airflow and Edge provider version. These default to the currently
released versions, but can be changed via env vars to work elsewhere.

This works enough to run tasks, but there might need to be some changes to the
Edge API to support non-python clients (for example, working out the
versioning strategy to make it long-term supportible and not need the Airflow
and Edge Provider version to match 100%).

A chunk of the changes here are to make the config and global setup "more well
structured" -- so that they are suitable to be easily called from multiple
workers (Celery and Go, useful if we don't end up keeping the Celery worker)
* modify test_ftp

Signed-off-by: Xch1 <qchwan@gmail.com>

* modify test_sql

Signed-off-by: Xch1 <qchwan@gmail.com>

* modify test_exasol

Signed-off-by: Xch1 <qchwan@gmail.com>

* modify test_es_task_handler

Signed-off-by: Xch1 <qchwan@gmail.com>

* modify test_edge_executor

Signed-off-by: Xch1 <qchwan@gmail.com>

* fix test_edge_executor match

Signed-off-by: Xch1 <qchwan@gmail.com>

* fix test_edge_executor match

Signed-off-by: Xch1 <qchwan@gmail.com>

* fix test_edge_executor match

Signed-off-by: Xch1 <qchwan@gmail.com>

---------

Signed-off-by: Xch1 <qchwan@gmail.com>
…che#56308)

EKS test sometimes fails with exceptions like:
`botocore.errorfactory.ResourceInUseException: An error occurred (ResourceInUseException) when calling the DeleteFargateProfile operation: Cannot Delete Fargate Profile [profile name] because cluster [cluster name] currently has an update in progress`

This is (a potentially temporary) fix for that error.
dabla added 30 commits December 27, 2025 16:14
# Conflicts:
#	airflow-core/src/airflow/models/taskinstance.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.