ECS Executor - add support to adopt orphaned tasks. #36803
"""
Try to adopt running task instances if adopt_task_instances option is set to True.

These tasks instances should have an ECS process which can be adopted by the unique task ARN.
Is it expected that the adoption process makes no distinction between an instance in a completed/terminated state and one in a running state?
I think this is right. For comparison, the k8s executor filters out "status == successful", but local just passes them all, and celery appears to print the state for each but isn't filtering.
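To make the comparison concrete, here is a minimal sketch of the two policies mentioned above: adopting every task instance that carries a known external ID, versus skipping instances already in a terminal state. `FakeTaskInstance`, `adopt`, `known_arns`, and the `skip_finished` flag are hypothetical names used only for illustration; this is not the code from this PR or from any existing executor.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's TaskInstance."""

    key: str
    state: str
    external_executor_id: Optional[str]  # e.g. an ECS task ARN


def adopt(tis, known_arns, skip_finished=False):
    """Split task instances into (adopted, not_adopted).

    known_arns: ARNs the (hypothetical) backend still knows about.
    skip_finished: if True, instances already in a terminal state are
    left unadopted; if False, every state is treated the same.
    """
    adopted, not_adopted = [], []
    for ti in tis:
        finished = ti.state in {"success", "failed"}
        has_known_arn = ti.external_executor_id in known_arns
        if has_known_arn and not (skip_finished and finished):
            adopted.append(ti)
        else:
            not_adopted.append(ti)
    return adopted, not_adopted


tis = [
    FakeTaskInstance("a", "running", "arn:1"),
    FakeTaskInstance("b", "success", "arn:2"),
    FakeTaskInstance("c", "queued", None),
]
all_states, _ = adopt(tis, {"arn:1", "arn:2"})
filtered, _ = adopt(tis, {"arn:1", "arn:2"}, skip_finished=True)
```

With `skip_finished=False` the finished instance "b" is adopted along with "a"; with `skip_finished=True` only "a" is adopted.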
@@ -377,6 +383,9 @@ def attempt_task_runs(self):
else:
    task = run_task_response["tasks"][0]
    self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number)
    # Add Fargate task ARN to executor event buffer, which gets saved
    # in TaskInstance.external_executor_id.
    self.event_buffer[task_key] = (State.QUEUED, task.task_arn)
We have self.success and self.fail (e.g. link) which handle the other state changes. These are provided by the base executor. I wonder if we should add a new method for putting a task in queued state 🤔 but this might cause weird issues if a provider which contains an executor is installed alongside an older version of airflow... (also, not required for this PR, just thinking out loud)
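A rough sketch of what such a helper could look like, purely as an illustration of the idea floated above. `SketchExecutor` and its `queued` method are hypothetical; the class only mimics the event-buffer pattern and is not Airflow's BaseExecutor, and no claim is made here about which methods the real base class exposes.

```python
class SketchExecutor:
    """Hypothetical executor that only mimics the event-buffer idea."""

    def __init__(self):
        # Maps task key -> (state, info); a scheduler would drain this.
        self.event_buffer = {}

    def change_state(self, key, state, info=None):
        self.event_buffer[key] = (state, info)

    def success(self, key, info=None):
        self.change_state(key, "success", info)

    def fail(self, key, info=None):
        self.change_state(key, "failed", info)

    def queued(self, key, info=None):
        """Hypothetical addition: record a queued task and its external ID."""
        self.change_state(key, "queued", info)


executor = SketchExecutor()
executor.queued("task-1", "arn:aws:ecs:example")
```

A provider that called a helper like this would need a fallback (for instance, writing to the event buffer directly) when running against a core version that lacks it, which is the compatibility concern raised in the comment.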
airflow/jobs/scheduler_job_runner.py
Outdated
@@ -721,6 +721,7 @@ def _process_executor_events(self, session: Session) -> int:
if state == TaskInstanceState.QUEUED:
    ti.external_executor_id = info
    self.log.info("Setting external_id for %s to %s", ti, info)
    session.commit()
I'm not a DB/SQLA expert, but do we want to commit here, or outside of the for loop? Committing here means one commit for each queued task that we got from the query, and there could be many. Interested to hear others weigh in.
A valid point, and I don't know the right answer. I'll see what others think. It may be safest to move it outside the loop.
For posterity, I have started a Slack discussion [here] for this to try to get more attention. I'll drop a comment here with whatever decision is reached there.
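For anyone weighing the two options, the difference in commit count can be sketched as follows. `CountingSession` is a hypothetical stand-in that only counts calls to `commit()`; it is not a SQLAlchemy session, and the dict-based task instances and state strings are placeholders rather than the scheduler's real objects.

```python
class CountingSession:
    """Hypothetical stand-in for a DB session that only counts commits."""

    def __init__(self):
        self.commits = 0

    def commit(self):
        self.commits += 1


def commit_per_event(session, events):
    """Commit inside the loop: one commit per queued task."""
    for ti, state, info in events:
        if state == "queued":
            ti["external_executor_id"] = info
            session.commit()


def commit_once(session, events):
    """Commit outside the loop: at most one commit per batch."""
    changed = False
    for ti, state, info in events:
        if state == "queued":
            ti["external_executor_id"] = info
            changed = True
    if changed:
        session.commit()


events = [({}, "queued", f"arn:{i}") for i in range(3)] + [({}, "success", None)]
per_event, once = CountingSession(), CountingSession()
commit_per_event(per_event, events)
commit_once(once, events)
```

Here `per_event.commits` ends up at 3 and `once.commits` at 1. The likely trade-off is that a single commit means fewer round trips, while an exception partway through the loop could leave none of the IDs from that batch persisted.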
883e672 to 47a7e05
Accidentally did a rebase and ingested the bug fixed by #36958, re-rebased so CI should clear now. EDIT: ... or maybe not... I'll look into it.
47a7e05 to 06fff7a
Co-authored-by: o-nikolas
Co-authored-by: Niko Oliveira <onikolas@amazon.com>
2ba8803 to 63b10c5
Unless anyone here feels strongly otherwise, I think I'm going to close this one and open a new PR. It's been a while and it needs a rebase, plus there are now a few commits which can be squashed, and it looks like it is going to be dependent on a bugfix that I'll submit in a separate PR. No real point leaving this open and then doing a rebase.
Adds support for "adopt orphaned tasks" to the ECS Executor.
A task can become orphaned when, for example, the scheduler container is redeployed or terminated, or in other cases where the SchedulerJob is interrupted.
try_adopt_task_instances is defined in the BaseExecutor (this PR implements it in the ECS Executor) and is called by the Scheduler here.
Based heavily on the work here by Matt Ellis (who I can't tag for some reason).
closes: #35491