[KubernetesPodOperator] Reads Kubernetes events and writes them into log #50192
Conversation
Thanks for the PR.
IMO I would prefer to keep await_pod_start as it was originally, while still supporting this feature, to minimize the change.
Setting label "log exception" as no new caplog is added; just the existing ones are still present.
jscheffl left a comment
Looks good to me. Let's have CI green and then I assume we can merge.
I would wait another 48 hours in case somebody else wants to do a final review (or object).
Co-authored-by: Jens Scheffler <95105677+jscheffl@users.noreply.github.com>
Rebasing... if green I'd merge now

@jscheffl @AutomationDev85 Uh-oh. This is making use of asyncio in a normal sync worker, and it's causing KPO to break. I'm double-checking versions; this was reported by a user:
schedule_timeout=self.schedule_timeout_seconds,
startup_timeout=self.startup_timeout_seconds,
check_interval=self.startup_check_interval_seconds,
loop = asyncio.get_event_loop()
Specifically this: this is running in a normal sync worker, there is no running event loop, and this raises an exception.
Hmmm, per the docs:
If there is no running event loop set, the function will return the result of the get_event_loop_policy().get_event_loop() call.
Oh, I wonder if it's related to this, and some user code turning that into an Error.
Deprecated since version 3.12: Deprecation warning is emitted if there is no current event loop. In some future Python release this will become an error.
Trying to confirm.
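For illustration, a minimal sketch of how this could surface, assuming Python 3.12+ and some setup (user code, a test harness, or a warning filter) that promotes DeprecationWarning to an error; the filter call below is just an example of such a setup, not anything from the Airflow code:

```python
import asyncio
import warnings

# Example setup that turns the deprecation warning into a hard failure.
warnings.simplefilter("error", DeprecationWarning)

try:
    # On Python 3.12+, calling this with no current event loop emits a
    # DeprecationWarning -- which the filter above raises as an exception.
    loop = asyncio.get_event_loop()
except DeprecationWarning as exc:
    print(f"get_event_loop() failed: {exc}")
```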
I am not sure what the right way to do parallel execution in a sync task is, then. For us it has been working like this in production for a year. But maybe our environment is not representative?
The challenge is to parse and follow logs and events in parallel. There is no Kubernetes API delivering both concurrently, and flipping back and forth is very inefficient if you want to listen to the log stream. Therefore we took the async approach.
Do you have more details on where and how it is "breaking"? Which environment?
Note that we also initially attempted to run another thread instead of using asyncio, but that was also blocked by Celery and is probably also not advised.
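For context, a rough sketch of the concurrent approach described above; the function names and bodies are purely illustrative placeholders, not the actual pod_manager code:

```python
import asyncio


async def follow_logs(pod_name: str) -> None:
    # Placeholder: stream the container log and write each chunk to the task log.
    for i in range(3):
        await asyncio.sleep(1)
        print(f"[{pod_name}] log chunk {i}")


async def follow_events(pod_name: str) -> None:
    # Placeholder: poll Kubernetes events for the pod and log new ones.
    for i in range(3):
        await asyncio.sleep(1)
        print(f"[{pod_name}] event {i}")


async def monitor_startup(pod_name: str) -> None:
    # Both watchers run concurrently in a single event loop.
    await asyncio.gather(follow_logs(pod_name), follow_events(pod_name))


if __name__ == "__main__":
    asyncio.run(monitor_startup("example-pod"))
```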
I don't have many details -- it's second-hand through one of the Astronomer customers.
I think this is something turning the deprecation warning into an error -- I think the fix/workaround is to swap the manual loop control to asyncio.run, as per this in the docs:
Application developers should typically use the high-level asyncio functions, such as asyncio.run(), and should rarely need to reference the loop object or call its methods. This section is intended mostly for authors of lower-level code, libraries, and frameworks, who need finer control over the event loop behavior.
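A minimal sketch of that suggestion, with an illustrative stand-in coroutine rather than the operator's actual one:

```python
import asyncio


async def _watch_pod_startup() -> None:
    # Stand-in for the coroutine that follows pod logs and events.
    await asyncio.sleep(0)


# Manual loop control (current approach) depends on asyncio.get_event_loop(),
# which is deprecated when no loop exists and can fail in a sync worker:
#   loop = asyncio.get_event_loop()
#   loop.run_until_complete(_watch_pod_startup())

# Suggested workaround: asyncio.run() creates a fresh loop, runs the coroutine,
# and closes the loop, so it does not rely on a pre-existing event loop.
asyncio.run(_watch_pod_startup())
```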
Can we have a full report, including a way to reproduce, as a GH issue on this? It would also be great to include a test to prevent a regression.
Trying to see what I can get. All I have right now is the stacktrace I put in a comment above.
)
assert not k.do_xcom_push

@pytest.mark.asyncio
I don't think you should need this -- it's likely needed to make asyncio.get_event_loop() pass, but a worker doesn't have a running loop, and if we swap it to asyncio.run() it won't need this marker. I.e. adding this mark was working around the test failing in a way that is not representative of how the KPO actually runs?
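As an illustration of that point, a sketch (hypothetical names) of a plain sync test that needs no asyncio marker because the code under test creates its own loop with asyncio.run():

```python
import asyncio


def start_pod_monitoring() -> str:
    # Hypothetical sync entry point, mirroring how the operator would drive
    # its async watchers from a normal sync worker.
    async def _monitor() -> str:
        await asyncio.sleep(0)
        return "done"

    return asyncio.run(_monitor())


def test_start_pod_monitoring_without_running_loop():
    # Plain sync test: no @pytest.mark.asyncio and no pre-existing event loop,
    # just like a real sync worker.
    assert start_pod_monitoring() == "done"
```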
Overview
The idea is to make the events of the Kubernetes pod visible in the log during the start phase of the Pod. The KubernetesPodOperator starts the Pod, pulls the events in parallel, and writes new events into the log.
This enables the user to see what is happening in the background in Kubernetes.
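For illustration, a minimal sketch of pulling a pod's events with the Kubernetes Python client and writing them out; the namespace, pod name, and print target are examples only (the operator writes to the Airflow task log instead):

```python
from kubernetes import client, config


def log_pod_events(pod_name: str, namespace: str = "default") -> None:
    # Load the local kubeconfig; inside a cluster, load_incluster_config() would be used.
    config.load_kube_config()
    v1 = client.CoreV1Api()
    # List only the events that involve this pod.
    events = v1.list_namespaced_event(
        namespace=namespace,
        field_selector=f"involvedObject.name={pod_name}",
    )
    for event in events.items:
        print(f"{event.type} {event.reason}: {event.message}")
```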
Details of change: