Skip to content

Conversation

@dheerajturaga
Copy link
Member

EdgeExecutor was not implementing the revoke_task() method, causing it to raise NotImplementedError when the scheduler attempted to handle tasks stuck in queued state. This meant the task_queued_timeout feature was completely bypassed for EdgeExecutor.

This commit implements revoke_task() to:

  • Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  • Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  • Enable proper task queued timeout handling by the scheduler

Before:
image

After:
image

…t to

  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.
@boring-cyborg boring-cyborg bot added area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3 labels Sep 30, 2025
Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this! So far did not notice and seems we did not have failed tasks in queue so far in our setup :-D

One nit in regards to commit(), else LGTM!

…cutor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
@jscheffl jscheffl merged commit 178eda4 into apache:main Oct 1, 2025
76 checks passed
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 2, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 3, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 4, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 5, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 7, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 8, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 9, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 10, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 11, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 12, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
dabla pushed a commit to dabla/airflow that referenced this pull request Oct 12, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 14, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 15, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 17, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
abdulrahman305 bot pushed a commit to abdulrahman305/airflow that referenced this pull request Oct 19, 2025
…t support (apache#56240)

* EdgeExecutor was not implementing the revoke_task() method, causing it to
  raise NotImplementedError when the scheduler attempted to handle tasks stuck
  in queued state. This meant the task_queued_timeout feature was completely
  bypassed for EdgeExecutor.

  This commit implements revoke_task() to:
  - Remove tasks from executor's internal state (running, queued_tasks, last_reported_state)
  - Delete corresponding EdgeJobModel records to prevent edge workers from picking them up
  - Enable proper task queued timeout handling by the scheduler

  Added tests to verify revoke_task correctly removes tasks from both executor
  state and database, and handles non-existent tasks gracefully.

* Update providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>

---------

Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:edge Edge Executor / Worker (AIP-69) / edge3

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants