diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index b7a16b319bd..0d3d0c2009e 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -26,6 +26,7 @@ import ( "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/workspace" @@ -102,15 +103,13 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context } } case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: - if claimNames != nil || claimTemplates != nil { - aaName := GetAffinityAssistantName("", pr.Name) - // The PVCs are created via StatefulSet's VolumeClaimTemplate for volume scheduling - // in AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes. - // This is because PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time in these modes, - // and there is no requirement of the PVC OwnerReference. - if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claimNames, unschedulableNodes); err != nil { - return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) - } + aaName := GetAffinityAssistantName("", pr.Name) + // The PVCs are created via StatefulSet's VolumeClaimTemplate for volume scheduling + // in AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes. + // This is because PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time in these modes, + // and there is no requirement of the PVC OwnerReference. + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claimNames, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) } case aa.AffinityAssistantDisabled: for _, workspace := range claimTemplateToWorkspace { @@ -232,13 +231,29 @@ func (c *Reconciler) cleanupAffinityAssistantsAndPVCs(ctx context.Context, pr *v // getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is // created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled. // The PVCs created by StatefulSet VolumeClaimTemplates follow the format `--0` -// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string { pvcName := volumeclaim.GeneratePVCNameFromWorkspaceBinding(wb.VolumeClaimTemplate.Name, wb, owner) affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName) return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName) } +// getAffinityAssistantAnnotationVal generates and returns the value for `pipeline.tekton.dev/affinity-assistant` annotation +// based on aaBehavior, pipelinePVCWorkspaceName and prName +func getAffinityAssistantAnnotationVal(aaBehavior affinityassistant.AffinityAssistantBehavior, pipelinePVCWorkspaceName string, prName string) string { + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace: + if pipelinePVCWorkspaceName != "" { + return GetAffinityAssistantName(pipelinePVCWorkspaceName, prName) + } + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + return GetAffinityAssistantName("", prName) + + case affinityassistant.AffinityAssistantDisabled: + } + + return "" +} + // GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) @@ -355,17 +370,6 @@ func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssistantBehavior, name } } -// isAffinityAssistantDisabled returns a bool indicating whether an Affinity Assistant should -// be created for each PipelineRun that use workspaces with PersistentVolumeClaims -// as volume source. The default behaviour is to enable the Affinity Assistant to -// provide Node Affinity for TaskRuns that share a PVC workspace. -// -// TODO(#6740)(WIP): replace this function with GetAffinityAssistantBehavior -func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool { - cfg := config.FromContextOrDefaults(ctx) - return cfg.FeatureFlags.DisableAffinityAssistant -} - // getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity. func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) *corev1.Affinity { affinityAssistantsAffinity := &corev1.Affinity{} diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index 020357850bb..e86d12a5eee 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -50,7 +50,6 @@ import ( ) var podSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity") -var statefulSetSpecFilter cmp.Option = cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector") var podTemplateSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta") var workspacePVCName = "test-workspace-pvc" @@ -112,6 +111,7 @@ var testPRWithEmptyDir = &v1.PipelineRun{ // TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun tests to create and delete Affinity Assistants and PVCs // per pipelinerun for a given PipelineRun func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { + replicas := int32(1) tests := []struct { name string pr *v1.PipelineRun @@ -120,6 +120,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { name: "PersistentVolumeClaim Workspace type", pr: testPRWithPVC, expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithPVC.Name, + workspace.LabelInstance: "affinity-assistant-622aca4516", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -135,6 +143,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { name: "VolumeClaimTemplate Workspace type", pr: testPRWithVolumeClaimTemplate, expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplate.Name, + workspace.LabelInstance: "affinity-assistant-426b306c50", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}, @@ -143,6 +159,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", pr: testPRWithVolumeClaimTemplateAndPVC, expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name, + workspace.LabelInstance: "affinity-assistant-5bf44db4a8", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}, @@ -158,16 +182,31 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { }, }, }, { - name: "other Workspace type", - pr: testPRWithEmptyDir, - expectStatefulSetSpec: nil, + name: "other Workspace type", + pr: testPRWithEmptyDir, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithEmptyDir.Name, + workspace.LabelInstance: "affinity-assistant-c655a0c8a2", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, + }, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ctx := context.Background() + configMap := map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", + } + kubeClientSet := fakek8s.NewSimpleClientset() + ctx := cfgtesting.SetFeatureFlags(context.Background(), t, configMap) c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset(), + KubeClientSet: kubeClientSet, + pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()), } err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun) @@ -179,7 +218,15 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { expectAAName := GetAffinityAssistantName("", tc.pr.Name) validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec) - // TODO(#6740)(WIP): test cleanupAffinityAssistantsAndPVCs for coscheduling-pipelinerun mode when fully implemented + // clean up Affinity Assistant + c.cleanupAffinityAssistantsAndPVCs(ctx, tc.pr) + if err != nil { + t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) + } + _, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectAAName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } }) } } @@ -187,6 +234,7 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) { // TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled tests to create and delete Affinity Assistants and PVCs // per workspace or disabled for a given PipelineRun func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testing.T) { + replicas := int32(1) tests := []struct { name, expectedPVCName string pr *v1.PipelineRun @@ -197,6 +245,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin aaBehavior: aa.AffinityAssistantPerWorkspace, pr: testPRWithPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithPVC.Name, + workspace.LabelInstance: "affinity-assistant-ac9f8fc5ee", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -214,6 +270,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin pr: testPRWithVolumeClaimTemplate, expectedPVCName: "pvc-b9eea16dce", expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplate.Name, + workspace.LabelInstance: "affinity-assistant-4cf1a1c468", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -236,6 +300,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin pr: testPRWithVolumeClaimTemplateAndPVC, expectedPVCName: "pvc-b9eea16dce", expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name, + workspace.LabelInstance: "affinity-assistant-6c87e714a0", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -246,6 +318,14 @@ func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testin }}, }, }}, { + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name, + workspace.LabelInstance: "affinity-assistant-6399c93362", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -880,53 +960,6 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) { } } -func TestDisableAffinityAssistant(t *testing.T) { - for _, tc := range []struct { - description string - configMap *corev1.ConfigMap - expected bool - }{{ - description: "Default behaviour: A missing disable-affinity-assistant flag should result in false", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{}, - }, - expected: false, - }, { - description: "Setting disable-affinity-assistant to false should result in false", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{ - featureFlagDisableAffinityAssistantKey: "false", - }, - }, - expected: false, - }, { - description: "Setting disable-affinity-assistant to true should result in true", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{ - featureFlagDisableAffinityAssistantKey: "true", - }, - }, - expected: true, - }} { - t.Run(tc.description, func(t *testing.T) { - c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset( - tc.configMap, - ), - Images: pipeline.Images{}, - } - store := config.NewStore(logtesting.TestLogger(t)) - store.OnConfigChanged(tc.configMap) - if result := c.isAffinityAssistantDisabled(store.ToContext(context.Background())); result != tc.expected { - t.Errorf("Expected %t Received %t", tc.expected, result) - } - }) - } -} - func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) { labelSelector := &metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -1101,6 +1134,48 @@ spec: } } +func TestGetAffinityAssistantAnnotationVal(t *testing.T) { + tcs := []struct { + name string + aaBehavior aa.AffinityAssistantBehavior + wsName, prName, expectAffinityAssistantAnnotationVal string + }{{ + name: "per workspace", + aaBehavior: aa.AffinityAssistantPerWorkspace, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-315f58d30d", + }, { + name: "per workspace - empty pipeline workspace name", + aaBehavior: aa.AffinityAssistantPerWorkspace, + prName: "my-pipelinerun", + }, { + name: "per pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRun, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50", + }, { + name: "isolate pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50", + }, { + name: "disabled", + aaBehavior: aa.AffinityAssistantDisabled, + wsName: "my-ws", + prName: "my-pipelinerun", + }} + + for _, tc := range tcs { + aaAnnotationVal := getAffinityAssistantAnnotationVal(tc.aaBehavior, tc.wsName, tc.prName) + if diff := cmp.Diff(tc.expectAffinityAssistantAnnotationVal, aaAnnotationVal); diff != "" { + t.Errorf("Affinity Assistant Annotation Val mismatch: %v", diff) + } + } +} + type Data struct { StatefulSets []*appsv1.StatefulSet Nodes []*corev1.Node @@ -1139,7 +1214,7 @@ func validateStatefulSetSpec(t *testing.T, ctx context.Context, c Reconciler, ex if err != nil { t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) } - if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" { + if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, podSpecFilter, podTemplateSpecFilter); d != "" { t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d)) } } else if !apierrors.IsNotFound(err) { diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index b2435daabe9..899ab6346dc 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -617,28 +617,21 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel if err != nil { return controller.NewPermanentError(err) } - - switch aaBehavior { - case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: - if err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { - switch { - case errors.Is(err, ErrPvcCreationFailed): - logger.Errorf("Failed to create PVC for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, - "Failed to create PVC for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - case errors.Is(err, ErrAffinityAssistantCreationFailed): - logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - default: - } - return controller.NewPermanentError(err) + if err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { + switch { + case errors.Is(err, ErrPvcCreationFailed): + logger.Errorf("Failed to create PVC for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, + "Failed to create PVC for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + case errors.Is(err, ErrAffinityAssistantCreationFailed): + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + default: } - case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: - // TODO(#6740)(WIP): implement end-to-end support for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation modes - return controller.NewPermanentError(fmt.Errorf("affinity assistant behavior: %v is not implemented", aaBehavior)) + return controller.NewPermanentError(err) } } @@ -885,8 +878,12 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para return nil, err } - if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - tr.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, err + } + if aaAnnotationVal := getAffinityAssistantAnnotationVal(aaBehavior, pipelinePVCWorkspaceName, pr.Name); aaAnnotationVal != "" { + tr.Annotations[workspace.AnnotationAffinityAssistantName] = aaAnnotationVal } logger.Infof("Creating a new TaskRun object %s for pipeline task %s", taskRunName, rpt.PipelineTask.Name) @@ -1006,10 +1003,15 @@ func (c *Reconciler) createCustomRun(ctx context.Context, runName string, params }, } } + // Set the affinity assistant annotation in case the custom task creates TaskRuns or Pods // that can take advantage of it. - if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - r.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, err + } + if aaAnnotationVal := getAffinityAssistantAnnotationVal(aaBehavior, pipelinePVCWorkspaceName, pr.Name); aaAnnotationVal != "" { + r.Annotations[workspace.AnnotationAffinityAssistantName] = aaAnnotationVal } logger.Infof("Creating a new CustomRun object %s", runName) @@ -1125,9 +1127,11 @@ func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, p } binding.Name = taskWorkspaceName - // TODO(#6740)(WIP): get binding for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation mode - if aaBehavior == affinityassistant.AffinityAssistantDisabled || aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: binding.PersistentVolumeClaim.ClaimName = volumeclaim.GeneratePVCNameFromWorkspaceBinding(wb.VolumeClaimTemplate.Name, wb, owner) + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + binding.PersistentVolumeClaim.ClaimName = getPersistentVolumeClaimNameWithAffinityAssistant("", prName, wb, owner) } return binding diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 72c8326de32..57be16e2766 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -66,7 +66,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation/field" fakek8s "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/typed/core/v1/fake" ktesting "k8s.io/client-go/testing" + testing2 "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" "knative.dev/pkg/apis" @@ -13279,6 +13281,87 @@ spec: } } +func TestHandleAffinityAssistantAndPVCCreationError(t *testing.T) { + prName := "affinity-assistant-creation-fail" + namespace := "default" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + tasks: + - name: hello-world + workspaces: + - name: my-ws + taskSpec: + steps: + - image: busybox + script: echo hello + workspaces: + - name: my-ws + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + tcs := []struct { + name, failureType, expectErrorReason string + }{{ + name: "pvc creation error", + failureType: "pvc", + expectErrorReason: volumeclaim.ReasonCouldntCreateWorkspacePVC, + }, { + name: "affinity assistant creation error", + failureType: "statefulset", + expectErrorReason: ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + }} + + for _, tc := range tcs { + d := test.Data{ + PipelineRuns: []*v1.PipelineRun{pr.DeepCopy()}, + } + testAssets, cancel := getPipelineRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + // mock pvc creation err + switch tc.failureType { + case "pvc": + clients.Kube.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims") + }) + case "statefulset": + clients.Kube.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets") + }) + } + + // reconciler + err := c.Reconciler.Reconcile(testAssets.Ctx, fmt.Sprintf("%s/%s", namespace, prName)) + if !controller.IsPermanentError(err) { + t.Errorf("expected permanent error but got %s", err) + } + + // check pr status + reconciledPr, err := clients.Pipeline.TektonV1().PipelineRuns(namespace).Get(testAssets.Ctx, prName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("had error getting reconciled pipelinerun out of fake client: %s", err) + } + if diff := cmp.Diff(tc.expectErrorReason, reconciledPr.Status.GetCondition(apis.ConditionSucceeded).Reason); diff != "" { + t.Errorf("pipelinerun fail reason mismatch: %v", diff) + } + } +} + func TestHandleTaskRunCreationError(t *testing.T) { prName := "taskrun-creation-fails" namespace := "default" diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 040ca08fb3f..9e32d319ff9 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -418,7 +418,11 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1.TaskRun) (*v1.TaskSpec, return nil, nil, controller.NewPermanentError(err) } - if _, usesAssistant := tr.Annotations[workspace.AnnotationAffinityAssistantName]; usesAssistant { + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, nil, controller.NewPermanentError(err) + } + if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { if err := workspace.ValidateOnlyOnePVCIsUsed(tr.Spec.Workspaces); err != nil { logger.Errorf("TaskRun %q workspaces incompatible with Affinity Assistant: %v", tr.Name, err) tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err) diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index bbcc5c7edc8..8a57fb18f95 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -3273,10 +3273,9 @@ spec: } } -// TestReconcileWithWorkspacesIncompatibleWithAffinityAssistant tests that a TaskRun used with an associated -// Affinity Assistant is validated and that the validation fails for a TaskRun that is incompatible with -// Affinity Assistant; e.g. using more than one PVC-backed workspace. -func TestReconcileWithWorkspacesIncompatibleWithAffinityAssistant(t *testing.T) { +// TestReconcileWithMultiplePVCWorkspaceWithAffinityAssistant tests the execution of a TaskRun binding two VolumeClaimTemplate +// as Workspace in AffinityAssistantPerWorkspaces mode and AffinityAssistantPerPipelineruns mode. +func TestReconcileWithMultiplePVCWorkspaceWithAffinityAssistant(t *testing.T) { taskWithTwoWorkspaces := parse.MustParseV1Task(t, ` metadata: name: test-task-two-workspaces @@ -3288,6 +3287,11 @@ spec: readOnly: true - description: another workspace name: ws2 + steps: + - command: + - /mycmd + image: foo + name: simple-step `) taskRun := parse.MustParseV1TaskRun(t, ` metadata: @@ -3309,34 +3313,63 @@ spec: name: pvc2 `) - d := test.Data{ - Tasks: []*v1.Task{taskWithTwoWorkspaces}, - TaskRuns: []*v1.TaskRun{taskRun}, - ClusterTasks: nil, - } - testAssets, cancel := getTaskRunController(t, d) - defer cancel() - clients := testAssets.Clients - createServiceAccount(t, testAssets, "default", "foo") - _ = testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)) + tcs := []struct { + name string + cfgMap map[string]string + expectFailureReason string + }{{ + name: "multiple PVC based Workspaces in per workspace coschedule mode - failure", + cfgMap: map[string]string{ + "disable-affinity-assistant": "false", + "coschedule": "workspaces", + }, + expectFailureReason: podconvert.ReasonFailedValidation, + }, { + name: "multiple PVC based Workspaces in per pipelinerun coschedule mode - success", + cfgMap: map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", + }, + }} - _, err := clients.Pipeline.TektonV1().Tasks(taskRun.Namespace).Get(testAssets.Ctx, taskWithTwoWorkspaces.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("krux: %v", err) - } + for _, tc := range tcs { + d := test.Data{ + Tasks: []*v1.Task{taskWithTwoWorkspaces}, + TaskRuns: []*v1.TaskRun{taskRun}, + ClusterTasks: nil, + ConfigMaps: []*corev1.ConfigMap{{ + ObjectMeta: metav1.ObjectMeta{Namespace: system.Namespace(), Name: config.GetFeatureFlagsConfigName()}, + Data: tc.cfgMap, + }}, + } + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + clients := testAssets.Clients + createServiceAccount(t, testAssets, "default", "foo") + _ = testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)) - ttt, err := clients.Pipeline.TektonV1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) - } + _, err := clients.Pipeline.TektonV1().Tasks(taskRun.Namespace).Get(testAssets.Ctx, taskWithTwoWorkspaces.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to get task: %v", err) + } - if len(ttt.Status.Conditions) != 1 { - t.Errorf("unexpected number of Conditions, expected 1 Condition") - } + ttt, err := clients.Pipeline.TektonV1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) + } + + if len(ttt.Status.Conditions) != 1 { + t.Errorf("unexpected number of Conditions, expected 1 Condition") + } - for _, cond := range ttt.Status.Conditions { - if cond.Reason != podconvert.ReasonFailedValidation { - t.Errorf("unexpected Reason on the Condition, expected: %s, got: %s", podconvert.ReasonFailedValidation, cond.Reason) + if tc.expectFailureReason != "" { + for _, cond := range ttt.Status.Conditions { + if cond.Reason != tc.expectFailureReason { + t.Errorf("unexpected Reason on the Condition, expected: %s, got: %s", tc.expectFailureReason, cond.Reason) + } + } + } else if ttt.IsFailure() { + t.Errorf("Unexpected unsuccessful condition for TaskRun %q:\n%#v", taskRun.Name, ttt.Status.Conditions) } } } diff --git a/test/affinity_assistant_test.go b/test/affinity_assistant_test.go index 56da797f83b..4c23676031b 100644 --- a/test/affinity_assistant_test.go +++ b/test/affinity_assistant_test.go @@ -22,10 +22,14 @@ package test import ( "context" "fmt" + "strconv" "testing" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/test/parse" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "knative.dev/pkg/system" knativetest "knative.dev/pkg/test" ) @@ -94,17 +98,8 @@ spec: pvcName := pvcList.Items[0].Name // validate PipelineRun pods sharing the same PVC are scheduled to the same node - podFoo, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-foo-pod", prName), metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get pod: %v-foo-pod, err: %v", prName, err) - } - podBar, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-bar-pod", prName), metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get pod: %v-bar-pod, err: %v", prName, err) - } - if podFoo.Spec.NodeName != podBar.Spec.NodeName { - t.Errorf("pods are not scheduled to same node: %v and %v", podFoo.Spec.NodeName, podBar.Spec.NodeName) - } + podNames := []string{fmt.Sprintf("%v-foo-pod", prName), fmt.Sprintf("%v-bar-pod", prName)} + validatePodAffinity(t, ctx, podNames, namespace, c.KubeClient) // delete PipelineRun if err = c.V1PipelineRunClient.Delete(ctx, prName, metav1.DeleteOptions{}); err != nil { @@ -120,3 +115,127 @@ spec: t.Fatalf("expect PVC %s to be in bounded state but got %v", pvcName, pvc.Status.Phase) } } + +// TestAffinityAssistant_PerPipelineRun tests that mounting multiple PVC based workspaces to a pipeline task is allowed and +// all the pods are scheduled to the same node in AffinityAssistantPerPipelineRuns mode +func TestAffinityAssistant_PerPipelineRun(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + c, namespace := setup(ctx, t) + knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf) + defer resetFeatureFlagAndCleanup(ctx, t, c, namespace) + + // update feature flag + configMapData := map[string]string{ + "coschedule": config.CoschedulePipelineRuns, + "disable-affinity-assistant": "true", + } + if err := updateConfigMap(ctx, c.KubeClient, system.Namespace(), config.GetFeatureFlagsConfigName(), configMapData); err != nil { + t.Fatal(err) + } + + prName := "my-pipelinerun" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + workspaces: + - name: my-workspace + - name: my-workspace2 + tasks: + - name: foo + workspaces: + - name: my-workspace + taskSpec: + steps: + - image: busybox + script: echo hello foo + - name: bar + workspaces: + - name: my-workspace2 + taskSpec: + steps: + - image: busybox + script: echo hello bar + - name: double-ws + workspaces: + - name: my-workspace + - name: my-workspace2 + taskSpec: + steps: + - image: busybox + script: echo double-ws + - name: no-ws + taskSpec: + steps: + - image: busybox + script: echo no-ws + workspaces: + - name: my-workspace + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi + - name: my-workspace2 + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + // create PipelineRun + if _, err := c.V1PipelineRunClient.Create(ctx, pr, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create PipelineRun: %s", err) + } + + // wait for PipelineRun to finish + t.Logf("Waiting for PipelineRun in namespace %s to finish", namespace) + if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunSucceed(prName), "PipelineRunSucceeded", v1Version); err != nil { + t.Errorf("Error waiting for PipelineRun to finish: %s", err) + } + + // validate PipelineRun pods sharing the same PVC are scheduled to the same node + podNames := []string{fmt.Sprintf("%v-foo-pod", prName), fmt.Sprintf("%v-bar-pod", prName), fmt.Sprintf("%v-double-ws-pod", prName), fmt.Sprintf("%v-no-ws-pod", prName)} + validatePodAffinity(t, ctx, podNames, namespace, c.KubeClient) +} + +// validatePodAffinity checks if all the given pods are scheduled to the same node +func validatePodAffinity(t *testing.T, ctx context.Context, podNames []string, namespace string, client kubernetes.Interface) { + t.Helper() + nodeName := "" + for _, podName := range podNames { + pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get pod: %v, err: %v", podName, err) + } + + if nodeName == "" { + nodeName = pod.Spec.NodeName + } else if pod.Spec.NodeName != nodeName { + t.Errorf("pods are not scheduled to the same node as expected %v vs %v", nodeName, pod.Spec.NodeName) + } + } +} + +func resetFeatureFlagAndCleanup(ctx context.Context, t *testing.T, c *clients, namespace string) { + t.Helper() + configMapData := map[string]string{ + "coschedule": config.DefaultCoschedule, + "disable-affinity-assistant": strconv.FormatBool(config.DefaultDisableAffinityAssistant), + } + if err := updateConfigMap(ctx, c.KubeClient, system.Namespace(), config.GetFeatureFlagsConfigName(), configMapData); err != nil { + t.Fatal(err) + } + tearDown(ctx, t, c, namespace) +}