From 27a5c1405134f6f78a5db6359646e5cdd28235c2 Mon Sep 17 00:00:00 2001 From: Quan Zhang Date: Thu, 25 May 2023 22:44:38 +0800 Subject: [PATCH] TEP-0135: Refactor Affinity Assistant PVC creation Part of [#6740][#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure that all of a PipelineRun's pods are scheduled to the same node. Before this commit, the PipelineRun reconciler creates a PVC for each `VolumeClaimTemplate` backed workspace, and mounts the PVCs to the AA to avoid PV availability zone conflict. This implementation works for `AffinityAssistantPerWorkspace` but introduces an availability zone conflict issue in the `AffinityAssistantPerPipelineRun` mode since we cannot enforce that all the PVCs are created in the same availability zone. Instead of directly creating a PVC for each PipelineRun workspace backed by a VolumeClaimTemplate, this commit sets one VolumeClaimTemplate per PVC workspace in the affinity assistant StatefulSet spec, which enforces that all VolumeClaimTemplates in StatefulSets are provisioned on the same node/availability zone. This commit just refactors the current implementation in favor of the `AffinityAssistantPerPipelineRun` feature. There is no functionality change. The `AffinityAssistantPerPipelineRun` feature will be added in follow-up PRs. 
[#6740]: https://github.com/tektoncd/pipeline/issues/6740 [tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md --- .../pipelinerun/affinity_assistant.go | 77 +++++++---- .../pipelinerun/affinity_assistant_test.go | 130 ++++++++++++++---- pkg/reconciler/pipelinerun/pipelinerun.go | 62 +++++---- .../pipelinerun/pipelinerun_test.go | 102 +++++++++++++- 4 files changed, 288 insertions(+), 83 deletions(-) diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index 7a27bdda4b2..448a6f3f234 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -56,14 +56,26 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb [] var errs []error var unschedulableNodes sets.Set[string] = nil for _, w := range wb { + if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil { + continue + } + + var claimTemplates []corev1.PersistentVolumeClaim + var claims []corev1.PersistentVolumeClaimVolumeSource + if w.PersistentVolumeClaim != nil { + claims = append(claims, *w.PersistentVolumeClaim.DeepCopy()) + } else if w.VolumeClaimTemplate != nil { + claimTemplate := getVolumeClaimTemplate(w, pr) + claimTemplates = append(claimTemplates, *claimTemplate) + } + if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name) a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{}) - claimName := getClaimName(w, *kmeta.NewControllerRef(pr)) switch { // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist case apierrors.IsNotFound(err): - affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) + affinityAssistantStatefulSet := 
affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate) _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{}) if err != nil { errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err)) @@ -113,14 +125,10 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb [] return errorutils.NewAggregate(errs) } -func getClaimName(w v1beta1.WorkspaceBinding, ownerReference metav1.OwnerReference) string { - if w.PersistentVolumeClaim != nil { - return w.PersistentVolumeClaim.ClaimName - } else if w.VolumeClaimTemplate != nil { - return volumeclaim.GetPersistentVolumeClaimName(w.VolumeClaimTemplate, w, ownerReference) - } - - return "" +func getVolumeClaimTemplate(wb v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun) *corev1.PersistentVolumeClaim { + claimTemplate := wb.VolumeClaimTemplate.DeepCopy() + claimTemplate.Name = volumeclaim.GetPersistentVolumeClaimName(wb.VolumeClaimTemplate, wb, *kmeta.NewControllerRef(pr)) + return claimTemplate } func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error { @@ -162,7 +170,7 @@ func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string) return labels } -func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimName string, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet { +func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet { // We want a singleton pod replicas := int32(1) @@ -172,6 +180,11 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam tpl = 
pod.MergeAAPodTemplateWithDefault(pr.Spec.PodTemplate.ToAffinityAssistantTemplate(), defaultAATpl) } + var mounts []corev1.VolumeMount + for _, claimTemplate := range claimTemplates { + mounts = append(mounts, corev1.VolumeMount{Name: claimTemplate.Name, MountPath: claimTemplate.Name}) + } + containers := []corev1.Container{{ Name: "affinity-assistant", Image: affinityAssistantImage, @@ -190,8 +203,32 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam "memory": resource.MustParse("100Mi"), }, }, + VolumeMounts: mounts, }} + // TODO(QuanZhang-William): when introducing `coscheduling` flag with values `coschedule-pipelineruns` or `isolate-pipelineruns + // (https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md#configuration), + // it may require a StorageClass with VolumeBindingMode: WaitForFirstConsumer when WS specifies PersistentVolumeClaims to avoid Availability Zone conflict + // (discussed in https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md#co-locating-volumes). + // We need to update the TEP and documentation for this limitation if it is the case. + var volumes []corev1.Volume + for i, claim := range claims { + volumes = append(volumes, corev1.Volume{ + Name: fmt.Sprintf("workspace-%d", i), + VolumeSource: corev1.VolumeSource{ + // A Pod mounting a PersistentVolumeClaim that has a StorageClass with + // volumeBindingMode: Immediate + // the PV is allocated on a Node first, and then the pod need to be + // scheduled to that node. + // To support those PVCs, the Affinity Assistant must also mount the + // same PersistentVolumeClaim - to be sure that the Affinity Assistant + // pod is scheduled to the same Availability Zone as the PV, when using + // a regional cluster. This is called VolumeScheduling. 
+ PersistentVolumeClaim: claim.DeepCopy(), + }, + }) + } + return &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", @@ -207,6 +244,8 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam Selector: &metav1.LabelSelector{ MatchLabels: getStatefulSetLabels(pr, name), }, + // by setting VolumeClaimTemplates from StatefulSet, all the PVs are scheduled to the same Availability Zone as the StatefulSet + VolumeClaimTemplates: claimTemplates, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: getStatefulSetLabels(pr, name), @@ -219,21 +258,7 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam ImagePullSecrets: tpl.ImagePullSecrets, Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr), - Volumes: []corev1.Volume{{ - Name: "workspace", - VolumeSource: corev1.VolumeSource{ - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - // A Pod mounting a PersistentVolumeClaim that has a StorageClass with - // volumeBindingMode: Immediate - // the PV is allocated on a Node first, and then the pod need to be - // scheduled to that node. - // To support those PVCs, the Affinity Assistant must also mount the - // same PersistentVolumeClaim - to be sure that the Affinity Assistant - // pod is scheduled to the same Availability Zone as the PV, when using - // a regional cluster. This is called VolumeScheduling. 
- ClaimName: claimName, - }}, - }}, + Volumes: volumes, }, }, }, diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index 2e347a66bf7..ff0c4863eeb 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -60,36 +60,108 @@ var testPipelineRun = &v1beta1.PipelineRun{ } // TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant -// for a given PipelineRun with a PVC workspace +// for a given PipelineRun func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset(), - Images: pipeline.Images{}, + tests := []struct { + name string + pr *v1beta1.PipelineRun + }{{ + name: "PersistentVolumeClaim Workspace type", + pr: &v1beta1.PipelineRun{ + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{{ + Name: "PersistentVolumeClaim Workspace", + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "myclaim", + }, + }}, + }, + }, + }, { + name: "VolumeClaimTemplate Workspace type", + pr: &v1beta1.PipelineRun{ + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{ + { + Name: "VolumeClaimTemplate Workspace", + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }}, + }, + }, + }, + { + name: "other Workspace type", + pr: &v1beta1.PipelineRun{ + Spec: v1beta1.PipelineRunSpec{ + Workspaces: []v1beta1.WorkspaceBinding{ + { + Name: "EmptyDir Workspace", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }}, + }, + }, + }, } - err := c.createOrUpdateAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace) - if err != nil { - t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err) - } + for _, tc := range tests { + tc := tc + 
t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() - expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name) - _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{}) - if err != nil { - t.Errorf("unexpected error when retrieving StatefulSet: %v", err) - } + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } - err = c.cleanupAffinityAssistants(ctx, testPipelineRun) - if err != nil { - t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) - } + err := c.createOrUpdateAffinityAssistants(ctx, tc.pr.Spec.Workspaces, tc.pr, tc.pr.Namespace) + if err != nil { + t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err) + } + + expectedAAName := getAffinityAssistantName(tc.pr.Spec.Workspaces[0].Name, tc.pr.Name) + aa, err := c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{}) + if err != nil { + if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim == nil && tc.pr.Spec.Workspaces[0].VolumeClaimTemplate == nil { + if !apierrors.IsNotFound(err) { + t.Errorf("unexpected error when retrieving StatefulSet: %v", err) + } + } else { + t.Errorf("unexpected error when retrieving StatefulSet: %v", err) + } + } + + // validate PVs from Affinity Assistant + if tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil { + if len(aa.Spec.VolumeClaimTemplates) != 1 { + t.Errorf("unexpected VolumeClaimTemplate count, expect 1 but got %v", len(aa.Spec.VolumeClaimTemplates)) + } + if d := cmp.Diff(*getVolumeClaimTemplate(tc.pr.Spec.Workspaces[0], tc.pr), aa.Spec.VolumeClaimTemplates[0]); d != "" { + t.Errorf("VolumeClaimTemplates diff: %s", diff.PrintWantGot(d)) + } + } else if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil { + if len(aa.Spec.Template.Spec.Volumes) != 1 
{ + t.Errorf("unexpected PersistentVolumeClaim count, expect 1 but got %v", len(aa.Spec.Template.Spec.Volumes)) + } + if d := cmp.Diff(tc.pr.Spec.Workspaces[0].PersistentVolumeClaim, aa.Spec.Template.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim); d != "" { + t.Errorf("PersistentVolumeClaim diff: %s", diff.PrintWantGot(d)) + } + } + + // clean up Affinity Assistant + if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil || tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil { + err = c.cleanupAffinityAssistants(ctx, tc.pr) + if err != nil { + t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) + } - _, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{}) - if !apierrors.IsNotFound(err) { - t.Errorf("expected a NotFound response, got: %v", err) + _, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } + } + }) } } @@ -239,7 +311,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { }, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations in the StatefulSet") @@ -277,7 +349,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { }}, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", 
prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations in the StatefulSet") @@ -323,7 +395,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) { }}, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations from spec in the StatefulSet") @@ -360,7 +432,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing. }, } - stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil) + stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil) if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 { t.Errorf("expected Tolerations from spec in the StatefulSet") @@ -380,7 +452,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing. 
Spec: v1beta1.PipelineRunSpec{}, } - stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, "mypvc", "nginx", nil) + stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil) if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 { t.Errorf("unexpected Tolerations in the StatefulSet") diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 92d9d139049..21be517aff6 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -596,7 +596,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return controller.NewPermanentError(err) } - if pr.HasVolumeClaimTemplate() { + // Make an attempt to create Affinity Assistant if it does not exist + // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod + if !c.isAffinityAssistantDisabled(ctx) { + // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity + if err = c.createOrUpdateAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + return controller.NewPermanentError(err) + } + } else if pr.HasVolumeClaimTemplate() { // create workspace PVC from template if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil { logger.Errorf("Failed to create PVC for 
PipelineRun %s: %v", pr.Name, err) @@ -608,19 +619,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get } } - // Make an attempt to create Affinity Assistant if it does not exist - // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod - if !c.isAffinityAssistantDisabled(ctx) { - // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - if err = c.createOrUpdateAffinityAssistants(ctx, pr.Spec.Workspaces, pr, pr.Namespace); err != nil { - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, - "Failed to create StatefulSet or update affinity assistant replicas for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - return controller.NewPermanentError(err) - } - } - if pr.Status.FinallyStartTime == nil { if pr.HaveTasksTimedOut(ctx, c.Clock) { tasksToTimeOut := sets.NewString() @@ -868,7 +866,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para var pipelinePVCWorkspaceName string var err error - tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(ctx, pr, rpt) + tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = c.getTaskrunWorkspaces(ctx, pr, rpt) if err != nil { return nil, err } @@ -917,7 +915,7 @@ func (c *Reconciler) createRunObject(ctx context.Context, runName string, params var pipelinePVCWorkspaceName string var err error var workspaces []v1beta1.WorkspaceBinding - workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(ctx, pr, rpt) + workspaces, pipelinePVCWorkspaceName, err = c.getTaskrunWorkspaces(ctx, pr, rpt) if err != nil { return nil, err } @@ -991,7 +989,7 @@ func propagateWorkspaces(rpt *resources.ResolvedPipelineTask) (*resources.Resolv return rpt, nil } -func getTaskrunWorkspaces(ctx 
context.Context, pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipelineTask) ([]v1beta1.WorkspaceBinding, string, error) { +func (c *Reconciler) getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipelineTask) ([]v1beta1.WorkspaceBinding, string, error) { var err error var workspaces []v1beta1.WorkspaceBinding var pipelinePVCWorkspaceName string @@ -1021,7 +1019,12 @@ func getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *res if b.PersistentVolumeClaim != nil || b.VolumeClaimTemplate != nil { pipelinePVCWorkspaceName = pipelineWorkspace } - workspaces = append(workspaces, taskWorkspaceByWorkspaceVolumeSource(b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr))) + + workspace, err := c.taskWorkspaceByWorkspaceVolumeSource(ctx, pipelinePVCWorkspaceName, pr.Name, b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr)) + if err != nil { + return nil, "", controller.NewPermanentError(err) + } + workspaces = append(workspaces, workspace) } else { workspaceIsOptional := false if rpt.ResolvedTask != nil && rpt.ResolvedTask.TaskSpec != nil { @@ -1054,23 +1057,30 @@ func getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rpt *res // taskWorkspaceByWorkspaceVolumeSource is returning the WorkspaceBinding with the TaskRun specified name. 
// If the volume source is a volumeClaimTemplate, the template is applied and passed to TaskRun as a persistentVolumeClaim -func taskWorkspaceByWorkspaceVolumeSource(wb v1beta1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference) v1beta1.WorkspaceBinding { +func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, pipelineWorkspaceName string, prName string, wb v1beta1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference) (v1beta1.WorkspaceBinding, error) { if wb.VolumeClaimTemplate == nil { binding := *wb.DeepCopy() binding.Name = taskWorkspaceName binding.SubPath = combinedSubPath(wb.SubPath, pipelineTaskSubPath) - return binding + return binding, nil } - // apply template + claim := corev1.PersistentVolumeClaim{} binding := v1beta1.WorkspaceBinding{ - SubPath: combinedSubPath(wb.SubPath, pipelineTaskSubPath), - PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeclaim.GetPersistentVolumeClaimName(wb.VolumeClaimTemplate, wb, owner), - }, + SubPath: combinedSubPath(wb.SubPath, pipelineTaskSubPath), + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{}, } binding.Name = taskWorkspaceName - return binding + + pvcName := volumeclaim.GetPersistentVolumeClaimName(&claim, wb, owner) + if c.isAffinityAssistantDisabled(ctx) { + binding.PersistentVolumeClaim.ClaimName = pvcName + } else { + affinityAssistantName := getAffinityAssistantName(pipelineWorkspaceName, prName) + // if the PVC is created by AffinityAssistant StatefulSet, the PVC name is default to --0 + binding.PersistentVolumeClaim.ClaimName = fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName) + } + return binding, nil } // combinedSubPath returns the combined value of the optional subPath from workspaceBinding and the optional diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 
1ca8eb3106d..69bb4e6acda 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -57,6 +57,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + fakek8s "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" @@ -108,6 +109,7 @@ const ( apiFieldsFeatureFlag = "enable-api-fields" ociBundlesFeatureFlag = "enable-tekton-oci-bundles" maxMatrixCombinationsCountFlag = "default-max-matrix-combinations-count" + disableAffinityAssistantFlag = "disable-affinity-assistant" ) type PipelineRunTest struct { @@ -1218,6 +1220,12 @@ func withMaxMatrixCombinationsCount(cm *corev1.ConfigMap, count int) *corev1.Con return newCM } +func withoutAffinityAssistant(cm *corev1.ConfigMap) *corev1.ConfigMap { + newCM := cm.DeepCopy() + newCM.Data[disableAffinityAssistantFlag] = "true" + return newCM +} + // TestReconcileOnCancelledPipelineRun runs "Reconcile" on a PipelineRun that // has been cancelled. It verifies that reconcile is successful, the pipeline // status updated and events generated. 
@@ -3979,11 +3987,13 @@ spec: name: myclaim `)} ts := []*v1beta1.Task{simpleHelloWorldTask} + cms := []*corev1.ConfigMap{withoutAffinityAssistant(newFeatureFlagsConfigMap())} d := test.Data{ PipelineRuns: prs, Pipelines: ps, Tasks: ts, + ConfigMaps: cms, } prt := newPipelineRunTest(t, d) defer prt.Cancel() @@ -7385,8 +7395,12 @@ spec: ctx := config.EnableAlphaAPIFields(context.Background()) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } rprt := &resources.ResolvedPipelineTask{PipelineTask: &tt.pr.Spec.PipelineSpec.Tasks[0]} - _, _, err := getTaskrunWorkspaces(ctx, tt.pr, rprt) + _, _, err := c.getTaskrunWorkspaces(ctx, tt.pr, rprt) if err == nil { t.Errorf("Pipeline.getTaskrunWorkspaces() did not return error for invalid workspace") } else if d := cmp.Diff(tt.expectedError, err.Error(), cmpopts.IgnoreUnexported(apis.FieldError{})); d != "" { @@ -7492,7 +7506,11 @@ spec: } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, _, err := getTaskrunWorkspaces(context.Background(), tt.pr, tt.rprt) + c := Reconciler{ + KubeClientSet: fakek8s.NewSimpleClientset(), + Images: pipeline.Images{}, + } + _, _, err := c.getTaskrunWorkspaces(context.Background(), tt.pr, tt.rprt) if err != nil { t.Errorf("Pipeline.getTaskrunWorkspaces() returned error for valid pipeline: %v", err) @@ -7501,6 +7519,86 @@ spec: } } +func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { + testPr := &v1beta1.PipelineRun{} + tests := []struct { + name, taskWorkspaceName, pipelineWorkspaceName, prName string + wb v1beta1.WorkspaceBinding + expectedBinding v1beta1.WorkspaceBinding + disableAffinityAssistant bool + }{ + { + name: "PVC Workspace with Affinity Assistant", + prName: "test-pipeline-run", + taskWorkspaceName: "task-workspace", + pipelineWorkspaceName: "pipeline-workspace", + wb: v1beta1.WorkspaceBinding{ + Name: "foo", + VolumeClaimTemplate: 
&corev1.PersistentVolumeClaim{}, + }, + expectedBinding: v1beta1.WorkspaceBinding{ + Name: "task-workspace", + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-2c26b46b68-affinity-assistant-e011a5ef79-0", + }, + }, + }, + { + name: "PVC Workspace without Affinity Assistant", + prName: "test-pipeline-run", + taskWorkspaceName: "task-workspace", + wb: v1beta1.WorkspaceBinding{ + Name: "foo", + VolumeClaimTemplate: &corev1.PersistentVolumeClaim{}, + }, + expectedBinding: v1beta1.WorkspaceBinding{ + Name: "task-workspace", + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-2c26b46b68", + }, + }, + disableAffinityAssistant: true, + }, + { + name: "non-PVC Workspace", + taskWorkspaceName: "task-workspace", + wb: v1beta1.WorkspaceBinding{ + Name: "foo", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + expectedBinding: v1beta1.WorkspaceBinding{ + Name: "task-workspace", + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + c := Reconciler{} + ctx := context.Background() + if tc.disableAffinityAssistant { + featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{ + "disable-affinity-assistant": "true", + }) + cfg := &config.Config{ + FeatureFlags: featureFlags, + } + ctx = config.ToContext(context.Background(), cfg) + } + + binding, err := c.taskWorkspaceByWorkspaceVolumeSource(ctx, tc.pipelineWorkspaceName, tc.prName, tc.wb, tc.taskWorkspaceName, "", *kmeta.NewControllerRef(testPr)) + if err != nil { + t.Errorf("Pipeline.taskWorkspaceByWorkspaceVolumeSource() returned unexpected error: %v", err) + } + + if d := cmp.Diff(tc.expectedBinding, binding); d != "" { + t.Errorf("WorkspaceBinding diff: %s", diff.PrintWantGot(d)) + } + }) + } +} + func TestReconcile_PropagatePipelineTaskRunSpecMetadata(t *testing.T) { names.TestingSeed()