From d0ebca8a61eb183b89c84b4135163c0803963742 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Tue, 23 May 2023 14:13:43 -0500 Subject: [PATCH 01/13] Updates worker metadata to support UI and CLI experiences (#70) --- prefect_kubernetes/worker.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index 9ef7d98..b48d8dc 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -459,6 +459,14 @@ class KubernetesWorker(BaseWorker): type = "kubernetes" job_configuration = KubernetesWorkerJobConfiguration job_configuration_variables = KubernetesWorkerVariables + _description = ( + "Execute flow runs within jobs scheduled on a Kubernetes cluster. Requires a " + "Kubernetes cluster." + ) + _display_name = "Kubernetes" + _documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/" + _is_beta = True + _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1zrSeY8DZ1MJZs2BAyyyGk/20445025358491b8b72600b8f996125b/Kubernetes_logo_without_workmark.svg.png?h=250" # noqa async def run( self, From aec3b72dfa2fc68a1fee63b64606568038d789d1 Mon Sep 17 00:00:00 2001 From: Zanie Adkins Date: Thu, 25 May 2023 12:33:45 -0500 Subject: [PATCH 02/13] Improve failure message when creating the Kubernetes job fails (#71) * Improve failure message when creating the Kubernetes job fails * Add tests * Sort imports --- prefect_kubernetes/worker.py | 27 +++++-- tests/test_worker.py | 147 ++++++++++++++++++++++++++++++++++- 2 files changed, 168 insertions(+), 6 deletions(-) diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index b48d8dc..fad5c2a 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -97,7 +97,11 @@ import anyio.abc from prefect.blocks.kubernetes import KubernetesClusterConfig from prefect.docker import get_prefect_image_name -from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound +from prefect.exceptions import ( + InfrastructureError, + InfrastructureNotAvailable, + InfrastructureNotFound, +) from prefect.server.schemas.core import Flow from prefect.server.schemas.responses import DeploymentResponse from prefect.utilities.asyncutils import run_sync_in_worker_thread @@ -609,10 +613,23 @@ def _create_job( """ Creates a Kubernetes job from a job manifest. """ - with self._get_batch_client(client) as batch_client: - job = batch_client.create_namespaced_job( - configuration.namespace, configuration.job_manifest - ) + try: + with self._get_batch_client(client) as batch_client: + job = batch_client.create_namespaced_job( + configuration.namespace, configuration.job_manifest + ) + except kubernetes.client.exceptions.ApiException as exc: + # Parse the reason and message from the response if feasible + message = "" + if exc.reason: + message += ": " + exc.reason + if exc.body and "message" in exc.body: + message += ": " + exc.body["message"] + + raise InfrastructureError( + f"Unable to create Kubernetes job{message}" + ) from exc + return job @contextmanager diff --git a/tests/test_worker.py b/tests/test_worker.py index 41350e9..1a1608c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,3 +1,4 @@ +import re from contextlib import contextmanager from time import monotonic, sleep from unittest import mock @@ -13,7 +14,11 @@ from kubernetes.config import ConfigException from prefect.client.schemas import FlowRun from prefect.docker import get_prefect_image_name -from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound +from prefect.exceptions import ( + InfrastructureError, + InfrastructureNotAvailable, + InfrastructureNotFound, +) from prefect.server.schemas.core import Flow from prefect.server.schemas.responses import DeploymentResponse from prefect.settings import ( @@ -1071,6 +1076,146 @@ async def test_uses_image_variable( ]["spec"]["containers"][0]["image"] assert image == "foo" + async def test_create_job_failure( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + ): + response = MagicMock() + response.data = { + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "message": 'jobs.batch is forbidden: User "system:serviceaccount:helm-test:prefect-worker-dev" cannot create resource "jobs" in API group "batch" in the namespace "prefect"', + "reason": "Forbidden", + "details": {"group": "batch", "kind": "jobs"}, + "code": 403, + } + response.status = 403 + response.reason = "Forbidden" + + mock_batch_client.create_namespaced_job.side_effect = ApiException( + http_resp=response + ) + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + with pytest.raises( + InfrastructureError, + match=re.escape( + "Unable to create Kubernetes job: Forbidden: jobs.batch is forbidden: User " + '"system:serviceaccount:helm-test:prefect-worker-dev" cannot ' + 'create resource "jobs" in API group "batch" in the namespace ' + '"prefect"' + ), + ): + await k8s_worker.run(flow_run, configuration) + + async def test_create_job_failure_no_reason( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + ): + response = MagicMock() + response.data = { + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "message": 'jobs.batch is forbidden: User "system:serviceaccount:helm-test:prefect-worker-dev" cannot create resource "jobs" in API group "batch" in the namespace "prefect"', + "reason": "Forbidden", + "details": {"group": "batch", "kind": "jobs"}, + "code": 403, + } + response.status = 403 + response.reason = None + + mock_batch_client.create_namespaced_job.side_effect = ApiException( + http_resp=response + ) + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + with pytest.raises( + InfrastructureError, + match=re.escape( + "Unable to create Kubernetes job: jobs.batch is forbidden: User " + '"system:serviceaccount:helm-test:prefect-worker-dev" cannot ' + 'create resource "jobs" in API group "batch" in the namespace ' + '"prefect"' + ), + ): + await k8s_worker.run(flow_run, configuration) + + async def test_create_job_failure_no_message( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + ): + response = MagicMock() + response.data = { + "kind": "Status", + "apiVersion": "v1", + "metadata": {}, + "status": "Failure", + "reason": "Forbidden", + "details": {"group": "batch", "kind": "jobs"}, + "code": 403, + } + response.status = 403 + response.reason = "Test" + + mock_batch_client.create_namespaced_job.side_effect = ApiException( + http_resp=response + ) + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + with pytest.raises( + InfrastructureError, + match=re.escape("Unable to create Kubernetes job: Test"), + ): + await k8s_worker.run(flow_run, configuration) + + async def test_create_job_failure_no_response_body( + self, + flow_run, + mock_core_client, + mock_watch, + mock_batch_client, + ): + response = MagicMock() + response.data = None + response.status = 403 + response.reason = "Test" + + mock_batch_client.create_namespaced_job.side_effect = ApiException( + http_resp=response + ) + + configuration = await KubernetesWorkerJobConfiguration.from_template_and_values( + KubernetesWorker.get_default_base_job_template(), {"image": "foo"} + ) + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + with pytest.raises( + InfrastructureError, + match=re.escape("Unable to create Kubernetes job: Test"), + ): + await k8s_worker.run(flow_run, configuration) + async def test_allows_image_setting_from_manifest( self, default_configuration: KubernetesWorkerJobConfiguration, From f7b45510ef929fa74d37c2e86266b6c3c1151c99 Mon Sep 17 00:00:00 2001 From: Zanie Adkins Date: Thu, 25 May 2023 14:10:42 -0500 Subject: [PATCH 03/13] Use flow run logger for worker logs (#72) * Use flow run logger for worker logs * Sort imports --- prefect_kubernetes/worker.py | 29 ++++++++++++++++++----------- requirements.txt | 2 +- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index fad5c2a..0999d94 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -88,6 +88,7 @@ checkout out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/). """ import enum +import logging import math import os import time @@ -492,7 +493,10 @@ async def run( KubernetesWorkerResult: A result object containing information about the final state of the flow run """ + logger = self.get_flow_run_logger(flow_run) + with self._get_configured_kubernetes_client(configuration) as client: + logger.info("Creating Kubernetes job...") job = await run_sync_in_worker_thread( self._create_job, configuration, client ) @@ -518,7 +522,7 @@ async def run( with events_replicator: status_code = await run_sync_in_worker_thread( - self._watch_job, job.metadata.name, configuration, client + self._watch_job, logger, job.metadata.name, configuration, client ) return KubernetesWorkerResult(identifier=pid, status_code=status_code) @@ -705,6 +709,7 @@ def _get_cluster_uid(self, client: "ApiClient") -> str: def _watch_job( self, + logger: logging.Logger, job_name: str, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient", @@ -714,13 +719,13 @@ def _watch_job( Return the final status code of the first container. """ - self._logger.debug(f"Job {job_name!r}: Monitoring job...") + logger.debug(f"Job {job_name!r}: Monitoring job...") - job = self._get_job(job_name, configuration, client) + job = self._get_job(logger, job_name, configuration, client) if not job: return -1 - pod = self._get_job_pod(job_name, configuration, client) + pod = self._get_job_pod(logger, job_name, configuration, client) if not pod: return -1 @@ -753,7 +758,7 @@ def _watch_job( break except Exception: - self._logger.warning( + logger.warning( ( "Error occurred while streaming logs - " "Job will continue to run but logs will " @@ -774,7 +779,7 @@ def _watch_job( math.ceil(deadline - time.monotonic()) if deadline else None ) if deadline and remaining_time <= 0: - self._logger.error( + logger.error( f"Job {job_name!r}: Job did not complete within " f"timeout of {configuration.job_watch_timeout_seconds}s." ) @@ -797,7 +802,7 @@ def _watch_job( if event["object"].status.completion_time: if not event["object"].status.succeeded: # Job failed, exit while loop and return pod exit code - self._logger.error(f"Job {job_name!r}: Job failed.") + logger.error(f"Job {job_name!r}: Job failed.") completed = True watch.stop() break @@ -812,6 +817,7 @@ def _watch_job( def _get_job( self, + logger: logging.Logger, job_id: str, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient", @@ -823,19 +829,20 @@ def _get_job( name=job_id, namespace=configuration.namespace ) except kubernetes.client.exceptions.ApiException: - self._logger.error(f"Job {job_id!r} was removed.", exc_info=True) + logger.error(f"Job {job_id!r} was removed.", exc_info=True) return None return job def _get_job_pod( self, + logger: logging.Logger, job_name: str, configuration: KubernetesWorkerJobConfiguration, client: "ApiClient", ) -> Optional["V1Pod"]: """Get the first running pod for a job.""" watch = kubernetes.watch.Watch() - self._logger.debug(f"Job {job_name!r}: Starting watch for pod start...") + logger.debug(f"Job {job_name!r}: Starting watch for pod start...") last_phase = None with self._get_core_client(client) as core_client: for event in watch.stream( @@ -846,7 +853,7 @@ def _get_job_pod( ): phase = event["object"].status.phase if phase != last_phase: - self._logger.info(f"Job {job_name!r}: Pod has status {phase!r}.") + logger.info(f"Job {job_name!r}: Pod has status {phase!r}.") if phase != "Pending": watch.stop() @@ -854,4 +861,4 @@ def _get_job_pod( last_phase = phase - self._logger.error(f"Job {job_name!r}: Pod never started.") + logger.error(f"Job {job_name!r}: Pod never started.") diff --git a/requirements.txt b/requirements.txt index 7dd7a64..f4647c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -prefect>=2.10.7 +prefect>=2.10.9 kubernetes >= 24.2.0 \ No newline at end of file From 39b03e069cbfb8f610b05c6cbffbbe4895b2f0a1 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 25 May 2023 14:25:42 -0500 Subject: [PATCH 04/13] Update CHANGELOG.md for v0.2.8 release (#73) * Update CHANGELOG.md for v0.2.8 release * Update CHANGELOG.md Co-authored-by: Serina Grill <42048900+serinamarie@users.noreply.github.com> --------- Co-authored-by: Serina Grill <42048900+serinamarie@users.noreply.github.com> --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 58d81ee..eaeed11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.2.8 + +Released May 25th, 2023. + +### Changed + +- Improve failure message when creating a Kubernetes job fails - [#71](https://github.com/PrefectHQ/prefect-kubernetes/pull/71) +- Stream Kubernetes Worker flow run logs to the API - [#72]([https://github.com/PrefectHQ/prefect-aws/pull/267](https://github.com/PrefectHQ/prefect-kubernetes/pull/72)) + ## 0.2.7 Released May 4th, 2023. From 8c33171a7dbe1e2cd304162fcd1331d48cb5248d Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Thu, 25 May 2023 14:26:49 -0500 Subject: [PATCH 05/13] Fix typo in CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eaeed11..5c0e592 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ Released May 25th, 2023. ### Changed - Improve failure message when creating a Kubernetes job fails - [#71](https://github.com/PrefectHQ/prefect-kubernetes/pull/71) -- Stream Kubernetes Worker flow run logs to the API - [#72]([https://github.com/PrefectHQ/prefect-aws/pull/267](https://github.com/PrefectHQ/prefect-kubernetes/pull/72)) +- Stream Kubernetes Worker flow run logs to the API - [#72](https://github.com/PrefectHQ/prefect-kubernetes/pull/72) ## 0.2.7 From 5e6918c621db6744482b51a92ad933de09d0a508 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Tue, 20 Jun 2023 12:47:12 -0500 Subject: [PATCH 06/13] Fix for `generateName` bug (#76) * Adds fix for generate name bug * Updates test * Adds changelog entry --- CHANGELOG.md | 8 ++++++++ prefect_kubernetes/worker.py | 11 ++++++++++- tests/test_worker.py | 7 ++++--- 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c0e592..e45009e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.2.9 + +Released June 20th, 2023. + +### Fixed + +- Fixed issue where `generateName` was not populating correctly for some flow runs submitted by `KubernetesWorker` - [#76](https://github.com/PrefectHQ/prefect-kubernetes/pull/76) + ## 0.2.8 Released May 25th, 2023. diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index 0999d94..d437517 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -389,7 +389,16 @@ def _populate_generate_name_if_not_present(self): """Ensures that the generateName is present in the job manifest.""" manifest_generate_name = self.job_manifest["metadata"].get("generateName", "") has_placeholder = len(find_placeholders(manifest_generate_name)) > 0 - if not manifest_generate_name or has_placeholder: + # if name wasn't present during template rendering, generateName will be + # just a hyphen + manifest_generate_name_templated_with_empty_string = ( + manifest_generate_name == "-" + ) + if ( + not manifest_generate_name + or has_placeholder + or manifest_generate_name_templated_with_empty_string + ): generate_name = None if self.name: generate_name = _slugify_name(self.name) diff --git a/tests/test_worker.py b/tests/test_worker.py index 1a1608c..c06461d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,4 +1,5 @@ import re +import uuid from contextlib import contextmanager from time import monotonic, sleep from unittest import mock @@ -154,7 +155,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): "kind": "Job", "metadata": { "namespace": "default", - "generateName": "{{ name }}-", + "generateName": "-", "labels": {}, }, "spec": { @@ -685,7 +686,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): class TestKubernetesWorkerJobConfiguration: @pytest.fixture def flow_run(self): - return FlowRun(name="my-flow-run-name") + return FlowRun(flow_id=uuid.uuid4(), name="my-flow-run-name") @pytest.fixture def deployment(self): @@ -918,7 +919,7 @@ async def default_configuration(self): @pytest.fixture def flow_run(self): - return FlowRun(name="my-flow-run-name") + return FlowRun(flow_id=uuid.uuid4(), name="my-flow-run-name") async def test_creates_job_by_building_a_manifest( self, From 380fe69d20dfc8ef5eda6a449ce6fb691d5ff377 Mon Sep 17 00:00:00 2001 From: Jonathan Yu Date: Tue, 20 Jun 2023 18:35:47 -0700 Subject: [PATCH 07/13] Update Code Owners (#77) Replace Integrations with Open Source team as code owners. --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2b1984c..506f738 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @PrefectHQ/integrations +* @PrefectHQ/open-source From 8e9b3e6a01504c9f2596c518d88573e826f7d494 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Jun 2023 20:36:09 -0500 Subject: [PATCH 08/13] Bump JamesIves/github-pages-deploy-action from 4.4.1 to 4.4.2 (#74) Bumps [JamesIves/github-pages-deploy-action](https://github.com/JamesIves/github-pages-deploy-action) from 4.4.1 to 4.4.2. - [Release notes](https://github.com/JamesIves/github-pages-deploy-action/releases) - [Commits](https://github.com/JamesIves/github-pages-deploy-action/compare/v4.4.1...v4.4.2) --- updated-dependencies: - dependency-name: JamesIves/github-pages-deploy-action dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/publish-docs.yml | 2 +- .github/workflows/release.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish-docs.yml b/.github/workflows/publish-docs.yml index 10166bd..9dd62dd 100644 --- a/.github/workflows/publish-docs.yml +++ b/.github/workflows/publish-docs.yml @@ -25,7 +25,7 @@ jobs: mkdocs build - name: Publish docs - uses: JamesIves/github-pages-deploy-action@v4.4.1 + uses: JamesIves/github-pages-deploy-action@v4.4.2 with: branch: docs folder: site diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e527848..433b6ac 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -73,7 +73,7 @@ jobs: mkdocs build - name: Publish docs - uses: JamesIves/github-pages-deploy-action@v4.4.1 + uses: JamesIves/github-pages-deploy-action@v4.4.2 with: branch: docs folder: site From ca33f0525bffce1e616be9d4d8c529ca32c8d011 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Jun 2023 20:36:33 -0500 Subject: [PATCH 09/13] Bump peter-evans/create-pull-request from 4 to 5 (#49) Bumps [peter-evans/create-pull-request](https://github.com/peter-evans/create-pull-request) from 4 to 5. - [Release notes](https://github.com/peter-evans/create-pull-request/releases) - [Commits](https://github.com/peter-evans/create-pull-request/compare/v4...v5) --- updated-dependencies: - dependency-name: peter-evans/create-pull-request dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/template-sync.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/template-sync.yml b/.github/workflows/template-sync.yml index 19a648e..6cbfefd 100644 --- a/.github/workflows/template-sync.yml +++ b/.github/workflows/template-sync.yml @@ -30,7 +30,7 @@ jobs: private_key: ${{ secrets.SYNC_APP_PRIVATE_KEY }} - name: Submit PR - uses: peter-evans/create-pull-request@v4 + uses: peter-evans/create-pull-request@v5 with: commit-message: Updating collection with changes to prefect-collection-template token: ${{ steps.generate-token.outputs.token }} From 7865ec9e1b53666cab8f43d1ca5f3855c756a321 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Jun 2023 20:36:56 -0500 Subject: [PATCH 10/13] Bump actions/add-to-project from 0.4.0 to 0.5.0 (#47) Bumps [actions/add-to-project](https://github.com/actions/add-to-project) from 0.4.0 to 0.5.0. - [Release notes](https://github.com/actions/add-to-project/releases) - [Commits](https://github.com/actions/add-to-project/compare/v0.4.0...v0.5.0) --- updated-dependencies: - dependency-name: actions/add-to-project dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/add-to-project.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/add-to-project.yml b/.github/workflows/add-to-project.yml index 08c201d..6133f8b 100644 --- a/.github/workflows/add-to-project.yml +++ b/.github/workflows/add-to-project.yml @@ -18,7 +18,7 @@ jobs: app_id: ${{ secrets.SYNC_APP_ID }} private_key: ${{ secrets.SYNC_APP_PRIVATE_KEY }} - - uses: actions/add-to-project@v0.4.0 + - uses: actions/add-to-project@v0.5.0 with: project-url: ${{ secrets.ADD_TO_PROJECT_URL }} github-token: ${{ steps.generate-token.outputs.token }} From b5651614e1fd4d60155d5488bee652455574bbb7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Jun 2023 20:37:32 -0500 Subject: [PATCH 11/13] Bump helm/kind-action from 1.5.0 to 1.7.0 (#69) Bumps [helm/kind-action](https://github.com/helm/kind-action) from 1.5.0 to 1.7.0. - [Release notes](https://github.com/helm/kind-action/releases) - [Commits](https://github.com/helm/kind-action/compare/v1.5.0...v1.7.0) --- updated-dependencies: - dependency-name: helm/kind-action dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/kubernetes-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/kubernetes-test.yml b/.github/workflows/kubernetes-test.yml index ee6a1a3..e3d925f 100644 --- a/.github/workflows/kubernetes-test.yml +++ b/.github/workflows/kubernetes-test.yml @@ -37,7 +37,7 @@ jobs: helm repo update - name: Create kind cluster - uses: helm/kind-action@v1.5.0 + uses: helm/kind-action@v1.7.0 with: cluster_name: kind version: v0.18.0 From d9ce16e635051df2f276badf2a20cf111b072446 Mon Sep 17 00:00:00 2001 From: Alexander Streed Date: Fri, 30 Jun 2023 14:22:48 -0500 Subject: [PATCH 12/13] Add handling for rescheduled Kubernetes jobs (#78) * Add handling for rescheduled jobs * Adds test for backoff limit checking * Adds test for missing pod * Adds changelog entry * Add default backoffLimit to ensure jobs are run only once by default * Updates changelog for release * Updates tests --- CHANGELOG.md | 8 +++ prefect_kubernetes/worker.py | 44 +++++++++++++++-- tests/test_worker.py | 96 +++++++++++++++++++++++++++++++++--- 3 files changed, 138 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e45009e..eddf088 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.2.10 + +Released June 30th, 2023. + +### Added + +- Handling for rescheduled Kubernetes jobs - [#78](https://github.com/PrefectHQ/prefect-kubernetes/pull/78) + ## 0.2.9 Released June 20th, 2023. diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index d437517..306ca36 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -148,6 +148,7 @@ def _get_default_job_manifest_template() -> Dict[str, Any]: "generateName": "{{ name }}-", }, "spec": { + "backoffLimit": 0, "ttlSecondsAfterFinished": "{{ finished_job_ttl }}", "template": { "spec": { @@ -808,19 +809,54 @@ def _watch_job( namespace=configuration.namespace, **timeout_seconds, ): - if event["object"].status.completion_time: + if event["type"] == "DELETED": + logger.error(f"Job {job_name!r}: Job has been deleted.") + completed = True + elif event["object"].status.completion_time: if not event["object"].status.succeeded: # Job failed, exit while loop and return pod exit code logger.error(f"Job {job_name!r}: Job failed.") completed = True + # Check if the job has reached its backoff limit + # and stop watching if it has + elif ( + event["object"].spec.backoff_limit is not None + and event["object"].status.failed is not None + and event["object"].status.failed + > event["object"].spec.backoff_limit + ): + logger.error(f"Job {job_name!r}: Job reached backoff limit.") + completed = True + # If the job has no backoff limit, check if it has failed + # and stop watching if it has + elif ( + not event["object"].spec.backoff_limit + and event["object"].status.failed + ): + completed = True + + if completed: watch.stop() break with self._get_core_client(client) as core_client: - pod_status = core_client.read_namespaced_pod_status( - namespace=configuration.namespace, name=pod.metadata.name + # Get all pods for the job + pods = core_client.list_namespaced_pod( + namespace=configuration.namespace, label_selector=f"job-name={job_name}" + ) + # Get the status for only the most recently used pod + pods.items.sort( + key=lambda pod: pod.metadata.creation_timestamp, reverse=True + ) + most_recent_pod = pods.items[0] if pods.items else None + first_container_status = ( + most_recent_pod.status.container_statuses[0] + if most_recent_pod + else None ) - first_container_status = pod_status.status.container_statuses[0] + if not first_container_status: + logger.error(f"Job {job_name!r}: No pods found for job.") + return -1 return first_container_status.state.terminated.exit_code diff --git a/tests/test_worker.py b/tests/test_worker.py index c06461d..7f4ae22 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -159,6 +159,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): "labels": {}, }, "spec": { + "backoffLimit": 0, "template": { "spec": { "parallelism": 1, @@ -171,7 +172,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): } ], } - } + }, }, }, cluster_config=None, @@ -213,6 +214,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): }, }, "spec": { + "backoffLimit": 0, "template": { "spec": { "parallelism": 1, @@ -241,7 +243,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): } ], } - } + }, }, }, cluster_config=None, @@ -290,6 +292,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): "generateName": "test-", }, "spec": { + "backoffLimit": 0, "ttlSecondsAfterFinished": 60, "template": { "spec": { @@ -354,6 +357,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs): }, }, "spec": { + "backoffLimit": 0, "ttlSecondsAfterFinished": 60, "template": { "spec": { @@ -935,7 +939,10 @@ async def test_creates_job_by_building_a_manifest( async with KubernetesWorker(work_pool_name="test") as k8s_worker: await k8s_worker.run(flow_run=flow_run, configuration=default_configuration) - mock_core_client.read_namespaced_pod_status.assert_called_once() + mock_core_client.list_namespaced_pod.assert_called_with( + namespace=default_configuration.namespace, + label_selector="job-name=mock-job", + ) mock_batch_client.create_namespaced_job.assert_called_with( "default", @@ -1768,7 +1775,9 @@ def mock_stream(*args, **kwargs): # Yield the completed job job.status.completion_time = True - yield {"object": job} + job.status.failed = 0 + job.spec.backoff_limit = 6 + yield {"object": job, "type": "ADDED"} def mock_log_stream(*args, **kwargs): anyio.sleep(500) @@ -1817,7 +1826,7 @@ def mock_stream(*args, **kwargs): if kwargs["func"] == mock_core_client.list_namespaced_pod: job_pod = MagicMock(spec=kubernetes.client.V1Pod) job_pod.status.phase = "Running" - yield {"object": job_pod} + yield {"object": job_pod, "type": "ADDED"} if kwargs["func"] == mock_batch_client.list_namespaced_job: job = MagicMock(spec=kubernetes.client.V1Job) @@ -1947,7 +1956,9 @@ def mock_stream(*args, **kwargs): # Yield the job then return exiting the stream job.status.completion_time = None - yield {"object": job} + job.status.failed = 0 + job.spec.backoff_limit = 6 + yield {"object": job, "type": "ADDED"} mock_watch.stream.side_effect = mock_stream default_configuration.job_watch_timeout_seconds = 40 @@ -1993,6 +2004,79 @@ def mock_stream(*args, **kwargs): ] ) + async def test_watch_stops_after_backoff_limit_reached( + self, + flow_run, + default_configuration, + mock_core_client, + mock_watch, + mock_batch_client, + ): + # The job should not be completed to start + mock_batch_client.read_namespaced_job.return_value.status.completion_time = None + job_pod = MagicMock(spec=kubernetes.client.V1Pod) + job_pod.status.phase = "Running" + mock_container_status = MagicMock(spec=kubernetes.client.V1ContainerStatus) + mock_container_status.state.terminated.exit_code = 137 + job_pod.status.container_statuses = [mock_container_status] + mock_core_client.list_namespaced_pod.return_value.items = [job_pod] + + def mock_stream(*args, **kwargs): + if kwargs["func"] == mock_core_client.list_namespaced_pod: + yield {"object": job_pod} + + if kwargs["func"] == mock_batch_client.list_namespaced_job: + job = MagicMock(spec=kubernetes.client.V1Job) + + # Yield the job then return exiting the stream + job.status.completion_time = None + job.spec.backoff_limit = 6 + for i in range(0, 8): + job.status.failed = i + yield {"object": job, "type": "ADDED"} + + mock_watch.stream.side_effect = mock_stream + + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + result = await k8s_worker.run(flow_run, default_configuration) + + assert result.status_code == 137 + + async def test_watch_handles_no_pod( + self, + flow_run, + default_configuration, + mock_core_client, + mock_watch, + mock_batch_client, + ): + # The job should not be completed to start + mock_batch_client.read_namespaced_job.return_value.status.completion_time = None + mock_core_client.list_namespaced_pod.return_value.items = [] + + def mock_stream(*args, **kwargs): + if kwargs["func"] == mock_core_client.list_namespaced_pod: + job_pod = MagicMock(spec=kubernetes.client.V1Pod) + job_pod.status.phase = "Running" + yield {"object": job_pod} + + if kwargs["func"] == mock_batch_client.list_namespaced_job: + job = MagicMock(spec=kubernetes.client.V1Job) + + # Yield the job then return exiting the stream + job.status.completion_time = None + job.spec.backoff_limit = 6 + for i in range(0, 8): + job.status.failed = i + yield {"object": job, "type": "ADDED"} + + mock_watch.stream.side_effect = mock_stream + + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + result = await k8s_worker.run(flow_run, default_configuration) + + assert result.status_code == -1 + class TestKillInfrastructure: async def test_kill_infrastructure_calls_delete_namespaced_job( self, From b14ad8f1fe9cc2df11b164d9490ab37fd0f27723 Mon Sep 17 00:00:00 2001 From: Will Raphaelson Date: Tue, 18 Jul 2023 13:17:37 -0400 Subject: [PATCH 13/13] removing-worker-beta (#82) --- CHANGELOG.md | 7 +++++++ prefect_kubernetes/worker.py | 4 ---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eddf088..e2311ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## 0.2.11 + +Released July 20th, 2023. + +### Changed +- Promoted workers to GA, removed beta disclaimers + ## 0.2.10 Released June 30th, 2023. diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py index 306ca36..49aff03 100644 --- a/prefect_kubernetes/worker.py +++ b/prefect_kubernetes/worker.py @@ -1,10 +1,7 @@ """ - Module containing the Kubernetes worker used for executing flow runs as Kubernetes jobs. -Note this module is in **beta**. The interfaces within may change without notice. - To start a Kubernetes worker, run the following command: ```bash @@ -480,7 +477,6 @@ class KubernetesWorker(BaseWorker): ) _display_name = "Kubernetes" _documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/" - _is_beta = True _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1zrSeY8DZ1MJZs2BAyyyGk/20445025358491b8b72600b8f996125b/Kubernetes_logo_without_workmark.svg.png?h=250" # noqa async def run(