Replace DaskJob pod with Kubernetes Job #745

Status: Open · wants to merge 2 commits into base: main
60 changes: 33 additions & 27 deletions dask_kubernetes/operator/controller/controller.py
@@ -162,40 +162,46 @@ def build_worker_deployment_spec(
     return deployment_spec


-def get_job_runner_pod_name(job_name):
+def get_job_runner_job_name(job_name):
     return f"{job_name}-runner"


-def build_job_pod_spec(job_name, cluster_name, namespace, spec, annotations, labels):
+def build_job_spec(job_name, cluster_name, namespace, spec, annotations, labels):
     labels.update(
         **{
             "dask.org/cluster-name": cluster_name,
             "dask.org/component": "job-runner",
             "sidecar.istio.io/inject": "false",
         }
     )
-    pod_spec = {
-        "apiVersion": "v1",
-        "kind": "Pod",
-        "metadata": {
-            "name": get_job_runner_pod_name(job_name),
-            "labels": labels,
-            "annotations": annotations,
-        },
-        "spec": spec,
+    metadata = {
+        "name": get_job_runner_job_name(job_name),
+        "labels": labels,
+        "annotations": annotations,
+    }
+    job_spec = {
+        "apiVersion": "batch/v1",
+        "kind": "Job",
+        "metadata": metadata,
+        "spec": {
[Review comment from a Member, attached to the `"spec": {` line above]

I think we probably want our spec to replace this, rather than the "spec" within "template".

That does mean we need to modify our CRDs to ref a JobSpec instead. That is a breaking change to our CRDs so we might need to think about how to bump the version there.

    $ref: 'python://k8s_crd_resolver/schemata/k8s-1.21.1.json#/definitions/io.k8s.api.core.v1.PodSpec'
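Presumably the swap the reviewer has in mind would replace that PodSpec reference with the batch/v1 JobSpec definition from the same schema file. A sketch, assuming the k8s-1.21.1.json definitions follow the usual Kubernetes OpenAPI naming:

    $ref: 'python://k8s_crd_resolver/schemata/k8s-1.21.1.json#/definitions/io.k8s.api.batch.v1.JobSpec'

Users would then supply a full Job spec in the DaskJob resource (including fields such as backoffLimit or ttlSecondsAfterFinished) rather than only a pod spec.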

"template": {
"metadata": metadata,
"spec": spec,
},
},
"spec": spec,
}
env = [
{
"name": "DASK_SCHEDULER_ADDRESS",
"value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
},
]
for i in range(len(pod_spec["spec"]["containers"])):
if "env" in pod_spec["spec"]["containers"][i]:
pod_spec["spec"]["containers"][i]["env"].extend(env)
for i in range(len(job_spec["spec"]["template"]["spec"]["containers"])):
if "env" in job_spec["spec"]["template"]["spec"]["containers"][i]:
job_spec["spec"]["template"]["spec"]["containers"][i]["env"].extend(env)
else:
pod_spec["spec"]["containers"][i]["env"] = env
return pod_spec
job_spec["spec"]["template"]["spec"]["containers"][i]["env"] = env
return job_spec


def build_default_worker_group_spec(cluster_name, spec, annotations, labels):
@@ -737,28 +743,28 @@ async def daskjob_create_components(

     labels = _get_labels(meta)
     annotations = _get_annotations(meta)
-    job_spec = spec["job"]
-    if "metadata" in job_spec:
-        if "annotations" in job_spec["metadata"]:
-            annotations.update(**job_spec["metadata"]["annotations"])
-        if "labels" in job_spec["metadata"]:
-            labels.update(**job_spec["metadata"]["labels"])
-    job_pod_spec = build_job_pod_spec(
+    dask_job_spec = spec["job"]
+    if "metadata" in dask_job_spec:
+        if "annotations" in dask_job_spec["metadata"]:
+            annotations.update(**dask_job_spec["metadata"]["annotations"])
+        if "labels" in dask_job_spec["metadata"]:
+            labels.update(**dask_job_spec["metadata"]["labels"])
+    job_spec = build_job_spec(
         job_name=name,
         cluster_name=cluster_name,
         namespace=namespace,
-        spec=job_spec["spec"],
+        spec=dask_job_spec["spec"],
         annotations=annotations,
         labels=labels,
     )
-    kopf.adopt(job_pod_spec)
-    await corev1api.create_namespaced_pod(
+    kopf.adopt(job_spec)
+    await kubernetes.client.BatchV1Api(api_client).create_namespaced_job(
         namespace=namespace,
-        body=job_pod_spec,
+        body=job_spec,
     )
     patch.status["clusterName"] = cluster_name
     patch.status["jobStatus"] = "ClusterCreated"
-    patch.status["jobRunnerPodName"] = get_job_runner_pod_name(name)
+    patch.status["jobRunnerPodName"] = get_job_runner_job_name(name)


 @kopf.on.field(
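To make the shape of the change concrete, here is a rough sketch of the manifest build_job_spec now returns. The values are illustrative and not taken from this PR: the DaskJob name "example", the namespace "default", and the container image are made up. The user-supplied pod spec is no longer posted directly as a v1 Pod; it becomes the pod template of a batch/v1 Job:

    # Hypothetical result of build_job_spec for a DaskJob named "example"
    # in namespace "default" (illustrative values only).
    metadata = {
        "name": "example-runner",  # get_job_runner_job_name("example")
        "labels": {
            "dask.org/cluster-name": "example",
            "dask.org/component": "job-runner",
            "sidecar.istio.io/inject": "false",
        },
        "annotations": {},
    }
    job_spec = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": metadata,
        "spec": {
            "template": {
                "metadata": metadata,  # the same dict is reused on the pod template
                "spec": {
                    "containers": [
                        {
                            "name": "job-runner",
                            "image": "ghcr.io/dask/dask:latest",  # made-up image
                            "env": [
                                # appended by the env-injection loop in build_job_spec
                                {
                                    "name": "DASK_SCHEDULER_ADDRESS",
                                    "value": "tcp://example-scheduler.default.svc.cluster.local:8786",
                                },
                            ],
                        },
                    ],
                },
            },
        },
    }

Because the runner pod is now created by the Job controller, its name carries a generated suffix rather than being exactly "{job}-runner", which is why the tests below stop looking pods up by name and select on the dask.org/component=job-runner label instead.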
34 changes: 24 additions & 10 deletions dask_kubernetes/operator/controller/tests/test_controller.py
@@ -12,7 +12,7 @@
 from kr8s.asyncio.objects import Pod, Deployment, Service
 from dask_kubernetes.operator.controller import (
     KUBERNETES_DATETIME_FORMAT,
-    get_job_runner_pod_name,
+    get_job_runner_job_name,
 )
 from dask_kubernetes.operator._objects import DaskCluster, DaskWorkerGroup, DaskJob

@@ -425,21 +425,21 @@ def _assert_job_status_created(job_status):
 def _assert_job_status_cluster_created(job, job_status):
     assert "jobStatus" in job_status
     assert job_status["clusterName"] == job
-    assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job)
+    assert job_status["jobRunnerPodName"] == get_job_runner_job_name(job)


 def _assert_job_status_running(job, job_status):
     assert "jobStatus" in job_status
     assert job_status["clusterName"] == job
-    assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job)
+    assert job_status["jobRunnerPodName"] == get_job_runner_job_name(job)
     start_time = datetime.strptime(job_status["startTime"], KUBERNETES_DATETIME_FORMAT)
     assert datetime.utcnow() > start_time > (datetime.utcnow() - timedelta(seconds=10))


 def _assert_final_job_status(job, job_status, expected_status):
     assert job_status["jobStatus"] == expected_status
     assert job_status["clusterName"] == job
-    assert job_status["jobRunnerPodName"] == get_job_runner_pod_name(job)
+    assert job_status["jobRunnerPodName"] == get_job_runner_job_name(job)
     start_time = datetime.strptime(job_status["startTime"], KUBERNETES_DATETIME_FORMAT)
     assert datetime.utcnow() > start_time > (datetime.utcnow() - timedelta(minutes=1))
     end_time = datetime.strptime(job_status["endTime"], KUBERNETES_DATETIME_FORMAT)
@@ -459,8 +459,6 @@ async def test_job(k8s_cluster, kopf_runner, gen_job):
async with gen_job("simplejob.yaml") as (job, ns):
assert job

runner_name = f"{job}-runner"

# Assert that job was created
while job not in k8s_cluster.kubectl(
"get", "daskjobs.kubernetes.dask.org", "-n", ns
@@ -480,6 +478,10 @@ async def test_job(k8s_cluster, kopf_runner, gen_job):
         job_status = _get_job_status(k8s_cluster, ns)
         _assert_job_status_cluster_created(job, job_status)

+        # Assert job is created
+        while job not in k8s_cluster.kubectl("get", "jobs", "-n", ns):
+            await asyncio.sleep(0.1)
+
         # Assert job pod is created
         while job not in k8s_cluster.kubectl("get", "po", "-n", ns):
             await asyncio.sleep(0.1)
@@ -507,7 +509,12 @@ async def test_job(k8s_cluster, kopf_runner, gen_job):

         # Assert job pod runs to completion (will fail if doesn't connect to cluster)
         while "Completed" not in k8s_cluster.kubectl(
-            "get", "-n", ns, "po", runner_name
+            "get",
+            "-n",
+            ns,
+            "po",
+            "-l",
+            "dask.org/component=job-runner",
         ):
             await asyncio.sleep(0.1)

@@ -530,8 +537,6 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
async with gen_job("failedjob.yaml") as (job, ns):
assert job

runner_name = f"{job}-runner"

# Assert that job was created
while job not in k8s_cluster.kubectl(
"get", "daskjobs.kubernetes.dask.org", "-n", ns
@@ -551,6 +556,10 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):
         job_status = _get_job_status(k8s_cluster, ns)
         _assert_job_status_cluster_created(job, job_status)

+        # Assert job is created
+        while job not in k8s_cluster.kubectl("get", "jobs", "-n", ns):
+            await asyncio.sleep(0.1)
+
         # Assert job pod is created
         while job not in k8s_cluster.kubectl("get", "po", "-n", ns):
             await asyncio.sleep(0.1)
@@ -565,7 +574,12 @@ async def test_failed_job(k8s_cluster, kopf_runner, gen_job):

         # Assert job pod runs to failure
         while "Error" not in k8s_cluster.kubectl(
-            "get", "po", "-n", ns, runner_name
+            "get",
+            "po",
+            "-n",
+            ns,
+            "-l",
+            "dask.org/component=job-runner",
         ):
             await asyncio.sleep(0.1)
