-
Notifications
You must be signed in to change notification settings - Fork 781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[@parallel on Kubernetes] support for Jobsets #1804
Conversation
302ae6a
to
93f8a11
Compare
num_parallel = None | ||
if hasattr(flow, "_parallel_ubf_iter"): | ||
num_parallel = flow._parallel_ubf_iter.num_parallel | ||
|
||
if num_parallel and num_parallel >= 1 and ubf_context == UBF_CONTROL: | ||
control_task_id, worker_task_ids = TaskIdConstructor.join_step_task_ids( | ||
num_parallel | ||
) | ||
mapper_task_ids = [control_task_id] + worker_task_ids | ||
flow._control_mapper_tasks = [ | ||
"%s/%s/%s" % (run_id, step_name, mapper_task_id) | ||
for mapper_task_id in mapper_task_ids | ||
] | ||
flow._control_task_is_mapper_zero = True | ||
|
||
if num_parallel and num_parallel > 1: | ||
_setup_multinode_environment() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed so that Join steps has all the relevant task-ids.
def create_job_spec(self): | ||
client = self._client.get() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has to create a sub-function called create_job_spec
so we could reuse the jobspec created for K8s jobs and plug that in directly into Jobsets.
return overall_status, control_exit_code, control_pod_failed | ||
|
||
|
||
def _construct_jobset_logical_status(jobset, control_pod=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Main function which helps compute the logical status of the jobset.
) | ||
|
||
|
||
class RunningJobSet(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interface similar to RunningJob
so that the runtime process can monitor the jobset.
).jobset_failed | ||
|
||
|
||
class TaskIdConstructor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This helps constructing all the task-ids from one place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent! Thanks for this implementation. Otherwise, it could get hard figuring out how/where the task ids get constructed.
) | ||
|
||
|
||
class KubernetesJobSet(object): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It creates one control and one worker definition. The workers will have replicas set to num_parallel-1
. All workers and control will leverage the "jobspec" created by the KubernetesJob
interface.
@@ -4,7 +4,7 @@ | |||
|
|||
from metaflow.exception import MetaflowException | |||
|
|||
from .kubernetes_job import KubernetesJob | |||
from .kubernetes_job import KubernetesJob, KubernetesJobSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need this import for Kubernetes clients which maybe getting used via extensions.
) | ||
|
||
|
||
def _basic_validation_for_js(jobset): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
93f8a11
to
a25f980
Compare
control_pod_status=None, | ||
worker_pods_failed=False, | ||
control_pod_failed=False, | ||
some_jobs_are_running=False, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some_jobs_are_running
helps derive if something is running or not.
self._group = KUBERNETES_JOBSET_GROUP | ||
self._version = KUBERNETES_JOBSET_VERSION |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making this configurable so that we can have code paths that deal with different GROUP /VERSIONS based on what they are set in the config.
num_parallel=num_parallel, | ||
namespace=namespace, | ||
) | ||
worker_task_id = TaskIdConstructor.jobset_worker_id(task_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single place where we construct the Task-id
1bc4c17
to
8ca8cd3
Compare
8ca8cd3
to
41bf331
Compare
if num_parallel is not None and num_parallel <= 1: | ||
raise KubernetesException( | ||
"Using @parallel with `num_parallel` <= 1 is not supported with Kubernetes. " | ||
"Please set the value of `num_parallel` to be greater than 1." | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
constraint added because jobset doesn't play nice with replicas = 0
; Once kubernetes-sigs/jobset
allow this, we can lift this constraint and add version logic to verify if the jobset can be submitted or not. Currently not supported with Jobset CRD version jobset.x-k8s.io/v1alpha2
Implementation originates from [Netflix#1744] This commit adds support for @parallel when flows are run `--with kubernetes` Support for Argo workflows will follow in a separate commit. A user can run a flow with the following: @step def start(self): self.next(self.parallel_step, num_parallel=3) @kubernetes(cpu=1, memory=512) @parallel @step def parallel_step(self): ... Some notes about the implementation: - No annotations for task-id in pods since We cannot dynamically construct the task-id during K8s container runtime. - @catch is currently not supported with @parallel on kubernetes - metadata about jobset name exists in the task-metadata - The jobset will contain two job definitions; One for control and one for worker. - The worker will have n-1 replicas created. - We construct the worker task-id determininstically using naming conventions and shell hacking. - Jobset is considered running even if one job amongst all of them are running. - @Retry will work with jobset - num_parallel <=1 will NOT be supported to start with; - One core reason is that jobsets don't allow setting replicas to 0; - jobsets controller will mutate a jobset with replica set to 0 with replicas set to 1. - The implementation accounts for Jobset CRD schema from v0.2.0 - Jobset team changed the schema (just renaming values) after v0.3.0 - The changes were to `replicatedJobsStatus` where certain fields were added and `ReplicatedJobsStatus` was renamed to `replicatedJobsStatus`
41bf331
to
a8207c7
Compare
def _retrieve_replicated_job_statuses(jobset): | ||
# We needed this abstraction because Jobsets changed thier schema | ||
# in version v0.3.0 where `ReplicatedJobsStatus` became `replicatedJobsStatus` | ||
# So to handle users having an older version of jobsets, we need to account | ||
# for both the schemas. | ||
if jobset.get("status", {}).get("replicatedJobsStatus", None): | ||
return jobset.get("status").get("replicatedJobsStatus") | ||
elif jobset.get("status", {}).get("ReplicatedJobsStatus", None): | ||
return jobset.get("status").get("ReplicatedJobsStatus") | ||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to add a conditional block to swallow the complexity for the user when the jobset controller may be of a version < 0.3.0. This is because jobset's schema had changed from v0.3.0. No logical changes to the schema, only additions and renaming of values like ReplicatedJobsStatus
to replicatedJobsStatus
if "suspended" in job_status: | ||
# `replicatedJobStatus` didn't have `suspend` field | ||
# until v0.3.0. So we need to account for that. | ||
workers_are_suspended = job_status["suspended"] > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suspend
was also not a supported keyword in Jobset for version v0.2.0; This is why we need a conditional check over here.
Implementation originates from [#1744] but now supersedes the PR.
This commit adds support for @parallel when flows are run
--with kubernetes
Support for Argo workflows will follow in a separate commit.A user can run a flow with the following: