diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index af09e0eb65b..f0dccf3cdc5 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -41,20 +41,19 @@ import ( const ( // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim - // as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet. + // as a volume source and expect an Assistant StatefulSet in AffinityAssistantPerWorkspace behavior, but couldn't create a StatefulSet. ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace" featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant" ) -// createOrUpdateAffinityAssistantsPerAABehavior creates Affinity Assistant StatefulSets based on AffinityAssistantBehavior. +// createOrUpdateAffinityAssistantsAndPVCs creates Affinity Assistant StatefulSets and PVCs based on AffinityAssistantBehavior. // This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume. // If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for // every taskrun in the pipelinerun that use the same PVC based volume. // If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation, // it creates one Affinity Assistant for the pipelinerun. -// Other AffinityAssitantBehaviors are invalid. -func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error { +func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error { var errs []error var unschedulableNodes sets.Set[string] = nil @@ -79,26 +78,40 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsPerAABehavior(ctx context.C } } + // create PVCs from PipelineRun's VolumeClaimTemplate when aaBehavior is AffinityAssistantPerWorkspace or AffinityAssistantDisabled before creating + // affinity assistant so that the OwnerReference of the PVCs are the pipelineruns, which is used to achieve PVC auto deletion at PipelineRun deletion time + if (aaBehavior == aa.AffinityAssistantPerWorkspace || aaBehavior == aa.AffinityAssistantDisabled) && pr.HasVolumeClaimTemplate() { + if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil { + return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err) + } + } + switch aaBehavior { case aa.AffinityAssistantPerWorkspace: for claim, workspaceName := range claimToWorkspace { - aaName := getAffinityAssistantName(workspaceName, pr.Name) + aaName := GetAffinityAssistantName(workspaceName, pr.Name) err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes) errs = append(errs, err...) } for claimTemplate, workspaceName := range claimTemplatesToWorkspace { - aaName := getAffinityAssistantName(workspaceName, pr.Name) - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, []corev1.PersistentVolumeClaim{*claimTemplate}, nil, unschedulableNodes) + aaName := GetAffinityAssistantName(workspaceName, pr.Name) + // To support PVC auto deletion at pipelinerun deletion time, the OwnerReference of the PVCs should be set to the owning pipelinerun. + // In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point, + // so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling. + // If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun. + err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes) errs = append(errs, err...) } case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: if claims != nil || claimTemplates != nil { - aaName := getAffinityAssistantName("", pr.Name) + aaName := GetAffinityAssistantName("", pr.Name) + // In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling. + // PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time, + // so we don't need to worry the OwnerReference of the PVCs err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes) errs = append(errs, err...) } case aa.AffinityAssistantDisabled: - return fmt.Errorf("unexpected Affinity Assistant behavior %v", aa.AffinityAssistantDisabled) } return errorutils.NewAggregate(errs) @@ -175,18 +188,10 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.Pipel var errs []error for _, w := range pr.Spec.Workspaces { if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil { - affinityAssistantStsName := getAffinityAssistantName(w.Name, pr.Name) + affinityAssistantStsName := GetAffinityAssistantName(w.Name, pr.Name) if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantStsName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantStsName, err)) } - - // cleanup PVCs created by Affinity Assistants - if w.VolumeClaimTemplate != nil { - pvcName := getPersistentVolumeClaimNameWithAffinityAssistant(w.Name, pr.Name, w, *kmeta.NewControllerRef(pr)) - if err := c.KubeClientSet.CoreV1().PersistentVolumeClaims(pr.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { - errs = append(errs, fmt.Errorf("failed to delete PersistentVolumeClaim %s: %w", pvcName, err)) - } - } } } return errorutils.NewAggregate(errs) @@ -195,13 +200,15 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.Pipel // getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is // created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled. // The PVCs created by StatefulSet VolumeClaimTemplates follow the format `--0` +// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string { pvcName := volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner) - affinityAssistantName := getAffinityAssistantName(pipelineWorkspaceName, prName) + affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName) return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName) } -func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { +// GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName +func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) hashString := fmt.Sprintf("%x", hashBytes) return fmt.Sprintf("%s-%s", workspace.ComponentNameAffinityAssistant, hashString[:10]) diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index 1bb9f37d963..b18bdde3d2f 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -19,7 +19,6 @@ package pipelinerun import ( "context" "errors" - "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -29,9 +28,11 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" + "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/workspace" "github.com/tektoncd/pipeline/test/diff" "github.com/tektoncd/pipeline/test/parse" + "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -168,13 +169,13 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { KubeClientSet: fakek8s.NewSimpleClientset(), } - err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun) + err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun) if err != nil { t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerPipelineRun: %v", err) } // validate StatefulSets from Affinity Assistant - expectAAName := getAffinityAssistantName("", tc.pr.Name) + expectAAName := GetAffinityAssistantName("", tc.pr.Name) validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec) // TODO(#6740)(WIP): test cleanupAffinityAssistants for coscheduling-pipelinerun mode when fully implemented @@ -182,16 +183,18 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { } } -// TestCreateAndDeleteOfAffinityAssistantPerWorkspace tests to create and delete an Affinity Assistant +// TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled tests to create and delete an Affinity Assistant // per workspace for a given PipelineRun -func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { +func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) { tests := []struct { - name string + name, expectedPVCName string pr *v1.PipelineRun expectStatefulSetSpec []*appsv1.StatefulSetSpec + aaBehavior aa.AffinityAssitantBehavior }{{ - name: "PersistentVolumeClaim Workspace type", - pr: testPRWithPVC, + name: "PersistentVolumeClaim Workspace type", + aaBehavior: aa.AffinityAssistantPerWorkspace, + pr: testPRWithPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ @@ -205,20 +208,43 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { }, }}, }, { - name: "VolumeClaimTemplate Workspace type", - pr: testPRWithVolumeClaimTemplate, + name: "VolumeClaimTemplate Workspace type", + aaBehavior: aa.AffinityAssistantPerWorkspace, + pr: testPRWithVolumeClaimTemplate, + expectedPVCName: "pvc-b9eea16dce", expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ - VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, - }}, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{{ + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "pvc-b9eea16dce"}, + }, + }}, + }, + }, }}, }, { - name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", - pr: testPRWithVolumeClaimTemplateAndPVC, + name: "VolumeClaimTemplate Workspace type - AA disabled", + aaBehavior: aa.AffinityAssistantDisabled, + pr: testPRWithVolumeClaimTemplate, + expectedPVCName: "pvc-b9eea16dce", + }, { + name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", + aaBehavior: aa.AffinityAssistantPerWorkspace, + pr: testPRWithVolumeClaimTemplateAndPVC, + expectedPVCName: "pvc-b9eea16dce", expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ - VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ - ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, - }}}, { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: []corev1.Volume{{ + Name: "workspace-0", + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ClaimName: "pvc-b9eea16dce"}, + }, + }}, + }, + }}, { Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -232,6 +258,7 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { }}, }, { name: "other Workspace type", + aaBehavior: aa.AffinityAssistantPerWorkspace, pr: testPRWithEmptyDir, expectStatefulSetSpec: nil, }} @@ -240,23 +267,33 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { ctx := context.Background() + kubeClientSet := fakek8s.NewSimpleClientset() c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset(), + KubeClientSet: kubeClientSet, + pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()), } - err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, tc.pr, aa.AffinityAssistantPerWorkspace) + err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, tc.aaBehavior) if err != nil { - t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerWorkspace: %v", err) + t.Fatalf("unexpected error from createOrUpdateAffinityAssistantsPerWorkspace: %v", err) } // validate StatefulSets from Affinity Assistant for i, w := range tc.pr.Spec.Workspaces { if tc.expectStatefulSetSpec != nil { - expectAAName := getAffinityAssistantName(w.Name, tc.pr.Name) + expectAAName := GetAffinityAssistantName(w.Name, tc.pr.Name) validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec[i]) } } + // validate PVCs from VolumeClaimTemplate + if tc.expectedPVCName != "" { + _, err = c.KubeClientSet.CoreV1().PersistentVolumeClaims("").Get(ctx, tc.expectedPVCName, metav1.GetOptions{}) + if err != nil { + t.Errorf("unexpected error when retrieving PVC: %v", err) + } + } + // clean up Affinity Assistant c.cleanupAffinityAssistants(ctx, tc.pr) if err != nil { @@ -267,7 +304,7 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { continue } - expectAAName := getAffinityAssistantName(w.Name, tc.pr.Name) + expectAAName := GetAffinityAssistantName(w.Name, tc.pr.Name) _, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectAAName, metav1.GetOptions{}) if !apierrors.IsNotFound(err) { t.Errorf("expected a NotFound response, got: %v", err) @@ -277,28 +314,10 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspace(t *testing.T) { } } -func TestCreateAndDeleteOfAffinityAssistantDisabled_Failure(t *testing.T) { - ctx := context.Background() - c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset(), - } - - wantErr := fmt.Errorf("unexpected Affinity Assistant behavior %s", aa.AffinityAssistantDisabled) - - err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, testPRWithPVC, aa.AffinityAssistantDisabled) - if err == nil { - t.Fatalf("expecting error: %v, but got nil", wantErr) - } - - if diff := cmp.Diff(wantErr.Error(), err.Error()); diff != "" { - t.Errorf("expected error mismatch: %v", diff) - } -} - // TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and // can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { - expectedAffinityAssistantName := getAffinityAssistantName(workspacePVCName, testPRWithPVC.Name) + expectedAffinityAssistantName := GetAffinityAssistantName(workspacePVCName, testPRWithPVC.Name) ss := []*appsv1.StatefulSet{{ TypeMeta: metav1.TypeMeta{ @@ -398,7 +417,7 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { return true, &corev1.Pod{}, errors.New("error listing/deleting pod") }) } - err := c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, testPRWithPVC, aa.AffinityAssistantPerWorkspace) + err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithPVC, aa.AffinityAssistantPerWorkspace) if !tt.expectedError && err != nil { t.Errorf("expected no error from createOrUpdateAffinityAssistantsPerWorkspace for the test \"%s\", but got: %v", tt.name, err) } @@ -603,7 +622,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing. // plus 10 chars for a hash. Labels in Kubernetes can not be longer than 63 chars. // Typical output from the example below is affinity-assistant-0384086f62 func TestThatAffinityAssistantNameIsNoLongerThan53(t *testing.T) { - affinityAssistantName := getAffinityAssistantName( + affinityAssistantName := GetAffinityAssistantName( "pipeline-workspace-name-that-is-quite-long", "pipelinerun-with-a-long-custom-name") @@ -628,7 +647,7 @@ func TestCleanupAffinityAssistants_Success(t *testing.T) { } // seed data to create StatefulSets and PVCs - expectedAffinityAssistantName := getAffinityAssistantName(workspace.Name, pr.Name) + expectedAffinityAssistantName := GetAffinityAssistantName(workspace.Name, pr.Name) aa := []*appsv1.StatefulSet{{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", @@ -642,14 +661,12 @@ func TestCleanupAffinityAssistants_Success(t *testing.T) { ReadyReplicas: 1, }, }} - expectedPVCName := getPersistentVolumeClaimNameWithAffinityAssistant(workspace.Name, pr.Name, workspace, *kmeta.NewControllerRef(pr)) pvc := []*corev1.PersistentVolumeClaim{{ ObjectMeta: metav1.ObjectMeta{ Name: expectedPVCName, }}, } - data := Data{ StatefulSets: aa, PVCs: pvc, @@ -667,9 +684,11 @@ func TestCleanupAffinityAssistants_Success(t *testing.T) { if !apierrors.IsNotFound(err) { t.Errorf("expected a NotFound response of StatefulSet, got: %v", err) } + + // the PVCs are NOT expected to be deleted at Affinity Assistant cleanup time _, err = c.KubeClientSet.CoreV1().PersistentVolumeClaims(pr.Namespace).Get(ctx, expectedPVCName, metav1.GetOptions{}) - if !apierrors.IsNotFound(err) { - t.Errorf("expected a NotFound response of PersistentVolumeClaims, got: %v", err) + if err != nil { + t.Errorf("unexpected err when getting PersistentVolumeClaims, err: %v", err) } } @@ -692,14 +711,9 @@ func TestCleanupAffinityAssistants_Failure(t *testing.T) { func(action testing2.Action) (handled bool, ret runtime.Object, err error) { return true, &corev1.NodeList{}, errors.New("error deleting statefulsets") }) - c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("delete", "persistentvolumeclaims", - func(action testing2.Action) (handled bool, ret runtime.Object, err error) { - return true, &corev1.Pod{}, errors.New("error deleting persistentvolumeclaims") - }) expectedErrs := errorutils.NewAggregate([]error{ errors.New("failed to delete StatefulSet affinity-assistant-e3b0c44298: error deleting statefulsets"), - errors.New("failed to delete PersistentVolumeClaim pvc-e3b0c44298-affinity-assistant-e3b0c44298-0: error deleting persistentvolumeclaims"), }) errs := c.cleanupAffinityAssistants(ctx, pr) diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index fac1e3c468e..2970e85b782 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -605,27 +605,30 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel return controller.NewPermanentError(err) } - // Make an attempt to create Affinity Assistant if it does not exist - // if the Affinity Assistant already exists, handle the possibility of assigned node becoming unschedulable by deleting the pod - if !c.isAffinityAssistantDisabled(ctx) { - // create Affinity Assistant (StatefulSet) so that taskRun pods that share workspace PVC achieve Node Affinity - // TODO(#6740)(WIP): We only support AffinityAssistantPerWorkspace at the point. Implement different AffinityAssitantBehaviors based on `coscheduling` feature flag when adding end-to-end support. - if err = c.createOrUpdateAffinityAssistantsPerAABehavior(ctx, pr, affinityassistant.AffinityAssistantPerWorkspace); err != nil { - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - return controller.NewPermanentError(err) - } - } else if pr.HasVolumeClaimTemplate() { - // create workspace PVC from template - if err = c.pvcHandler.CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil { - logger.Errorf("Failed to create PVC for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, - "Failed to create PVC for PipelineRun %s/%s Workspaces correctly: %s", - pr.Namespace, pr.Name, err) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return controller.NewPermanentError(err) + } + + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: + if err = c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { + logger.Errorf("Failed to create PVC or affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + } else { + pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, + "Failed to create PVC for PipelineRun %s/%s Workspaces correctly: %s", + pr.Namespace, pr.Name, err) + } + return controller.NewPermanentError(err) } + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + // TODO(#6740)(WIP): implement end-to-end support for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation modes + return controller.NewPermanentError(fmt.Errorf("affinity assistant behavior: %v is not implemented", aaBehavior)) } } @@ -879,7 +882,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para } if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - tr.Annotations[workspace.AnnotationAffinityAssistantName] = getAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + tr.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) } logger.Infof("Creating a new TaskRun object %s for pipeline task %s", taskRunName, rpt.PipelineTask.Name) @@ -987,7 +990,7 @@ func (c *Reconciler) createCustomRun(ctx context.Context, runName string, params // Set the affinity assistant annotation in case the custom task creates TaskRuns or Pods // that can take advantage of it. if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - r.Annotations[workspace.AnnotationAffinityAssistantName] = getAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + r.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) } logger.Infof("Creating a new CustomRun object %s", runName) @@ -1049,7 +1052,12 @@ func (c *Reconciler) getTaskrunWorkspaces(ctx context.Context, pr *v1.PipelineRu pipelinePVCWorkspaceName = pipelineWorkspace } - workspace := c.taskWorkspaceByWorkspaceVolumeSource(ctx, pipelinePVCWorkspaceName, pr.Name, b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr)) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, "", err + } + + workspace := c.taskWorkspaceByWorkspaceVolumeSource(ctx, pipelinePVCWorkspaceName, pr.Name, b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr), aaBehavior) workspaces = append(workspaces, workspace) } else { workspaceIsOptional := false @@ -1084,7 +1092,7 @@ func (c *Reconciler) getTaskrunWorkspaces(ctx context.Context, pr *v1.PipelineRu // taskWorkspaceByWorkspaceVolumeSource returns the WorkspaceBinding to be bound to each TaskRun in the Pipeline Task. // If the PipelineRun WorkspaceBinding is a volumeClaimTemplate, the returned WorkspaceBinding references a PersistentVolumeClaim created for the PipelineRun WorkspaceBinding based on the PipelineRun as OwnerReference. // Otherwise, the returned WorkspaceBinding references the same volume as the PipelineRun WorkspaceBinding, with the file path joined with pipelineTaskSubPath as the binding subpath. -func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, pipelineWorkspaceName string, prName string, wb v1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference) v1.WorkspaceBinding { +func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, pipelineWorkspaceName string, prName string, wb v1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference, aaBehavior affinityassistant.AffinityAssitantBehavior) v1.WorkspaceBinding { if wb.VolumeClaimTemplate == nil { binding := *wb.DeepCopy() binding.Name = taskWorkspaceName @@ -1098,9 +1106,8 @@ func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, p } binding.Name = taskWorkspaceName - if !c.isAffinityAssistantDisabled(ctx) { - binding.PersistentVolumeClaim.ClaimName = getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName, wb, owner) - } else { + // TODO(#6740)(WIP): get binding for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation mode + if aaBehavior == affinityassistant.AffinityAssistantDisabled || aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { binding.PersistentVolumeClaim.ClaimName = volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner) } diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 3d0a631ac74..0c4a93386ed 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -40,6 +40,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1alpha1" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" resolutionv1beta1 "github.com/tektoncd/pipeline/pkg/apis/resolution/v1beta1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" resolutionutil "github.com/tektoncd/pipeline/pkg/internal/resolution" "github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent" "github.com/tektoncd/pipeline/pkg/reconciler/events/k8sevent" @@ -532,7 +533,7 @@ spec: wantRun: mustParseCustomRunWithObjectMeta(t, taskRunObjectMetaWithAnnotations("test-pipelinerun-custom-task", "namespace", "test-pipelinerun", "test-pipelinerun", "custom-task", false, map[string]string{ - "pipeline.tekton.dev/affinity-assistant": getAffinityAssistantName("pipelinews", pipelineRunName), + "pipeline.tekton.dev/affinity-assistant": GetAffinityAssistantName("pipelinews", pipelineRunName), }), ` spec: @@ -4043,8 +4044,8 @@ spec: t.Fatalf("expected one StatefulSet created. %d was created", len(stsNames)) } - expectedAffinityAssistantName1 := getAffinityAssistantName(workspaceName, pipelineRunName) - expectedAffinityAssistantName2 := getAffinityAssistantName(workspaceName2, pipelineRunName) + expectedAffinityAssistantName1 := GetAffinityAssistantName(workspaceName, pipelineRunName) + expectedAffinityAssistantName2 := GetAffinityAssistantName(workspaceName2, pipelineRunName) expectedAffinityAssistantStsNames := make(map[string]bool) expectedAffinityAssistantStsNames[expectedAffinityAssistantName1] = true expectedAffinityAssistantStsNames[expectedAffinityAssistantName2] = true @@ -7673,7 +7674,7 @@ func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { name, taskWorkspaceName, pipelineWorkspaceName, prName string wb v1.WorkspaceBinding expectedBinding v1.WorkspaceBinding - disableAffinityAssistant bool + aaBehavior affinityassistant.AffinityAssitantBehavior }{ { name: "PVC Workspace with Affinity Assistant", @@ -7687,9 +7688,10 @@ func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { expectedBinding: v1.WorkspaceBinding{ Name: "task-workspace", PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: "pvc-2c26b46b68-affinity-assistant-e011a5ef79-0", + ClaimName: "pvc-2c26b46b68", }, }, + aaBehavior: affinityassistant.AffinityAssistantPerWorkspace, }, { name: "PVC Workspace without Affinity Assistant", @@ -7705,7 +7707,7 @@ func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { ClaimName: "pvc-2c26b46b68", }, }, - disableAffinityAssistant: true, + aaBehavior: affinityassistant.AffinityAssistantDisabled, }, { name: "non-PVC Workspace", @@ -7718,6 +7720,7 @@ func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { Name: "task-workspace", EmptyDir: &corev1.EmptyDirVolumeSource{}, }, + aaBehavior: affinityassistant.AffinityAssistantPerWorkspace, }, } @@ -7725,20 +7728,7 @@ func Test_taskWorkspaceByWorkspaceVolumeSource(t *testing.T) { t.Run(tc.name, func(t *testing.T) { c := Reconciler{} ctx := context.Background() - if tc.disableAffinityAssistant { - featureFlags, err := config.NewFeatureFlagsFromMap(map[string]string{ - "disable-affinity-assistant": "true", - }) - if err != nil { - t.Fatalf("error creating feature flag disable-affinity-assistant from map: %v", err) - } - cfg := &config.Config{ - FeatureFlags: featureFlags, - } - ctx = config.ToContext(context.Background(), cfg) - } - - binding := c.taskWorkspaceByWorkspaceVolumeSource(ctx, tc.pipelineWorkspaceName, tc.prName, tc.wb, tc.taskWorkspaceName, "", *kmeta.NewControllerRef(testPr)) + binding := c.taskWorkspaceByWorkspaceVolumeSource(ctx, tc.pipelineWorkspaceName, tc.prName, tc.wb, tc.taskWorkspaceName, "", *kmeta.NewControllerRef(testPr), tc.aaBehavior) if d := cmp.Diff(tc.expectedBinding, binding); d != "" { t.Errorf("WorkspaceBinding diff: %s", diff.PrintWantGot(d)) } diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 8f35bccdf86..695e72d6d13 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -484,7 +484,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1.TaskRun, rtr *resourc // Please note that this block is required to run before `applyParamsContextsResultsAndWorkspaces` is called the first time, // and that `applyParamsContextsResultsAndWorkspaces` _must_ be called on every reconcile. if pod == nil && tr.HasVolumeClaimTemplate() { - if err := c.pvcHandler.CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx, tr.Spec.Workspaces, *kmeta.NewControllerRef(tr), tr.Namespace); err != nil { + if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, tr.Spec.Workspaces, *kmeta.NewControllerRef(tr), tr.Namespace); err != nil { logger.Errorf("Failed to create PVC for TaskRun %s: %v", tr.Name, err) tr.Status.MarkResourceFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, fmt.Errorf("Failed to create PVC for TaskRun %s workspaces correctly: %w", diff --git a/pkg/reconciler/volumeclaim/pvchandler.go b/pkg/reconciler/volumeclaim/pvchandler.go index 6f0a28e6ca7..9e9283d3016 100644 --- a/pkg/reconciler/volumeclaim/pvchandler.go +++ b/pkg/reconciler/volumeclaim/pvchandler.go @@ -38,7 +38,7 @@ const ( // PvcHandler is used to create PVCs for workspaces type PvcHandler interface { - CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx context.Context, wb []v1.WorkspaceBinding, ownerReference metav1.OwnerReference, namespace string) error + CreatePVCsForWorkspaces(ctx context.Context, wb []v1.WorkspaceBinding, ownerReference metav1.OwnerReference, namespace string) error } type defaultPVCHandler struct { @@ -51,13 +51,11 @@ func NewPVCHandler(clientset clientset.Interface, logger *zap.SugaredLogger) Pvc return &defaultPVCHandler{clientset, logger} } -// CreatePVCsForWorkspacesWithoutAffinityAssistant checks if a PVC named -- exists; +// CreatePVCsForWorkspaces checks if a PVC named -- exists; // where claim-name is provided by the user in the volumeClaimTemplate, and owner-name is the name of the // resource with the volumeClaimTemplate declared, a PipelineRun or TaskRun. If the PVC did not exist, a new PVC // with that name is created with the provided OwnerReference. -// This function is only called when Affinity Assistant is disabled. -// When Affinity Assistant is enabled, the PersistentVolumeClaims will be created by the Affinity Assistant StatefulSet VolumeClaimTemplate instead. -func (c *defaultPVCHandler) CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx context.Context, wb []v1.WorkspaceBinding, ownerReference metav1.OwnerReference, namespace string) error { +func (c *defaultPVCHandler) CreatePVCsForWorkspaces(ctx context.Context, wb []v1.WorkspaceBinding, ownerReference metav1.OwnerReference, namespace string) error { var errs []error for _, claim := range getPVCsWithoutAffinityAssistant(wb, ownerReference, namespace) { _, err := c.clientset.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(ctx, claim.Name, metav1.GetOptions{}) diff --git a/pkg/reconciler/volumeclaim/pvchandler_test.go b/pkg/reconciler/volumeclaim/pvchandler_test.go index 64c9b78fa2e..aaffa85d478 100644 --- a/pkg/reconciler/volumeclaim/pvchandler_test.go +++ b/pkg/reconciler/volumeclaim/pvchandler_test.go @@ -83,7 +83,7 @@ func TestCreatePersistentVolumeClaimsForWorkspaces(t *testing.T) { // when - err := pvcHandler.CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx, workspaces, ownerRef, namespace) + err := pvcHandler.CreatePVCsForWorkspaces(ctx, workspaces, ownerRef, namespace) if err != nil { t.Fatalf("unexpexted error: %v", err) } @@ -147,7 +147,7 @@ func TestCreatePersistentVolumeClaimsForWorkspacesWithoutMetadata(t *testing.T) // when - err := pvcHandler.CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx, workspaces, ownerRef, namespace) + err := pvcHandler.CreatePVCsForWorkspaces(ctx, workspaces, ownerRef, namespace) if err != nil { t.Fatalf("unexpexted error: %v", err) } @@ -205,7 +205,7 @@ func TestCreateExistPersistentVolumeClaims(t *testing.T) { } fakekubeclient.Fake.PrependReactor(actionGet, "*", fn) - err := pvcHandler.CreatePVCsForWorkspacesWithoutAffinityAssistant(ctx, workspaces, ownerRef, namespace) + err := pvcHandler.CreatePVCsForWorkspaces(ctx, workspaces, ownerRef, namespace) if err != nil { t.Fatalf("unexpected error: %v", err) } diff --git a/test/affinity_assistant_test.go b/test/affinity_assistant_test.go new file mode 100644 index 00000000000..27648a27c84 --- /dev/null +++ b/test/affinity_assistant_test.go @@ -0,0 +1,140 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2023 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "context" + "fmt" + "testing" + + "github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun" + "github.com/tektoncd/pipeline/test/parse" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + knativetest "knative.dev/pkg/test" +) + +// TestAffinityAssistant_PerWorkspace tests the lifecycle status of Affinity Assistant and PVCs +// from the start to the deletion of a PipelineRun +func TestAffinityAssistant_PerWorkspace(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + c, namespace := setup(ctx, t) + knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf) + defer tearDown(ctx, t, c, namespace) + prName := "my-pipelinerun" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + workspaces: + - name: my-workspace + tasks: + - name: foo + workspaces: + - name: my-workspace + taskSpec: + steps: + - image: busybox + script: echo hello foo + - name: bar + workspaces: + - name: my-workspace + taskSpec: + steps: + - image: busybox + script: echo hello bar + workspaces: + - name: my-workspace + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + if _, err := c.V1PipelineRunClient.Create(ctx, pr, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create PipelineRun: %s", err) + } + t.Logf("Waiting for PipelineRun in namespace %s to run", namespace) + if err := WaitForPipelineRunState(ctx, c, prName, timeout, Running(prName), "PipelineRun Running", v1Version); err != nil { + t.Fatalf("Error waiting for PipelineRun to run: %s", err) + } + + // validate SS and PVC are created + ssName := pipelinerun.GetAffinityAssistantName("my-workspace", pr.Name) + ss, err := c.KubeClient.AppsV1().StatefulSets(namespace).Get(ctx, ssName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Couldn't get expected Affinity Assistant StatefulSet %s, err: %s", ss, err) + } + pvcList, err := c.KubeClient.CoreV1().PersistentVolumeClaims(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Fatalf("Couldn't get expected PVCs: %s", err) + } + if len(pvcList.Items) != 1 { + t.Fatalf("Unexpected PVC created, expecting 1 but got: %v", len(pvcList.Items)) + } + pvcName := pvcList.Items[0].Name + + // wait for PipelineRun to finish + t.Logf("Waiting for PipelineRun in namespace %s to finish", namespace) + if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunSucceed(prName), "PipelineRunSucceeded", v1Version); err != nil { + t.Errorf("Error waiting for PipelineRun to finish: %s", err) + } + + // validate Affinity Assistant is cleaned up + ss, err = c.KubeClient.AppsV1().StatefulSets(namespace).Get(ctx, ssName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Fatalf("Failed to cleanup Affinity Assistant StatefulSet: %v, err: %v", ssName, err) + } + + // validate PipelineRun pods sharing the same PVC are scheduled to the same node + podFoo, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-foo-pod", prName), metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get pod: %v-foo-pod, err: %v", prName, err) + } + podBar, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-bar-pod", prName), metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get pod: %v-bar-pod, err: %v", prName, err) + } + if podFoo.Spec.NodeName != podBar.Spec.NodeName { + t.Errorf("pods are not scheduled to same node: %v and %v", podFoo.Spec.NodeName, podBar.Spec.NodeName) + } + + // delete PipelineRun + if err = c.V1PipelineRunClient.Delete(ctx, prName, metav1.DeleteOptions{}); err != nil { + t.Fatalf("failed to delete PipelineRun: %s", err) + } + + // validate PVC is in bounded state + pvc, err := c.KubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(ctx, pvcName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to get PVC %v after PipelineRun is deleted, err: %v", pvcName, err) + } + if pvc.Status.Phase != "Bound" { + t.Fatalf("expect PVC %s to be in bounded state but got %v", pvcName, pvc.Status.Phase) + } +}