diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 9b986dc0..e0fc4d4b 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -18,5 +18,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: us-docker.pkg.dev/cloud-tpu-v2-images/pathways-job/pathwaysjob-controller - newTag: v0.1.2 + newName: us-docker.pkg.dev/cloud-tpu-multipod-dev/pathways/controller + newTag: akshu-ssi-test-26 diff --git a/internal/controller/pathwaysjob_controller.go b/internal/controller/pathwaysjob_controller.go index 09abf74f..f24eb0ad 100644 --- a/internal/controller/pathwaysjob_controller.go +++ b/internal/controller/pathwaysjob_controller.go @@ -142,14 +142,16 @@ func (r *PathwaysJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) pw := &pathwaysjob.PathwaysJob{} log := ctrl.LoggerFrom(ctx).WithValues("pathwaysjob", klog.KObj(pw)) ctx = ctrl.LoggerInto(ctx, log) + log.Info("PathwaysJob: findme req: ", "req", req) - log.Info("PathwaysJob: CONTROLLER WORKING...", "req.NamespacedName", req.NamespacedName.String(), "req.Namespace", req.Namespace) + log.Info("PathwaysJob: CONTROLLER WORKING... findme req.NamespacedName:", "req.NamespacedName", req.NamespacedName.String(), "req.Namespace", req.Namespace) // 1. Fetch the Pathways object if err := r.Get(ctx, req.NamespacedName, pw); err != nil { log.Info("PathwaysJob: Unable to fetch Pathways ") return ctrl.Result{}, client.IgnoreNotFound(err) } + log.Info("PathwaysJob: findme fetched pw: ", "pw", pw) // 2. Process the Pathways object and build a JobSet client kubeconfig := ctrl.GetConfigOrDie() @@ -218,11 +220,15 @@ func (r *PathwaysJobReconciler) createJobSet(ctx context.Context, pw *pathwaysjo workerJob, _ := MakeWorkerJob(ctx, pw) successPolicy := MakeSuccessPolicy(pw) + log.Info("PathwaysJob: findme dummy log") + log.Info("PathwaysJob: findme meta:", "meta", pw.GetObjectMeta()) + log.Info("PathwaysJob: findme annotations1: ", "anno", pw.GetObjectMeta().GetAnnotations()) + log.Info("PathwaysJob: findme labels1: ", "labels", pw.GetObjectMeta().GetLabels()) mainJobSetConfig := jobsetv1alpha2.JobSet{ ObjectMeta: metav1.ObjectMeta{ - Name: pw.GetName(), - Namespace: pw.GetNamespace(), - Labels: pw.GetObjectMeta().GetLabels(), + Name: pw.GetName(), + Namespace: pw.GetNamespace(), + Labels: pw.GetObjectMeta().GetLabels(), Annotations: pw.GetObjectMeta().GetAnnotations(), }, Spec: jobsetv1alpha2.JobSetSpec{ @@ -237,6 +243,7 @@ func (r *PathwaysJobReconciler) createJobSet(ctx context.Context, pw *pathwaysjo ReplicatedJobs: []jobsetv1alpha2.ReplicatedJob{job, workerJob}, }, } + log.Info("PathwaysJob: findme mainJobSetConfig:", "mainJobSetConfig", mainJobSetConfig) // Set Pathways controller as the owner of the JobSet for garbage collection. if err := ctrl.SetControllerReference(pw, &mainJobSetConfig, r.Scheme); err != nil { @@ -821,7 +828,8 @@ func MakeTolerationToAllowSchedulingOnTPU(pw *pathwaysjob.PathwaysJob) []corev1. } } -func MakePathwaysHeadPodSpec(pw *pathwaysjob.PathwaysJob) *corev1.PodSpec { +func MakePathwaysHeadPodSpec(ctx context.Context, pw *pathwaysjob.PathwaysJob) *corev1.PodSpec { + log := ctrl.LoggerFrom(ctx) var pathwaysHeadPodSpec *corev1.PodSpec if isUserPodProvided(pw) { // Inject Pathways RM and proxy into the user provided pod spec @@ -849,13 +857,19 @@ func MakePathwaysHeadPodSpec(pw *pathwaysjob.PathwaysJob) *corev1.PodSpec { Containers: containerList, } // end PodSpec } - // The user pod template can have its own annotations. - // We should merge them with the annotations from the PathwaysJob. + // The user pod template can have its own annotations and labels. + // We should merge them with the annotations and labels from the PathwaysJob. if isUserPodProvided(pw) && pw.Spec.Controller.UserPodTemplate.Annotations != nil { for k, v := range pw.GetObjectMeta().GetAnnotations() { pw.Spec.Controller.UserPodTemplate.Annotations[k] = v } } + if isUserPodProvided(pw) && pw.Spec.Controller.UserPodTemplate.Labels != nil { + log.Info("PathwaysJob: findme labels2: ", "labels", pw.GetObjectMeta().GetLabels()) + for k, v := range pw.GetObjectMeta().GetLabels() { + pw.Spec.Controller.UserPodTemplate.Labels[k] = v + } + } return pathwaysHeadPodSpec } @@ -880,7 +894,9 @@ func injectJAXBackendTargetIntoMainContainer(pw *pathwaysjob.PathwaysJob, pathwa } -func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodSpec corev1.PodSpec) jobsetv1alpha2.ReplicatedJob { +func MakePathwaysHeadReplicatedJob(ctx context.Context, pw *pathwaysjob.PathwaysJob, pathwaysHeadPodSpec corev1.PodSpec) jobsetv1alpha2.ReplicatedJob { + log := ctrl.LoggerFrom(ctx) + ctx = ctrl.LoggerInto(ctx, log) var annotations map[string]string // Start with annotations from the PathwaysJob. annotations = make(map[string]string) @@ -892,11 +908,17 @@ func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodS // needed so that head pods are placed exclusively on CPU nodes. annotations["alpha.jobset.sigs.k8s.io/exclusive-topology"] = "kubernetes.io/hostname" } + log.Info("PathwaysJob: findme labels3: ", "labels", pw.GetObjectMeta().GetLabels()) + podLabels := make(map[string]string) + for k, v := range pw.GetObjectMeta().GetLabels() { + podLabels[k] = v + } pathwaysHeadJob := jobsetv1alpha2.ReplicatedJob{ Name: PathwaysHeadJobName, Replicas: 1, Template: batchv1.JobTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ + Labels: podLabels, Annotations: annotations, }, Spec: batchv1.JobSpec{ @@ -906,6 +928,7 @@ func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodS Template: corev1.PodTemplateSpec{ Spec: pathwaysHeadPodSpec, ObjectMeta: metav1.ObjectMeta{ + Labels: podLabels, Annotations: annotations, }, }, @@ -918,19 +941,19 @@ func MakePathwaysHeadReplicatedJob(pw *pathwaysjob.PathwaysJob, pathwaysHeadPodS // Construct pathways-head replicated job containing Pathways RM, Pathways Proxy and the user job containers for the 'colocate' deployment mode. // In the colocate_head_with_workers mode, the Pathways head pod is placed on TPU nodes, beside a worker pod. func MakePathwaysHeadJobForColocateHeadWithWorkersDeployment(ctx context.Context, pw *pathwaysjob.PathwaysJob) (jobsetv1alpha2.ReplicatedJob, error) { - podSpec := *MakePathwaysHeadPodSpec(pw) + podSpec := *MakePathwaysHeadPodSpec(ctx, pw) // Add affinity and tolerations to allow the Pathways head pod to be scheduled on TPUs. affinitySpec, _ := MakePodAffinityRules(pw) tolerations := MakeTolerationToAllowSchedulingOnTPU(pw) podSpec.Affinity = affinitySpec podSpec.Tolerations = tolerations - return MakePathwaysHeadReplicatedJob(pw, podSpec), nil + return MakePathwaysHeadReplicatedJob(ctx, pw, podSpec), nil } // Construct pathways-head replicated job containing Pathways RM, Pathways Proxy and the user job containers for the 'default' deployment mode. // In the default mode, the Pathways head pod is placed on CPU nodes. func MakePathwaysHeadJobForDefaultDeployment(ctx context.Context, pw *pathwaysjob.PathwaysJob) (jobsetv1alpha2.ReplicatedJob, error) { - podSpec := *MakePathwaysHeadPodSpec(pw) - return MakePathwaysHeadReplicatedJob(pw, podSpec), nil + podSpec := *MakePathwaysHeadPodSpec(ctx, pw) + return MakePathwaysHeadReplicatedJob(ctx, pw, podSpec), nil } diff --git a/internal/controller/pathwaysjob_controller_test.go b/internal/controller/pathwaysjob_controller_test.go index 95c0cf18..7c0406a6 100644 --- a/internal/controller/pathwaysjob_controller_test.go +++ b/internal/controller/pathwaysjob_controller_test.go @@ -51,8 +51,11 @@ var _ = Describe("PathwaysJob Controller", func() { ObjectMeta: metav1.ObjectMeta{ Name: resourceName, Namespace: "default", + Labels: map[string]string{ + "test-label": "test-label-value", + }, Annotations: map[string]string{ - "test-annotation": "test-value", + "test-annotation": "test-annotation-value", }, }, Spec: pathwaysjobv1.PathwaysJobSpec{ @@ -108,10 +111,15 @@ var _ = Describe("PathwaysJob Controller", func() { Expect(jobSet.Spec.ReplicatedJobs[0].Name).To(Equal(PathwaysHeadJobName)) Expect(jobSet.Spec.ReplicatedJobs[1].Name).To(Equal("worker")) - By("Checking if annotations were propagated to JobSet") - Expect(jobSet.Annotations).To(HaveKeyWithValue("test-annotation", "test-value")) headJobTemplate := jobSet.Spec.ReplicatedJobs[0].Template - Expect(headJobTemplate.Annotations).To(HaveKeyWithValue("test-annotation", "test-value")) + + By("Checking if labels were propagated to JobSet") + Expect(jobSet.Labels).To(HaveKeyWithValue("test-label", "test-label-value")) + Expect(headJobTemplate.Labels).To(HaveKeyWithValue("test-label", "test-label-value")) + + By("Checking if annotations were propagated to JobSet") + Expect(jobSet.Annotations).To(HaveKeyWithValue("test-annotation", "test-annotation-value")) + Expect(headJobTemplate.Annotations).To(HaveKeyWithValue("test-annotation", "test-annotation-value")) }) }) }) diff --git a/test.yaml b/test.yaml new file mode 100644 index 00000000..a3e29bf8 --- /dev/null +++ b/test.yaml @@ -0,0 +1,33 @@ +apiVersion: pathways-job.pathways.domain/v1 +kind: PathwaysJob +metadata: + name: pw-akshu-l6 +spec: + maxRestarts: 1 + workers: + - type: ct4p-hightpu-4t + topology: 2x2x1 + numSlices: 2 + pathwaysDir: "gs://akshu-v4" + controller: + mainContainerName: main-workload + deploymentMode: default + template: + metadata: + labels: + team: "pw" + annotations: + gke-gcsfuse/volumes: "true" + spec: + containers: + - name: main-workload + image: python:3.13 + imagePullPolicy: Always + command: + - /bin/sh + - -c + - | + pip install --upgrade pip + pip install -U --pre jax jaxlib -f https://storage.googleapis.com/jax-releases/jax_nightly_releases.html + pip install pathwaysutils + python -c "import jax; import pathwaysutils; pathwaysutils.initialize(); print(\"Number of JAX devices is\", len(jax.devices()))"