diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 2b1984c..506f738 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1 +1 @@
-* @PrefectHQ/integrations
+* @PrefectHQ/open-source
diff --git a/.github/workflows/add-to-project.yml b/.github/workflows/add-to-project.yml
index 08c201d..6133f8b 100644
--- a/.github/workflows/add-to-project.yml
+++ b/.github/workflows/add-to-project.yml
@@ -18,7 +18,7 @@ jobs:
           app_id: ${{ secrets.SYNC_APP_ID }}
           private_key: ${{ secrets.SYNC_APP_PRIVATE_KEY }}
 
-      - uses: actions/add-to-project@v0.4.0
+      - uses: actions/add-to-project@v0.5.0
         with:
           project-url: ${{ secrets.ADD_TO_PROJECT_URL }}
           github-token: ${{ steps.generate-token.outputs.token }}
diff --git a/.github/workflows/kubernetes-test.yml b/.github/workflows/kubernetes-test.yml
index ee6a1a3..e3d925f 100644
--- a/.github/workflows/kubernetes-test.yml
+++ b/.github/workflows/kubernetes-test.yml
@@ -37,7 +37,7 @@ jobs:
           helm repo update
 
       - name: Create kind cluster
-        uses: helm/kind-action@v1.5.0
+        uses: helm/kind-action@v1.7.0
        with:
           cluster_name: kind
           version: v0.18.0
diff --git a/.github/workflows/publish-docs.yml b/.github/workflows/publish-docs.yml
index 10166bd..9dd62dd 100644
--- a/.github/workflows/publish-docs.yml
+++ b/.github/workflows/publish-docs.yml
@@ -25,7 +25,7 @@ jobs:
           mkdocs build
 
       - name: Publish docs
-        uses: JamesIves/github-pages-deploy-action@v4.4.1
+        uses: JamesIves/github-pages-deploy-action@v4.4.2
        with:
           branch: docs
           folder: site
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index e527848..433b6ac 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -73,7 +73,7 @@ jobs:
           mkdocs build
 
       - name: Publish docs
-        uses: JamesIves/github-pages-deploy-action@v4.4.1
+        uses: JamesIves/github-pages-deploy-action@v4.4.2
        with:
           branch: docs
           folder: site
diff --git a/.github/workflows/template-sync.yml b/.github/workflows/template-sync.yml
index 19a648e..6cbfefd 100644
--- a/.github/workflows/template-sync.yml
+++ b/.github/workflows/template-sync.yml
@@ -30,7 +30,7 @@ jobs:
           private_key: ${{ secrets.SYNC_APP_PRIVATE_KEY }}
 
       - name: Submit PR
-        uses: peter-evans/create-pull-request@v4
+        uses: peter-evans/create-pull-request@v5
        with:
          commit-message: Updating collection with changes to prefect-collection-template
          token: ${{ steps.generate-token.outputs.token }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 58d81ee..e2311ac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Security
 
+## 0.2.11
+
+Released July 20th, 2023.
+
+### Changed
+- Promoted workers to GA, removed beta disclaimers
+
+## 0.2.10
+
+Released June 30th, 2023.
+
+### Added
+
+- Handling for rescheduled Kubernetes jobs - [#78](https://github.com/PrefectHQ/prefect-kubernetes/pull/78)
+
+## 0.2.9
+
+Released June 20th, 2023.
+
+### Fixed
+
+- Fixed issue where `generateName` was not populating correctly for some flow runs submitted by `KubernetesWorker` - [#76](https://github.com/PrefectHQ/prefect-kubernetes/pull/76)
+
+## 0.2.8
+
+Released May 25th, 2023.
+
+### Changed
+
+- Improve failure message when creating a Kubernetes job fails - [#71](https://github.com/PrefectHQ/prefect-kubernetes/pull/71)
+- Stream Kubernetes Worker flow run logs to the API - [#72](https://github.com/PrefectHQ/prefect-kubernetes/pull/72)
+
 ## 0.2.7
 
 Released May 4th, 2023.
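The 0.2.8-0.2.11 entries above summarize the `worker.py` changes that follow: the default job manifest now sets `backoffLimit: 0`, flow-run logs are routed through a per-run logger, and the watch loop stops once a job is deleted, completes, or exhausts its retries (the rescheduled-jobs handling from #78). Below is a condensed, hypothetical sketch of that completion check; the dataclasses stand in for the kubernetes client models and are not part of prefect-kubernetes.

```python
# Condensed sketch of the watch-loop completion check added in worker.py below.
# JobSpec/JobStatus are stand-ins for kubernetes.client.V1JobSpec/V1JobStatus.
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobSpec:
    backoff_limit: Optional[int] = None


@dataclass
class JobStatus:
    completion_time: Optional[str] = None
    failed: Optional[int] = None


def job_watch_is_complete(event_type: str, spec: JobSpec, status: JobStatus) -> bool:
    """Return True when the worker's watch loop should stop monitoring the job."""
    if event_type == "DELETED":
        # The job disappeared out from under the worker.
        return True
    if status.completion_time:
        # The job finished, successfully or not.
        return True
    if (
        spec.backoff_limit is not None
        and status.failed is not None
        and status.failed > spec.backoff_limit
    ):
        # Pod retries are exhausted; stop instead of watching forever.
        return True
    if not spec.backoff_limit and status.failed:
        # No retries allowed and a pod has already failed.
        return True
    return False


# A job rescheduled past its (zero) backoff limit is treated as finished:
assert job_watch_is_complete("MODIFIED", JobSpec(backoff_limit=0), JobStatus(failed=1))
```

With `backoffLimit: 0` in the default manifest, a single pod failure trips this check, so the worker can stop watching and report the failed pod's exit code.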
diff --git a/prefect_kubernetes/worker.py b/prefect_kubernetes/worker.py
index 9ef7d98..49aff03 100644
--- a/prefect_kubernetes/worker.py
+++ b/prefect_kubernetes/worker.py
@@ -1,10 +1,7 @@
 """
-
 Module containing the Kubernetes worker used for executing flow runs as
 Kubernetes jobs.
 
-Note this module is in **beta**. The interfaces within may change without notice.
-
 To start a Kubernetes worker, run the following command:
 
 ```bash
@@ -88,6 +85,7 @@ checkout out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/).
 """
 import enum
+import logging
 import math
 import os
 import time
@@ -97,7 +95,11 @@ import anyio.abc
 
 from prefect.blocks.kubernetes import KubernetesClusterConfig
 from prefect.docker import get_prefect_image_name
-from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
+from prefect.exceptions import (
+    InfrastructureError,
+    InfrastructureNotAvailable,
+    InfrastructureNotFound,
+)
 from prefect.server.schemas.core import Flow
 from prefect.server.schemas.responses import DeploymentResponse
 from prefect.utilities.asyncutils import run_sync_in_worker_thread
@@ -143,6 +145,7 @@ def _get_default_job_manifest_template() -> Dict[str, Any]:
             "generateName": "{{ name }}-",
         },
         "spec": {
+            "backoffLimit": 0,
             "ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
             "template": {
                 "spec": {
@@ -384,7 +387,16 @@ def _populate_generate_name_if_not_present(self):
         """Ensures that the generateName is present in the job manifest."""
         manifest_generate_name = self.job_manifest["metadata"].get("generateName", "")
         has_placeholder = len(find_placeholders(manifest_generate_name)) > 0
-        if not manifest_generate_name or has_placeholder:
+        # if name wasn't present during template rendering, generateName will be
+        # just a hyphen
+        manifest_generate_name_templated_with_empty_string = (
+            manifest_generate_name == "-"
+        )
+        if (
+            not manifest_generate_name
+            or has_placeholder
+            or manifest_generate_name_templated_with_empty_string
+        ):
             generate_name = None
             if self.name:
                 generate_name = _slugify_name(self.name)
@@ -459,6 +471,13 @@ class KubernetesWorker(BaseWorker):
     type = "kubernetes"
     job_configuration = KubernetesWorkerJobConfiguration
     job_configuration_variables = KubernetesWorkerVariables
+    _description = (
+        "Execute flow runs within jobs scheduled on a Kubernetes cluster. Requires a "
+        "Kubernetes cluster."
+    )
+    _display_name = "Kubernetes"
+    _documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/"
+    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1zrSeY8DZ1MJZs2BAyyyGk/20445025358491b8b72600b8f996125b/Kubernetes_logo_without_workmark.svg.png?h=250"  # noqa
 
     async def run(
         self,
@@ -480,7 +499,10 @@ async def run(
             KubernetesWorkerResult: A result object containing information about the
                 final state of the flow run
         """
+        logger = self.get_flow_run_logger(flow_run)
+
         with self._get_configured_kubernetes_client(configuration) as client:
+            logger.info("Creating Kubernetes job...")
             job = await run_sync_in_worker_thread(
                 self._create_job, configuration, client
             )
@@ -506,7 +528,7 @@ async def run(
 
             with events_replicator:
                 status_code = await run_sync_in_worker_thread(
-                    self._watch_job, job.metadata.name, configuration, client
+                    self._watch_job, logger, job.metadata.name, configuration, client
                 )
 
             return KubernetesWorkerResult(identifier=pid, status_code=status_code)
@@ -601,10 +623,23 @@ def _create_job(
         """
         Creates a Kubernetes job from a job manifest.
         """
-        with self._get_batch_client(client) as batch_client:
-            job = batch_client.create_namespaced_job(
-                configuration.namespace, configuration.job_manifest
-            )
+        try:
+            with self._get_batch_client(client) as batch_client:
+                job = batch_client.create_namespaced_job(
+                    configuration.namespace, configuration.job_manifest
+                )
+        except kubernetes.client.exceptions.ApiException as exc:
+            # Parse the reason and message from the response if feasible
+            message = ""
+            if exc.reason:
+                message += ": " + exc.reason
+            if exc.body and "message" in exc.body:
+                message += ": " + exc.body["message"]
+
+            raise InfrastructureError(
+                f"Unable to create Kubernetes job{message}"
+            ) from exc
+
         return job
 
     @contextmanager
@@ -680,6 +715,7 @@ def _get_cluster_uid(self, client: "ApiClient") -> str:
 
     def _watch_job(
         self,
+        logger: logging.Logger,
         job_name: str,
         configuration: KubernetesWorkerJobConfiguration,
         client: "ApiClient",
@@ -689,13 +725,13 @@ def _watch_job(
 
         Return the final status code of the first container.
         """
-        self._logger.debug(f"Job {job_name!r}: Monitoring job...")
+        logger.debug(f"Job {job_name!r}: Monitoring job...")
 
-        job = self._get_job(job_name, configuration, client)
+        job = self._get_job(logger, job_name, configuration, client)
         if not job:
             return -1
 
-        pod = self._get_job_pod(job_name, configuration, client)
+        pod = self._get_job_pod(logger, job_name, configuration, client)
         if not pod:
             return -1
 
@@ -728,7 +764,7 @@ def _watch_job(
                         break
                 except Exception:
-                    self._logger.warning(
+                    logger.warning(
                         (
                             "Error occurred while streaming logs - "
                             "Job will continue to run but logs will "
@@ -749,7 +785,7 @@ def _watch_job(
                 math.ceil(deadline - time.monotonic()) if deadline else None
             )
             if deadline and remaining_time <= 0:
-                self._logger.error(
+                logger.error(
                     f"Job {job_name!r}: Job did not complete within "
                     f"timeout of {configuration.job_watch_timeout_seconds}s."
                 )
@@ -769,24 +805,60 @@ def _watch_job(
                 namespace=configuration.namespace,
                 **timeout_seconds,
             ):
-                if event["object"].status.completion_time:
+                if event["type"] == "DELETED":
+                    logger.error(f"Job {job_name!r}: Job has been deleted.")
+                    completed = True
+                elif event["object"].status.completion_time:
                     if not event["object"].status.succeeded:
                         # Job failed, exit while loop and return pod exit code
-                        self._logger.error(f"Job {job_name!r}: Job failed.")
+                        logger.error(f"Job {job_name!r}: Job failed.")
+                    completed = True
+                # Check if the job has reached its backoff limit
+                # and stop watching if it has
+                elif (
+                    event["object"].spec.backoff_limit is not None
+                    and event["object"].status.failed is not None
+                    and event["object"].status.failed
+                    > event["object"].spec.backoff_limit
+                ):
+                    logger.error(f"Job {job_name!r}: Job reached backoff limit.")
                     completed = True
+                # If the job has no backoff limit, check if it has failed
+                # and stop watching if it has
+                elif (
+                    not event["object"].spec.backoff_limit
+                    and event["object"].status.failed
+                ):
+                    completed = True
+
+                if completed:
                     watch.stop()
                     break
 
         with self._get_core_client(client) as core_client:
-            pod_status = core_client.read_namespaced_pod_status(
-                namespace=configuration.namespace, name=pod.metadata.name
+            # Get all pods for the job
+            pods = core_client.list_namespaced_pod(
+                namespace=configuration.namespace, label_selector=f"job-name={job_name}"
+            )
+            # Get the status for only the most recently used pod
+            pods.items.sort(
+                key=lambda pod: pod.metadata.creation_timestamp, reverse=True
+            )
+            most_recent_pod = pods.items[0] if pods.items else None
+            first_container_status = (
+                most_recent_pod.status.container_statuses[0]
+                if most_recent_pod
+                else None
             )
-            first_container_status = pod_status.status.container_statuses[0]
+            if not first_container_status:
+                logger.error(f"Job {job_name!r}: No pods found for job.")
+                return -1
 
         return first_container_status.state.terminated.exit_code
 
     def _get_job(
         self,
+        logger: logging.Logger,
         job_id: str,
         configuration: KubernetesWorkerJobConfiguration,
         client: "ApiClient",
@@ -798,19 +870,20 @@ def _get_job(
                 name=job_id, namespace=configuration.namespace
             )
         except kubernetes.client.exceptions.ApiException:
-            self._logger.error(f"Job {job_id!r} was removed.", exc_info=True)
+            logger.error(f"Job {job_id!r} was removed.", exc_info=True)
             return None
         return job
 
     def _get_job_pod(
         self,
+        logger: logging.Logger,
         job_name: str,
         configuration: KubernetesWorkerJobConfiguration,
         client: "ApiClient",
     ) -> Optional["V1Pod"]:
         """Get the first running pod for a job."""
         watch = kubernetes.watch.Watch()
-        self._logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
+        logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
         last_phase = None
         with self._get_core_client(client) as core_client:
             for event in watch.stream(
@@ -821,7 +894,7 @@ def _get_job_pod(
             ):
                 phase = event["object"].status.phase
                 if phase != last_phase:
-                    self._logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")
+                    logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")
 
                 if phase != "Pending":
                     watch.stop()
@@ -829,4 +902,4 @@ def _get_job_pod(
 
                 last_phase = phase
 
-        self._logger.error(f"Job {job_name!r}: Pod never started.")
+        logger.error(f"Job {job_name!r}: Pod never started.")
diff --git a/requirements.txt b/requirements.txt
index 7dd7a64..f4647c5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-prefect>=2.10.7
+prefect>=2.10.9
 kubernetes >= 24.2.0
\ No newline at end of file
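The `_create_job` change above is what the new `test_create_job_failure*` cases in the test diff below exercise: an `ApiException` raised by `create_namespaced_job` is re-raised as an `InfrastructureError` whose message folds in the API response's reason and message. Here is a rough, self-contained sketch of that formatting; the helper name is hypothetical, and its arguments mirror the `reason` and `body` attributes of `kubernetes.client.exceptions.ApiException`.

```python
# Rough sketch of the worker's failure-message formatting, not the actual implementation.
from typing import Optional


def format_job_creation_error(reason: Optional[str], body: Optional[dict]) -> str:
    """Build an InfrastructureError-style message from pieces of an ApiException."""
    message = ""
    if reason:
        message += ": " + reason
    if body and "message" in body:
        message += ": " + body["message"]
    return f"Unable to create Kubernetes job{message}"


# Forbidden response with a detailed API message:
print(format_job_creation_error("Forbidden", {"message": "jobs.batch is forbidden"}))
# -> Unable to create Kubernetes job: Forbidden: jobs.batch is forbidden

# No response body at all, only an HTTP reason:
print(format_job_creation_error("Test", None))
# -> Unable to create Kubernetes job: Test
```

The four new tests below cover exactly these combinations: a reason plus a message, a message without a reason, a reason without a message, and no response body at all.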
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 41350e9..7f4ae22 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -1,3 +1,5 @@
+import re
+import uuid
 from contextlib import contextmanager
 from time import monotonic, sleep
 from unittest import mock
@@ -13,7 +15,11 @@ from kubernetes.config import ConfigException
 from prefect.client.schemas import FlowRun
 from prefect.docker import get_prefect_image_name
-from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
+from prefect.exceptions import (
+    InfrastructureError,
+    InfrastructureNotAvailable,
+    InfrastructureNotFound,
+)
 from prefect.server.schemas.core import Flow
 from prefect.server.schemas.responses import DeploymentResponse
 from prefect.settings import (
@@ -149,10 +155,11 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                 "kind": "Job",
                 "metadata": {
                     "namespace": "default",
-                    "generateName": "{{ name }}-",
+                    "generateName": "-",
                     "labels": {},
                 },
                 "spec": {
+                    "backoffLimit": 0,
                     "template": {
                         "spec": {
                             "parallelism": 1,
@@ -165,7 +172,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                                 }
                             ],
                         }
-                    }
+                    },
                 },
             },
             cluster_config=None,
@@ -207,6 +214,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                     },
                 },
                 "spec": {
+                    "backoffLimit": 0,
                     "template": {
                         "spec": {
                             "parallelism": 1,
@@ -235,7 +243,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                                 }
                             ],
                         }
-                    }
+                    },
                 },
             },
             cluster_config=None,
@@ -284,6 +292,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                     "generateName": "test-",
                 },
                 "spec": {
+                    "backoffLimit": 0,
                     "ttlSecondsAfterFinished": 60,
                     "template": {
                         "spec": {
@@ -348,6 +357,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
                     },
                 },
                 "spec": {
+                    "backoffLimit": 0,
                     "ttlSecondsAfterFinished": 60,
                     "template": {
                         "spec": {
@@ -680,7 +690,7 @@ def _mock_pods_stream_that_returns_running_pod(*args, **kwargs):
 class TestKubernetesWorkerJobConfiguration:
     @pytest.fixture
     def flow_run(self):
-        return FlowRun(name="my-flow-run-name")
+        return FlowRun(flow_id=uuid.uuid4(), name="my-flow-run-name")
 
     @pytest.fixture
     def deployment(self):
@@ -913,7 +923,7 @@ async def default_configuration(self):
 
     @pytest.fixture
     def flow_run(self):
-        return FlowRun(name="my-flow-run-name")
+        return FlowRun(flow_id=uuid.uuid4(), name="my-flow-run-name")
 
     async def test_creates_job_by_building_a_manifest(
         self,
@@ -929,7 +939,10 @@ async def test_creates_job_by_building_a_manifest(
         async with KubernetesWorker(work_pool_name="test") as k8s_worker:
             await k8s_worker.run(flow_run=flow_run, configuration=default_configuration)
-            mock_core_client.read_namespaced_pod_status.assert_called_once()
+            mock_core_client.list_namespaced_pod.assert_called_with(
+                namespace=default_configuration.namespace,
+                label_selector="job-name=mock-job",
+            )
 
             mock_batch_client.create_namespaced_job.assert_called_with(
                 "default",
@@ -1071,6 +1084,146 @@ async def test_uses_image_variable(
         ]["spec"]["containers"][0]["image"]
         assert image == "foo"
 
+    async def test_create_job_failure(
+        self,
+        flow_run,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        response = MagicMock()
+        response.data = {
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "message": 'jobs.batch is forbidden: User "system:serviceaccount:helm-test:prefect-worker-dev" cannot create resource "jobs" in API group "batch" in the namespace "prefect"',
+            "reason": "Forbidden",
+            "details": {"group": "batch", "kind": "jobs"},
+            "code": 403,
+        }
+        response.status = 403
+        response.reason = "Forbidden"
+
+        mock_batch_client.create_namespaced_job.side_effect = ApiException(
+            http_resp=response
+        )
+
+        configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
+            KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
+        )
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            with pytest.raises(
+                InfrastructureError,
+                match=re.escape(
+                    "Unable to create Kubernetes job: Forbidden: jobs.batch is forbidden: User "
+                    '"system:serviceaccount:helm-test:prefect-worker-dev" cannot '
+                    'create resource "jobs" in API group "batch" in the namespace '
+                    '"prefect"'
+                ),
+            ):
+                await k8s_worker.run(flow_run, configuration)
+
+    async def test_create_job_failure_no_reason(
+        self,
+        flow_run,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        response = MagicMock()
+        response.data = {
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "message": 'jobs.batch is forbidden: User "system:serviceaccount:helm-test:prefect-worker-dev" cannot create resource "jobs" in API group "batch" in the namespace "prefect"',
+            "reason": "Forbidden",
+            "details": {"group": "batch", "kind": "jobs"},
+            "code": 403,
+        }
+        response.status = 403
+        response.reason = None
+
+        mock_batch_client.create_namespaced_job.side_effect = ApiException(
+            http_resp=response
+        )
+
+        configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
+            KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
+        )
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            with pytest.raises(
+                InfrastructureError,
+                match=re.escape(
+                    "Unable to create Kubernetes job: jobs.batch is forbidden: User "
+                    '"system:serviceaccount:helm-test:prefect-worker-dev" cannot '
+                    'create resource "jobs" in API group "batch" in the namespace '
+                    '"prefect"'
+                ),
+            ):
+                await k8s_worker.run(flow_run, configuration)
+
+    async def test_create_job_failure_no_message(
+        self,
+        flow_run,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        response = MagicMock()
+        response.data = {
+            "kind": "Status",
+            "apiVersion": "v1",
+            "metadata": {},
+            "status": "Failure",
+            "reason": "Forbidden",
+            "details": {"group": "batch", "kind": "jobs"},
+            "code": 403,
+        }
+        response.status = 403
+        response.reason = "Test"
+
+        mock_batch_client.create_namespaced_job.side_effect = ApiException(
+            http_resp=response
+        )
+
+        configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
+            KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
+        )
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            with pytest.raises(
+                InfrastructureError,
+                match=re.escape("Unable to create Kubernetes job: Test"),
+            ):
+                await k8s_worker.run(flow_run, configuration)
+
+    async def test_create_job_failure_no_response_body(
+        self,
+        flow_run,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        response = MagicMock()
+        response.data = None
+        response.status = 403
+        response.reason = "Test"
+
+        mock_batch_client.create_namespaced_job.side_effect = ApiException(
+            http_resp=response
+        )
+
+        configuration = await KubernetesWorkerJobConfiguration.from_template_and_values(
+            KubernetesWorker.get_default_base_job_template(), {"image": "foo"}
+        )
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            with pytest.raises(
+                InfrastructureError,
+                match=re.escape("Unable to create Kubernetes job: Test"),
+            ):
+                await k8s_worker.run(flow_run, configuration)
+
     async def test_allows_image_setting_from_manifest(
         self,
         default_configuration: KubernetesWorkerJobConfiguration,
@@ -1622,7 +1775,9 @@ def mock_stream(*args, **kwargs):
 
                 # Yield the completed job
                 job.status.completion_time = True
-                yield {"object": job}
+                job.status.failed = 0
+                job.spec.backoff_limit = 6
+                yield {"object": job, "type": "ADDED"}
 
         def mock_log_stream(*args, **kwargs):
             anyio.sleep(500)
@@ -1671,7 +1826,7 @@ def mock_stream(*args, **kwargs):
             if kwargs["func"] == mock_core_client.list_namespaced_pod:
                 job_pod = MagicMock(spec=kubernetes.client.V1Pod)
                 job_pod.status.phase = "Running"
-                yield {"object": job_pod}
+                yield {"object": job_pod, "type": "ADDED"}
 
             if kwargs["func"] == mock_batch_client.list_namespaced_job:
                 job = MagicMock(spec=kubernetes.client.V1Job)
@@ -1801,7 +1956,9 @@ def mock_stream(*args, **kwargs):
 
                 # Yield the job then return exiting the stream
                 job.status.completion_time = None
-                yield {"object": job}
+                job.status.failed = 0
+                job.spec.backoff_limit = 6
+                yield {"object": job, "type": "ADDED"}
 
         mock_watch.stream.side_effect = mock_stream
         default_configuration.job_watch_timeout_seconds = 40
@@ -1847,6 +2004,79 @@ def mock_stream(*args, **kwargs):
             ]
         )
 
+    async def test_watch_stops_after_backoff_limit_reached(
+        self,
+        flow_run,
+        default_configuration,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        # The job should not be completed to start
+        mock_batch_client.read_namespaced_job.return_value.status.completion_time = None
+        job_pod = MagicMock(spec=kubernetes.client.V1Pod)
+        job_pod.status.phase = "Running"
+        mock_container_status = MagicMock(spec=kubernetes.client.V1ContainerStatus)
+        mock_container_status.state.terminated.exit_code = 137
+        job_pod.status.container_statuses = [mock_container_status]
+        mock_core_client.list_namespaced_pod.return_value.items = [job_pod]
+
+        def mock_stream(*args, **kwargs):
+            if kwargs["func"] == mock_core_client.list_namespaced_pod:
+                yield {"object": job_pod}
+
+            if kwargs["func"] == mock_batch_client.list_namespaced_job:
+                job = MagicMock(spec=kubernetes.client.V1Job)
+
+                # Yield the job then return exiting the stream
+                job.status.completion_time = None
+                job.spec.backoff_limit = 6
+                for i in range(0, 8):
+                    job.status.failed = i
+                    yield {"object": job, "type": "ADDED"}
+
+        mock_watch.stream.side_effect = mock_stream
+
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            result = await k8s_worker.run(flow_run, default_configuration)
+
+        assert result.status_code == 137
+
+    async def test_watch_handles_no_pod(
+        self,
+        flow_run,
+        default_configuration,
+        mock_core_client,
+        mock_watch,
+        mock_batch_client,
+    ):
+        # The job should not be completed to start
+        mock_batch_client.read_namespaced_job.return_value.status.completion_time = None
+        mock_core_client.list_namespaced_pod.return_value.items = []
+
+        def mock_stream(*args, **kwargs):
+            if kwargs["func"] == mock_core_client.list_namespaced_pod:
+                job_pod = MagicMock(spec=kubernetes.client.V1Pod)
+                job_pod.status.phase = "Running"
+                yield {"object": job_pod}
+
+            if kwargs["func"] == mock_batch_client.list_namespaced_job:
+                job = MagicMock(spec=kubernetes.client.V1Job)
+
+                # Yield the job then return exiting the stream
+                job.status.completion_time = None
+                job.spec.backoff_limit = 6
+                for i in range(0, 8):
+                    job.status.failed = i
+                    yield {"object": job, "type": "ADDED"}
+
+        mock_watch.stream.side_effect = mock_stream
+
+        async with KubernetesWorker(work_pool_name="test") as k8s_worker:
+            result = await k8s_worker.run(flow_run, default_configuration)
+
+        assert result.status_code == -1
+
 
 class TestKillInfrastructure:
     async def test_kill_infrastructure_calls_delete_namespaced_job(
         self,