Fix critical CeleryKubernetesExecutor bug #13247
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
airflow/cli/cli_parser.py
@@ -30,7 +30,6 @@
from airflow.cli.commands.legacy_commands import check_legacy_command
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_constants import CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR
Why did you stop using these constants?
not a particularly good reason, will add back
for context, in a previous change, when i thought that CeleryKubernetesExecutor should be consistently set across the cluster, i had added one of them.
in this pr though i was essentially reverting that change, and in my mind the constants were not used prior to that change. but in actuality, the constant was used prior to my prev change: in the if statement, though not in the logging. so this is actually a removal and not just a revert of a recent change.
i will restore use of the constant.
airflow/cli/cli_parser.py
message = f'celery subcommand works only with CeleryExecutor, your current executor: {executor}'
raise ArgumentError(action, message)
if value == 'celery':
if executor != CELERY_EXECUTOR:
This does not cover CELERY_KUBERNETES_EXECUTOR though, right?
i.e. if we don't run it via the Helm Chart, it should still work so I don't think this change is needed, apart from improving the error message
@kaxil the problem is that when executor==CELERY_KUBERNETES_EXECUTOR on the celery worker, the celery worker does not run properly.
you get some fork pool error.
running airflow celery worker with CELERY_EXECUTOR instead resolves the issue.
celery workers don't need to know that the scheduler is using CKE
i know this is a bit hacky. our code should handle having all components set to use the same executor. however, that simply doesn't work right now, and this change (forcing the C workers to think CeleryExecutor is used) makes CKE work again immediately.
perhaps it makes sense to push out this hacky fix and then later look into why we get fork pool issues. but that's a question for you guys
Yea that one feels too hacky. We should at least take care of it when running "airflow celery worker" (a less hacky approach), so that users don't need to have a different configuration for the Celery Worker.
Can you paste the full stack trace of the error please too -- want to see if I have seen that before
See issue #13263
(Moved into the 2.0.3 milestone as it was never cherry picked into 2.0.1)
closes apache#13263 (cherry picked from commit b59e416)
🤦 I just cherry-picked it to v2-0-test
CeleryKubernetesExecutor is currently broken.
This PR gets it working again.
Issues resolved
1. missing job id
Before SchedulerJob starts the executor, it sets self.executor.job_id = self.id, where self.id is the primary key in the jobs table. See here: https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1265.
So with KubernetesExecutor, when start is called, the executor has a value under job_id.
But with CeleryKubernetesExecutor enabled, the actual executors are attributes on the CKE object, so they don't have the job ID. When the K8s executor tries to start, it fails because it doesn't find a job id.
Resolution:
I add a setter for job_id, so that when you set job_id on the CKE, it immediately propagates the value to the child executors.
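A minimal sketch of the propagation idea, using stand-in classes rather than the real Airflow executors. ChildExecutor and CompositeExecutor are hypothetical names for illustration only; the actual change lives in CeleryKubernetesExecutor and may differ in detail.

```python
class ChildExecutor:
    """Hypothetical stand-in for CeleryExecutor / KubernetesExecutor."""

    def __init__(self):
        self.job_id = None


class CompositeExecutor:
    """Hypothetical stand-in for CeleryKubernetesExecutor."""

    def __init__(self, celery_executor, kubernetes_executor):
        self._job_id = None
        self.celery_executor = celery_executor
        self.kubernetes_executor = kubernetes_executor

    @property
    def job_id(self):
        return self._job_id

    @job_id.setter
    def job_id(self, value):
        # Store the value and push it down to both child executors,
        # so each child sees the job id before start() is called.
        self._job_id = value
        self.celery_executor.job_id = value
        self.kubernetes_executor.job_id = value


celery = ChildExecutor()
k8s = ChildExecutor()
cke = CompositeExecutor(celery, k8s)

# Mirrors what the scheduler does: self.executor.job_id = self.id
cke.job_id = 42
```

With this in place, a plain attribute assignment on the wrapper is enough; the scheduler code does not need to know about the child executors.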
2. missing slots_available
Since this executor does not inherit from BaseExecutor, this property is missing and we must add it.
Resolution:
I add a slots_available property.
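A rough sketch of what such a property could look like, again with stand-in classes. It assumes the wrapper simply reports the Celery child's free slots and that a child computes free slots as parallelism minus running and queued tasks; both are assumptions for illustration, not a statement of what the PR does.

```python
import sys


class ChildExecutor:
    """Hypothetical stand-in for an executor that tracks its own capacity."""

    def __init__(self, parallelism):
        self.parallelism = parallelism
        self.running = set()
        self.queued_tasks = {}

    @property
    def slots_available(self):
        # Assumed formula: a parallelism of 0 means "unlimited".
        if self.parallelism:
            return self.parallelism - len(self.running) - len(self.queued_tasks)
        return sys.maxsize


class CompositeExecutor:
    """Hypothetical stand-in for CeleryKubernetesExecutor."""

    def __init__(self, celery_executor, kubernetes_executor):
        self.celery_executor = celery_executor
        self.kubernetes_executor = kubernetes_executor

    @property
    def slots_available(self):
        # Assumption: delegate to the Celery child executor.
        return self.celery_executor.slots_available


celery = ChildExecutor(parallelism=4)
k8s = ChildExecutor(parallelism=0)
cke = CompositeExecutor(celery, k8s)
celery.running.add("task_1")
```

Here cke.slots_available is read by the caller exactly as it would be on an executor that inherits the property.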
3. Celery worker does not run properly with CeleryKubernetesExecutor
When airflow celery worker is run with executor=CeleryKubernetesExecutor, the celery worker fails to fork properly. See more details in issue #13263.
Resolution:
This is perhaps a bit of a hack, but it provides a workable way to use this executor.