This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Merge branch 'main' into patch-2
zzstoatzz authored Jul 19, 2023
2 parents a7ce9e9 + b14ad8f commit b884656
Showing 10 changed files with 376 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1 +1 @@
-* @PrefectHQ/integrations
+* @PrefectHQ/open-source
2 changes: 1 addition & 1 deletion .github/workflows/add-to-project.yml
@@ -18,7 +18,7 @@ jobs:
app_id: ${{ secrets.SYNC_APP_ID }}
private_key: ${{ secrets.SYNC_APP_PRIVATE_KEY }}

-- uses: actions/add-to-project@v0.4.0
+- uses: actions/add-to-project@v0.5.0
with:
project-url: ${{ secrets.ADD_TO_PROJECT_URL }}
github-token: ${{ steps.generate-token.outputs.token }}
2 changes: 1 addition & 1 deletion .github/workflows/kubernetes-test.yml
@@ -37,7 +37,7 @@ jobs:
helm repo update
- name: Create kind cluster
-uses: helm/kind-action@v1.5.0
+uses: helm/kind-action@v1.7.0
with:
cluster_name: kind
version: v0.18.0
2 changes: 1 addition & 1 deletion .github/workflows/publish-docs.yml
@@ -25,7 +25,7 @@ jobs:
mkdocs build
- name: Publish docs
-uses: JamesIves/github-pages-deploy-action@v4.4.1
+uses: JamesIves/github-pages-deploy-action@v4.4.2
with:
branch: docs
folder: site
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -73,7 +73,7 @@ jobs:
mkdocs build
- name: Publish docs
-uses: JamesIves/github-pages-deploy-action@v4.4.1
+uses: JamesIves/github-pages-deploy-action@v4.4.2
with:
branch: docs
folder: site
2 changes: 1 addition & 1 deletion .github/workflows/template-sync.yml
@@ -30,7 +30,7 @@ jobs:
private_key: ${{ secrets.SYNC_APP_PRIVATE_KEY }}

- name: Submit PR
-uses: peter-evans/create-pull-request@v4
+uses: peter-evans/create-pull-request@v5
with:
commit-message: Updating collection with changes to prefect-collection-template
token: ${{ steps.generate-token.outputs.token }}
32 changes: 32 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,38 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Security

## 0.2.11

Released July 20th, 2023.

### Changed
- Promoted workers to GA, removed beta disclaimers

## 0.2.10

Released June 30th, 2023.

### Added

- Handling for rescheduled Kubernetes jobs - [#78](https://github.com/PrefectHQ/prefect-kubernetes/pull/78)

## 0.2.9

Released June 20th, 2023.

### Fixed

- Fixed issue where `generateName` was not populating correctly for some flow runs submitted by `KubernetesWorker` - [#76](https://github.com/PrefectHQ/prefect-kubernetes/pull/76)

## 0.2.8

Released May 25th, 2023.

### Changed

- Improve failure message when creating a Kubernetes job fails - [#71](https://github.com/PrefectHQ/prefect-kubernetes/pull/71)
- Stream Kubernetes Worker flow run logs to the API - [#72](https://github.com/PrefectHQ/prefect-kubernetes/pull/72)

## 0.2.7

Released May 4th, 2023.
121 changes: 97 additions & 24 deletions prefect_kubernetes/worker.py
@@ -1,10 +1,7 @@
"""
-<span class="badge-api beta"/>
Module containing the Kubernetes worker used for executing flow runs as Kubernetes jobs.
-Note this module is in **beta**. The interfaces within may change without notice.
To start a Kubernetes worker, run the following command:
```bash
@@ -88,6 +85,7 @@
checkout out the [Prefect docs](https://docs.prefect.io/concepts/work-pools/).
"""
import enum
+import logging
import math
import os
import time
@@ -97,7 +95,11 @@
import anyio.abc
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect.docker import get_prefect_image_name
-from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
+from prefect.exceptions import (
+InfrastructureError,
+InfrastructureNotAvailable,
+InfrastructureNotFound,
+)
from prefect.server.schemas.core import Flow
from prefect.server.schemas.responses import DeploymentResponse
from prefect.utilities.asyncutils import run_sync_in_worker_thread
@@ -143,6 +145,7 @@ def _get_default_job_manifest_template() -> Dict[str, Any]:
"generateName": "{{ name }}-",
},
"spec": {
"backoffLimit": 0,
"ttlSecondsAfterFinished": "{{ finished_job_ttl }}",
"template": {
"spec": {
@@ -384,7 +387,16 @@ def _populate_generate_name_if_not_present(self):
"""Ensures that the generateName is present in the job manifest."""
manifest_generate_name = self.job_manifest["metadata"].get("generateName", "")
has_placeholder = len(find_placeholders(manifest_generate_name)) > 0
-if not manifest_generate_name or has_placeholder:
+# if name wasn't present during template rendering, generateName will be
+# just a hyphen
+manifest_generate_name_templated_with_empty_string = (
+manifest_generate_name == "-"
+)
+if (
+not manifest_generate_name
+or has_placeholder
+or manifest_generate_name_templated_with_empty_string
+):
generate_name = None
if self.name:
generate_name = _slugify_name(self.name)
@@ -459,6 +471,13 @@ class KubernetesWorker(BaseWorker):
type = "kubernetes"
job_configuration = KubernetesWorkerJobConfiguration
job_configuration_variables = KubernetesWorkerVariables
+_description = (
+"Execute flow runs within jobs scheduled on a Kubernetes cluster. Requires a "
+"Kubernetes cluster."
+)
+_display_name = "Kubernetes"
+_documentation_url = "https://prefecthq.github.io/prefect-kubernetes/worker/"
+_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1zrSeY8DZ1MJZs2BAyyyGk/20445025358491b8b72600b8f996125b/Kubernetes_logo_without_workmark.svg.png?h=250" # noqa

async def run(
self,
@@ -480,7 +499,10 @@ async def run(
KubernetesWorkerResult: A result object containing information about the
final state of the flow run
"""
+logger = self.get_flow_run_logger(flow_run)
+
with self._get_configured_kubernetes_client(configuration) as client:
+logger.info("Creating Kubernetes job...")
job = await run_sync_in_worker_thread(
self._create_job, configuration, client
)
@@ -506,7 +528,7 @@ async def run(

with events_replicator:
status_code = await run_sync_in_worker_thread(
-self._watch_job, job.metadata.name, configuration, client
+self._watch_job, logger, job.metadata.name, configuration, client
)
return KubernetesWorkerResult(identifier=pid, status_code=status_code)

@@ -601,10 +623,23 @@ def _create_job(
"""
Creates a Kubernetes job from a job manifest.
"""
-with self._get_batch_client(client) as batch_client:
-job = batch_client.create_namespaced_job(
-configuration.namespace, configuration.job_manifest
-)
+try:
+with self._get_batch_client(client) as batch_client:
+job = batch_client.create_namespaced_job(
+configuration.namespace, configuration.job_manifest
+)
+except kubernetes.client.exceptions.ApiException as exc:
+# Parse the reason and message from the response if feasible
+message = ""
+if exc.reason:
+message += ": " + exc.reason
+if exc.body and "message" in exc.body:
+message += ": " + exc.body["message"]
+
+raise InfrastructureError(
+f"Unable to create Kubernetes job{message}"
+) from exc

return job

@contextmanager
@@ -680,6 +715,7 @@ def _get_cluster_uid(self, client: "ApiClient") -> str:

def _watch_job(
self,
+logger: logging.Logger,
job_name: str,
configuration: KubernetesWorkerJobConfiguration,
client: "ApiClient",
@@ -689,13 +725,13 @@
Return the final status code of the first container.
"""
-self._logger.debug(f"Job {job_name!r}: Monitoring job...")
+logger.debug(f"Job {job_name!r}: Monitoring job...")

-job = self._get_job(job_name, configuration, client)
+job = self._get_job(logger, job_name, configuration, client)
if not job:
return -1

-pod = self._get_job_pod(job_name, configuration, client)
+pod = self._get_job_pod(logger, job_name, configuration, client)
if not pod:
return -1

@@ -728,7 +764,7 @@ def _watch_job(
break

except Exception:
-self._logger.warning(
+logger.warning(
(
"Error occurred while streaming logs - "
"Job will continue to run but logs will "
@@ -749,7 +785,7 @@ def _watch_job(
math.ceil(deadline - time.monotonic()) if deadline else None
)
if deadline and remaining_time <= 0:
-self._logger.error(
+logger.error(
f"Job {job_name!r}: Job did not complete within "
f"timeout of {configuration.job_watch_timeout_seconds}s."
)
@@ -769,24 +805,60 @@ def _watch_job(
namespace=configuration.namespace,
**timeout_seconds,
):
if event["object"].status.completion_time:
if event["type"] == "DELETED":
logger.error(f"Job {job_name!r}: Job has been deleted.")
completed = True
elif event["object"].status.completion_time:
if not event["object"].status.succeeded:
# Job failed, exit while loop and return pod exit code
self._logger.error(f"Job {job_name!r}: Job failed.")
logger.error(f"Job {job_name!r}: Job failed.")
completed = True
# Check if the job has reached its backoff limit
# and stop watching if it has
elif (
event["object"].spec.backoff_limit is not None
and event["object"].status.failed is not None
and event["object"].status.failed
> event["object"].spec.backoff_limit
):
logger.error(f"Job {job_name!r}: Job reached backoff limit.")
completed = True
# If the job has no backoff limit, check if it has failed
# and stop watching if it has
elif (
not event["object"].spec.backoff_limit
and event["object"].status.failed
):
completed = True

+if completed:
watch.stop()
break

with self._get_core_client(client) as core_client:
-pod_status = core_client.read_namespaced_pod_status(
-namespace=configuration.namespace, name=pod.metadata.name
-)
+# Get all pods for the job
+pods = core_client.list_namespaced_pod(
+namespace=configuration.namespace, label_selector=f"job-name={job_name}"
+)
+# Get the status for only the most recently used pod
+pods.items.sort(
+key=lambda pod: pod.metadata.creation_timestamp, reverse=True
+)
+most_recent_pod = pods.items[0] if pods.items else None
+first_container_status = (
+most_recent_pod.status.container_statuses[0]
+if most_recent_pod
+else None
+)
-first_container_status = pod_status.status.container_statuses[0]
+if not first_container_status:
+logger.error(f"Job {job_name!r}: No pods found for job.")
+return -1

return first_container_status.state.terminated.exit_code

def _get_job(
self,
+logger: logging.Logger,
job_id: str,
configuration: KubernetesWorkerJobConfiguration,
client: "ApiClient",
@@ -798,19 +870,20 @@ def _get_job(
name=job_id, namespace=configuration.namespace
)
except kubernetes.client.exceptions.ApiException:
self._logger.error(f"Job {job_id!r} was removed.", exc_info=True)
logger.error(f"Job {job_id!r} was removed.", exc_info=True)
return None
return job

def _get_job_pod(
self,
+logger: logging.Logger,
job_name: str,
configuration: KubernetesWorkerJobConfiguration,
client: "ApiClient",
) -> Optional["V1Pod"]:
"""Get the first running pod for a job."""
watch = kubernetes.watch.Watch()
self._logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
last_phase = None
with self._get_core_client(client) as core_client:
for event in watch.stream(
@@ -821,12 +894,12 @@ def _get_job_pod(
):
phase = event["object"].status.phase
if phase != last_phase:
-self._logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")
+logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")

if phase != "Pending":
watch.stop()
return event["object"]

last_phase = phase

self._logger.error(f"Job {job_name!r}: Pod never started.")
logger.error(f"Job {job_name!r}: Pod never started.")
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
-prefect>=2.10.7
+prefect>=2.10.9
kubernetes >= 24.2.0
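
Note: the worker module's docstring (truncated in the diff above) points readers at a CLI command for starting a Kubernetes worker. As a rough, illustrative sketch only — `my-k8s-pool` is a placeholder work pool name and exact flags may vary by Prefect version — setup against the pins in requirements.txt looks roughly like:

```bash
# Illustrative sketch; "my-k8s-pool" is a placeholder work pool name and the
# version pin mirrors requirements.txt above.
pip install "prefect>=2.10.9" prefect-kubernetes

# Create a Kubernetes-typed work pool (if one does not already exist), then
# start a worker that polls it and submits flow runs as Kubernetes jobs.
prefect work-pool create my-k8s-pool --type kubernetes
prefect worker start --pool my-k8s-pool --type kubernetes
```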