KubernetesPodOperator new callbacks and allow multiple callbacks #44357

Open · wants to merge 10 commits into main

Conversation

johnhoran (Author):

I would like to support multiple callbacks in the KubernetesPodOperator, and to add two new callbacks. on_manifest_finalization would let a callback make changes to the manifest just before it is turned into a pod. on_pod_wrapup would run after the calls to on_pod_completion but just before the pod is deleted.

Adding both of these, plus allowing multiple callbacks, makes it possible to do things in the KubernetesPodOperator akin to how it handles XComs, but in a modular way. My use case: I run DBT in the KubernetesPodOperator, and I want to do several things with the DBT artifacts after the DBT job has run. I could use on_manifest_finalization to insert an alpine sidecar with the same volumes mounted, with the same intent as the XCom sidecar: the sidecar keeps the volumes alive while files are extracted from them. on_pod_wrapup would then let me insert a single sidecar and have multiple callbacks run their on_pod_completion before on_pod_wrapup kills the sidecar.
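To make the intent concrete, here is a rough sketch of a callback using the proposed hooks. This is only a sketch: the exact on_manifest_finalization parameters aren't shown in this thread, so the keyword-only pod argument is an assumption modelled on the existing callback signatures, and the sidecar details are illustrative.

from kubernetes.client import models as k8s

from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class DbtArtifactsCallback(KubernetesPodOperatorCallback):
    """Illustrative callback: keep pod volumes alive via a sidecar while artifacts are pulled."""

    @staticmethod
    def on_manifest_finalization(*, pod: k8s.V1Pod, **kwargs) -> None:
        # Assumed hook: mutate the manifest just before it becomes a pod.
        # Insert an alpine sidecar that mounts the same volumes as the main
        # container, mirroring what the XCom sidecar does.
        pod.spec.containers.append(
            k8s.V1Container(
                name="artifact-sidecar",
                image="alpine",
                command=["sh", "-c", "trap : TERM INT; sleep infinity & wait"],
                volume_mounts=pod.spec.containers[0].volume_mounts,
            )
        )

    @staticmethod
    def on_pod_completion(*, pod: k8s.V1Pod, **kwargs) -> None:
        # Pull the DBT artifacts out of the still-running sidecar here.
        ...

    @staticmethod
    def on_pod_wrapup(*, pod: k8s.V1Pod, **kwargs) -> None:
        # Every registered callback's on_pod_completion has run by now, so
        # it is safe to terminate the sidecar before the pod is deleted.
        ...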

amoghrajesh (Contributor) left a comment:

Good start!
Can you add a few test cases covering scenarios with more than one callback? For example (a rough sketch of one such test follows this list):

  1. A few sync callbacks
  2. A few async callbacks
  3. A combination of both, asserting the invocation order
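A rough shape one such test might take (the dispatch loop below is a stand-in for the operator's internal fan-out, not the PR's actual test harness):

from unittest import mock


def test_callbacks_invoked_in_registration_order():
    calls = []
    cb1, cb2 = mock.MagicMock(), mock.MagicMock()
    cb1.on_pod_completion.side_effect = lambda **kwargs: calls.append("cb1")
    cb2.on_pod_completion.side_effect = lambda **kwargs: calls.append("cb2")

    # Stand-in for the operator iterating over its registered callbacks.
    for cb in (cb1, cb2):
        cb.on_pod_completion(pod=None, client=None, mode="sync")

    assert calls == ["cb1", "cb2"]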

@@ -50,7 +54,27 @@ def on_sync_client_creation(*, client: k8s.CoreV1Api, **kwargs) -> None:
        pass

    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: client_type, mode: str, **kwargs) -> None:
    def on_manifest_finalization(
amoghrajesh (Contributor):

The name here sounds a little difficult to understand. Can we do better? I have a proposal: how about on_pod_manifest_created?

johnhoran (Author):

Yep, sounds good to me. If you have a suggestion for on_pod_wrapup too, I'd take it; I struggled with that one.

amoghrajesh (Contributor):

That one kind of seems OK, but it would do better with a renaming!

Comment on lines +126 to +142
    def on_pod_wrapup(
        *,
        pod: k8s.V1Pod,
        client: client_type,
        mode: str,
        operator: KubernetesPodOperator,
        context: Context,
        **kwargs,
    ) -> None:
        """
        Invoke this callback after all pod completion callbacks but before the pod is deleted.

        :param pod: the completed pod.
        :param client: the Kubernetes client that can be used in the callback.
        :param mode: the current execution mode, it's one of (`sync`, `async`).
        :param operator: the KubernetesPodOperator that ran the pod.
        :param context: the Airflow context of the current task run.
        """
        pass
amoghrajesh (Contributor):

How can we send the completed pod here? That would require some tracking and filtering to pick the right one. Also, why can't this callback's role be achieved by on_pod_completion?

johnhoran (Author):

The existing code already makes a call to find the pod when callbacks are attached:

pod=self.find_pod(self.pod.metadata.namespace, context=context),

Honestly, I'd prefer to send a stale reference and make it the callback's responsibility to fetch an updated pod if it needs one, since we're sending the client too; it's also possible that a callback doesn't implement the on_pod_completion method at all. So, to maintain the existing behaviour, I fetch the pod once. An alternative would be to check whether on_pod_completion is implemented on each callback and fetch an updated pod for each call, but again that assumes we need an updated pod, which might not be the case.
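Schematically, the fetch-once dispatch looks something like this inside the operator's cleanup path (a sketch of the behaviour described above, not the PR's actual code; the operator and context parameters assume the extended signatures from this PR):

# The pod is looked up a single time; every callback then receives the same
# (possibly stale) reference plus the client, so a callback that needs fresh
# state can re-fetch the pod itself.
pod = self.find_pod(self.pod.metadata.namespace, context=context)
for callback in self._callbacks:
    callback.on_pod_completion(
        pod=pod,
        client=self.client,
        mode=ExecutionMode.SYNC,
        operator=self,
        context=context,
    )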

johnhoran (Author):

As for the need for on_pod_wrapup: my thought was that you might have multiple callbacks that need to run before a sidecar container is killed. Rather than attaching multiple sidecars to the pod, you could have a class that attaches a single sidecar and kills it in on_pod_wrapup; any subclasses of it could pull whatever files they need, or run commands in the pod, during their on_pod_completion callback.

amoghrajesh (Contributor):

I am not quite sure I understand you here. Are you talking about a specific case of a running sidecar?

johnhoran (Author) · Dec 16, 2024:

Sorry if I'm not being clear. The way I've been using this so far: I have one class that extends KubernetesPodOperatorCallback and handles inserting the sidecar in on_pod_manifest_created, killing it in on_pod_wrapup, plus some code to ensure the sidecar is only added/killed once. Extending that, I have a number of other classes responsible for doing the actual work with the sidecar. In my case, one pulls DBT artifacts in the on_pod_completion method; its on_pod_wrapup calls super().on_pod_wrapup() and then extracts dataset events from the DBT artifacts, and a separate callback uploads them to S3.
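A minimal sketch of that hierarchy (the helper functions are hypothetical, the once-only guard is simplified, and the hook names assume the on_pod_manifest_created rename discussed above):

from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class SidecarCallback(KubernetesPodOperatorCallback):
    """Owns the sidecar lifecycle, guarding against double insert/kill."""

    _sidecar_active = False  # class-level flag so the sidecar is added/killed once

    @classmethod
    def on_pod_manifest_created(cls, *, pod, **kwargs):
        if not SidecarCallback._sidecar_active:
            SidecarCallback._sidecar_active = True
            insert_sidecar(pod)  # hypothetical helper

    @classmethod
    def on_pod_wrapup(cls, *, pod, client, **kwargs):
        if SidecarCallback._sidecar_active:
            SidecarCallback._sidecar_active = False
            kill_sidecar(pod, client)  # hypothetical helper


class DbtCallback(SidecarCallback):
    @classmethod
    def on_pod_completion(cls, *, pod, client, **kwargs):
        pull_dbt_artifacts(pod, client)  # hypothetical helper

    @classmethod
    def on_pod_wrapup(cls, *, pod, client, **kwargs):
        super().on_pod_wrapup(pod=pod, client=client, **kwargs)
        publish_dataset_events()  # hypothetical helper; the sidecar is gone by now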

Comment on lines +449 to 451
for callback in self._callbacks:
    callback.progress_callback(
        line=line, client=self._client, mode=ExecutionMode.SYNC
    )
amoghrajesh (Contributor):

Seems OK when callbacks are running in SYNC mode, but what about async? That would probably require some more thinking.

johnhoran (Author):

Callbacks aren't really implemented for async operation at the moment, unfortunately: #35714 (comment).

amoghrajesh (Contributor):

I see, thanks! In that case, this will do.

Labels: area:providers, provider:cncf-kubernetes