Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: us-docker.pkg.dev/cloud-tpu-v2-images/pathways-job/pathwaysjob-controller
newTag: v0.1.2
newName: us-docker.pkg.dev/cloud-tpu-multipod-dev/pathways/controller
newTag: akshu-ssi-test-26
47 changes: 35 additions & 12 deletions internal/controller/pathwaysjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,16 @@ func (r *PathwaysJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
pw := &pathwaysjob.PathwaysJob{}
log := ctrl.LoggerFrom(ctx).WithValues("pathwaysjob", klog.KObj(pw))
ctx = ctrl.LoggerInto(ctx, log)
log.Info("PathwaysJob: findme req: ", "req", req)

log.Info("PathwaysJob: CONTROLLER WORKING...", "req.NamespacedName", req.NamespacedName.String(), "req.Namespace", req.Namespace)
log.Info("PathwaysJob: CONTROLLER WORKING... findme req.NamespacedName:", "req.NamespacedName", req.NamespacedName.String(), "req.Namespace", req.Namespace)

// 1. Fetch the Pathways object
if err := r.Get(ctx, req.NamespacedName, pw); err != nil {
log.Info("PathwaysJob: Unable to fetch Pathways ")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log.Info("PathwaysJob: findme fetched pw: ", "pw", pw)

// 2. Process the Pathways object and build a JobSet client
kubeconfig := ctrl.GetConfigOrDie()
Expand Down Expand Up @@ -218,11 +220,15 @@ func (r *PathwaysJobReconciler) createJobSet(ctx context.Context, pw *pathwaysjo
workerJob, _ := MakeWorkerJob(ctx, pw)
successPolicy := MakeSuccessPolicy(pw)

log.Info("PathwaysJob: findme dummy log")
log.Info("PathwaysJob: findme meta:", "meta", pw.GetObjectMeta())
log.Info("PathwaysJob: findme annotations1: ", "anno", pw.GetObjectMeta().GetAnnotations())
log.Info("PathwaysJob: findme labels1: ", "labels", pw.GetObjectMeta().GetLabels())
mainJobSetConfig := jobsetv1alpha2.JobSet{
ObjectMeta: metav1.ObjectMeta{
Name: pw.GetName(),
Namespace: pw.GetNamespace(),
Labels: pw.GetObjectMeta().GetLabels(),
Name: pw.GetName(),
Namespace: pw.GetNamespace(),
Labels: pw.GetObjectMeta().GetLabels(),
Annotations: pw.GetObjectMeta().GetAnnotations(),
},
Spec: jobsetv1alpha2.JobSetSpec{
Expand All @@ -237,6 +243,7 @@ func (r *PathwaysJobReconciler) createJobSet(ctx context.Context, pw *pathwaysjo
ReplicatedJobs: []jobsetv1alpha2.ReplicatedJob{job, workerJob},
},
}
log.Info("PathwaysJob: findme mainJobSetConfig:", "mainJobSetConfig", mainJobSetConfig)

// Set Pathways controller as the owner of the JobSet for garbage collection.
if err := ctrl.SetControllerReference(pw, &mainJobSetConfig, r.Scheme); err != nil {
Expand Down Expand Up @@ -821,7 +828,8 @@ func MakeTolerationToAllowSchedulingOnTPU(pw *pathwaysjob.PathwaysJob) []corev1.
}
}

func MakePathwaysHeadPodSpec(pw *pathwaysjob.PathwaysJob) *corev1.PodSpec {
func MakePathwaysHeadPodSpec(ctx context.Context, pw *pathwaysjob.PathwaysJob) *corev1.PodSpec {
log := ctrl.LoggerFrom(ctx)
var pathwaysHeadPodSpec *corev1.PodSpec
if isUserPodProvided(pw) {
// Inject Pathways RM and proxy into the user provided pod spec
Expand Down Expand Up @@ -849,13 +857,19 @@ func MakePathwaysHeadPodSpec(pw *pathwaysjob.PathwaysJob) *corev1.PodSpec {
Containers: containerList,
} // end PodSpec
}
// The user pod template can have its own annotations.
// We should merge them with the annotations from the PathwaysJob.
// The user pod template can have its own annotations and labels.
// We should merge them with the annotations and labels from the PathwaysJob.
if isUserPodProvided(pw) && pw.Spec.Controller.UserPodTemplate.Annotations != nil {
for k, v := range pw.GetObjectMeta().GetAnnotations() {
pw.Spec.Controller.UserPodTemplate.Annotations[k] = v
}
}
if isUserPodProvided(pw) && pw.Spec.Controller.UserPodTemplate.Labels != nil {
log.Info("PathwaysJob: findme labels2: ", "labels", pw.GetObjectMeta().GetLabels())
for k, v := range pw.GetObjectMeta().GetLabels() {
pw.Spec.Controller.UserPodTemplate.Labels[k] = v
}
}
return pathwaysHeadPodSpec
}

Expand All @@ -880,7 +894,9 @@ func injectJAXBackendTargetIntoMainContainer(pw *pathwaysjob.PathwaysJob, pathwa

}

func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodSpec corev1.PodSpec) jobsetv1alpha2.ReplicatedJob {
func MakePathwaysHeadReplicatedJob(ctx context.Context, pw *pathwaysjob.PathwaysJob, pathwaysHeadPodSpec corev1.PodSpec) jobsetv1alpha2.ReplicatedJob {
log := ctrl.LoggerFrom(ctx)
ctx = ctrl.LoggerInto(ctx, log)
var annotations map[string]string
// Start with annotations from the PathwaysJob.
annotations = make(map[string]string)
Expand All @@ -892,11 +908,17 @@ func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodS
// needed so that head pods are placed exclusively on CPU nodes.
annotations["alpha.jobset.sigs.k8s.io/exclusive-topology"] = "kubernetes.io/hostname"
}
log.Info("PathwaysJob: findme labels3: ", "labels", pw.GetObjectMeta().GetLabels())
podLabels := make(map[string]string)
for k, v := range pw.GetObjectMeta().GetLabels() {
podLabels[k] = v
}
pathwaysHeadJob := jobsetv1alpha2.ReplicatedJob{
Name: PathwaysHeadJobName,
Replicas: 1,
Template: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Annotations: annotations,
},
Spec: batchv1.JobSpec{
Expand All @@ -906,6 +928,7 @@ func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodS
Template: corev1.PodTemplateSpec{
Spec: pathwaysHeadPodSpec,
ObjectMeta: metav1.ObjectMeta{
Labels: podLabels,
Annotations: annotations,
},
},
Expand All @@ -918,19 +941,19 @@ func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodS
// MakePathwaysHeadJobForColocateHeadWithWorkersDeployment constructs the pathways-head replicated job
// containing Pathways RM, Pathways Proxy and the user job containers for the 'colocate' deployment mode.
// In the colocate_head_with_workers mode, the Pathways head pod is placed on TPU nodes, beside a worker pod.
//
// It obtains the head pod spec from MakePathwaysHeadPodSpec, sets the affinity rules and the TPU
// tolerations on it, and hands the result to MakePathwaysHeadReplicatedJob. The error returned by
// this function is always nil.
//
// NOTE(review): this listing carries two forms of the MakePathwaysHeadPodSpec and
// MakePathwaysHeadReplicatedJob calls, one without ctx and one with ctx. Both cannot be live at once
// (podSpec would be declared twice and the second return would be unreachable) — presumably the
// ctx-less lines are the pre-change side of a diff; confirm before building.
func MakePathwaysHeadJobForColocateHeadWithWorkersDeployment(ctx context.Context, pw *pathwaysjob.PathwaysJob) (jobsetv1alpha2.ReplicatedJob, error) {
podSpec := *MakePathwaysHeadPodSpec(pw)
podSpec := *MakePathwaysHeadPodSpec(ctx, pw)
// Add affinity and tolerations to allow the Pathways head pod to be scheduled on TPUs.
// NOTE(review): the second value returned by MakePodAffinityRules is discarded; if it is an
// error it should probably be propagated through this function's error result — confirm.
affinitySpec, _ := MakePodAffinityRules(pw)
tolerations := MakeTolerationToAllowSchedulingOnTPU(pw)
podSpec.Affinity = affinitySpec
podSpec.Tolerations = tolerations

return MakePathwaysHeadReplicatedJob(pw, podSpec), nil
return MakePathwaysHeadReplicatedJob(ctx, pw, podSpec), nil
}

// MakePathwaysHeadJobForDefaultDeployment constructs the pathways-head replicated job containing
// Pathways RM, Pathways Proxy and the user job containers for the 'default' deployment mode.
// In the default mode, the Pathways head pod is placed on CPU nodes.
//
// Unlike the colocate variant, the pod spec from MakePathwaysHeadPodSpec is passed to
// MakePathwaysHeadReplicatedJob unchanged: no affinity or tolerations are set here.
// The error returned by this function is always nil.
//
// NOTE(review): this listing carries two forms of each call, one without ctx and one with ctx.
// Both cannot be live at once (podSpec would be declared twice and the second return would be
// unreachable) — presumably the ctx-less lines are the pre-change side of a diff; confirm before building.
func MakePathwaysHeadJobForDefaultDeployment(ctx context.Context, pw *pathwaysjob.PathwaysJob) (jobsetv1alpha2.ReplicatedJob, error) {
podSpec := *MakePathwaysHeadPodSpec(pw)
return MakePathwaysHeadReplicatedJob(pw, podSpec), nil
podSpec := *MakePathwaysHeadPodSpec(ctx, pw)
return MakePathwaysHeadReplicatedJob(ctx, pw, podSpec), nil
}
16 changes: 12 additions & 4 deletions internal/controller/pathwaysjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ var _ = Describe("PathwaysJob Controller", func() {
ObjectMeta: metav1.ObjectMeta{
Name: resourceName,
Namespace: "default",
Labels: map[string]string{
"test-label": "test-label-value",
},
Annotations: map[string]string{
"test-annotation": "test-value",
"test-annotation": "test-annotation-value",
},
},
Spec: pathwaysjobv1.PathwaysJobSpec{
Expand Down Expand Up @@ -108,10 +111,15 @@ var _ = Describe("PathwaysJob Controller", func() {
Expect(jobSet.Spec.ReplicatedJobs[0].Name).To(Equal(PathwaysHeadJobName))
Expect(jobSet.Spec.ReplicatedJobs[1].Name).To(Equal("worker"))

By("Checking if annotations were propagated to JobSet")
Expect(jobSet.Annotations).To(HaveKeyWithValue("test-annotation", "test-value"))
headJobTemplate := jobSet.Spec.ReplicatedJobs[0].Template
Expect(headJobTemplate.Annotations).To(HaveKeyWithValue("test-annotation", "test-value"))

By("Checking if labels were propagated to JobSet")
Expect(jobSet.Labels).To(HaveKeyWithValue("test-label", "test-label-value"))
Expect(headJobTemplate.Labels).To(HaveKeyWithValue("test-label", "test-label-value"))

By("Checking if annotations were propagated to JobSet")
Expect(jobSet.Annotations).To(HaveKeyWithValue("test-annotation", "test-annotation-value"))
Expect(headJobTemplate.Annotations).To(HaveKeyWithValue("test-annotation", "test-annotation-value"))
})
})
})
33 changes: 33 additions & 0 deletions test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Sample PathwaysJob manifest: two slices of ct4p-hightpu-4t workers (2x2x1 topology) with a
# user-provided head pod template that carries its own label and annotation.
# NOTE(review): nesting was reconstructed from the PathwaysJob / PodTemplateSpec field names
# because the flattened form is not valid YAML (duplicate keys, unindented block scalar) —
# confirm against the CRD before applying.
apiVersion: pathways-job.pathways.domain/v1
kind: PathwaysJob
metadata:
  name: pw-akshu-l6
spec:
  maxRestarts: 1
  workers:
  - type: ct4p-hightpu-4t
    topology: 2x2x1
    numSlices: 2
  pathwaysDir: "gs://akshu-v4"
  controller:
    # Name of the container in the template below that runs the user workload.
    mainContainerName: main-workload
    deploymentMode: default
    # User pod template; its labels and annotations live under template.metadata.
    template:
      metadata:
        labels:
          team: "pw"
        annotations:
          gke-gcsfuse/volumes: "true"
      spec:
        containers:
        - name: main-workload
          image: python:3.13
          imagePullPolicy: Always
          command:
          - /bin/sh
          - -c
          # Installs nightly JAX and pathwaysutils, then prints the number of JAX devices.
          - |
            pip install --upgrade pip
            pip install -U --pre jax jaxlib -f https://storage.googleapis.com/jax-releases/jax_nightly_releases.html
            pip install pathwaysutils
            python -c "import jax; import pathwaysutils; pathwaysutils.initialize(); print(\"Number of JAX devices is\", len(jax.devices()))"