Skip to content

Commit

Permalink
[TEP-0135] Refactor CreatePVCsForWorkspaces
Browse files Browse the repository at this point in the history
Part of [#6740][#6740] and closes [#6915].

Prior to this commit, the `createOrUpdateAffinityAssistantsAndPVCs` function attempts to create all Affinity Assistant StatefulSets and returns aggregated errors.
This could result in time and resource waste when executing a pipelinerun that will fail. This commit updates it to "fail fast" strategy
where the function is returned as soon as the first error is encountered.

This commit also refactors the original `CreatePVCsForWorkspacesWithoutAffinityAssistant` (renamed to `CreatePVCFromVolumeClaimTemplate`) function
and its usages to improve readability since the PVC creation logic is now dependent on `AffinityAssistantBehavior`.

/kind feature

[#6740]: #6740
[#6915]: #6915
  • Loading branch information
QuanZhang-William committed Jul 12, 2023
1 parent b769b56 commit 1b38911
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 103 deletions.
55 changes: 28 additions & 27 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ import (
)

const (
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// as a volume source and expect an Assistant StatefulSet in AffinityAssistantPerWorkspace behavior, but couldn't create a StatefulSet.
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet"

featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
)
Expand All @@ -53,14 +53,13 @@ const (
// every taskrun in the pipelinerun that use the same PVC based volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation,
// it creates one Affinity Assistant for the pipelinerun.
func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error {
var errs []error
func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) (string, error) {
var unschedulableNodes sets.Set[string] = nil

var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
claimToWorkspace := map[*corev1.PersistentVolumeClaimVolumeSource]string{}
claimTemplatesToWorkspace := map[*corev1.PersistentVolumeClaim]string{}
claimTemplatesToWorkspace := map[*corev1.PersistentVolumeClaim]v1.WorkspaceBinding{}

for _, w := range pr.Spec.Workspaces {
if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
Expand All @@ -74,47 +73,49 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
claimTemplates = append(claimTemplates, *claimTemplate)
claimTemplatesToWorkspace[claimTemplate] = w.Name
}
}

// create PVCs from PipelineRun's VolumeClaimTemplate when aaBehavior is AffinityAssistantPerWorkspace or AffinityAssistantDisabled before creating
// affinity assistant so that the OwnerReference of the PVCs are the pipelineruns, which is used to achieve PVC auto deletion at PipelineRun deletion time
if (aaBehavior == aa.AffinityAssistantPerWorkspace || aaBehavior == aa.AffinityAssistantDisabled) && pr.HasVolumeClaimTemplate() {
if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil {
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
claimTemplatesToWorkspace[claimTemplate] = w
}
}

switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
errs = append(errs, err...)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes); err != nil {
return ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, errorutils.NewAggregate(err)
}
}
for claimTemplate, workspaceName := range claimTemplatesToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
// To support PVC auto deletion at pipelinerun deletion time, the OwnerReference of the PVCs should be set to the owning pipelinerun.
// In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point,
// so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling.
// If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun.
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes)
errs = append(errs, err...)
for claimTemplate, workspace := range claimTemplatesToWorkspace {
// To support PVC auto deletion at pipelinerun deletion time, the OwnerReference of the PVCs should be set to the owning pipelinerun,
// so we create PVCs from PipelineRuns' VolumeClaimTemplate and pass the PVCs to the Affinity Assistant StatefulSet for volume scheduling.
// If passed in as VolumeClaimTemplates directrly, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun.
if err := c.pvcHandler.CreatePVCFromVolumeClaimTemplate(ctx, workspace, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil {
return volumeclaim.ReasonCouldntCreateWorkspacePVC, err
}
aaName := GetAffinityAssistantName(workspace.Name, pr.Name)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes); err != nil {
return ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, errorutils.NewAggregate(err)
}
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
if claims != nil || claimTemplates != nil {
aaName := GetAffinityAssistantName("", pr.Name)
// In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling.
// PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time,
// so we don't need to worry the OwnerReference of the PVCs
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
errs = append(errs, err...)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes); err != nil {
return ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, errorutils.NewAggregate(err)
}
}
case aa.AffinityAssistantDisabled:
for _, workspace := range claimTemplatesToWorkspace {
if err := c.pvcHandler.CreatePVCFromVolumeClaimTemplate(ctx, workspace, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil {
return volumeclaim.ReasonCouldntCreateWorkspacePVC, err
}
}
}

return errorutils.NewAggregate(errs)
return "", nil
}

// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information.
Expand Down
90 changes: 79 additions & 11 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipelinerun
import (
"context"
"errors"
"fmt"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -108,9 +109,9 @@ var testPRWithEmptyDir = &v1.PipelineRun{
},
}

// TestCreateAndDeleteOfAffinityAssistantPerPipelineRun tests to create and delete an Affinity Assistant
// TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun tests to create and delete Affinity Assistants and PVCs
// per pipelinerun for a given PipelineRun
func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) {
func TestCreateOrUpdateAffinityAssistantsAndPVCsPerPipelineRun(t *testing.T) {
tests := []struct {
name string
pr *v1.PipelineRun
Expand Down Expand Up @@ -169,9 +170,9 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) {
KubeClientSet: fakek8s.NewSimpleClientset(),
}

err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun)
reason, err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun)
if err != nil {
t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerPipelineRun: %v", err)
t.Errorf("unexpected error from createOrUpdateAffinityAssistantsPerPipelineRun: %v with reason: %v", err, reason)
}

// validate StatefulSets from Affinity Assistant
Expand All @@ -183,9 +184,9 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) {
}
}

// TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled tests to create and delete an Affinity Assistant
// per workspace for a given PipelineRun
func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) {
// TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled tests to create and delete Affinity Assistants and PVCs
// per workspace or disabled for a given PipelineRun
func TestCreateOrUpdateAffinityAssistantsAndPVCsPerWorkspaceOrDisabled(t *testing.T) {
tests := []struct {
name, expectedPVCName string
pr *v1.PipelineRun
Expand Down Expand Up @@ -273,9 +274,9 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T)
pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()),
}

err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, tc.aaBehavior)
reason, err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, tc.aaBehavior)
if err != nil {
t.Fatalf("unexpected error from createOrUpdateAffinityAssistantsPerWorkspace: %v", err)
t.Fatalf("unexpected error from createOrUpdateAffinityAssistantsAndPVCs: %v with reason %v", err, reason)
}

// validate StatefulSets from Affinity Assistant
Expand Down Expand Up @@ -314,6 +315,73 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T)
}
}

func TestCreateOrUpdateAffinityAssistantsAndPVCs_Failure(t *testing.T) {
testCases := []struct {
name, failureType string
aaBehavior aa.AffinityAssitantBehavior
expectedErr error
expectedReason string
}{{
name: "affinity assistant creation failed - per workspace",
failureType: "statefulset",
aaBehavior: aa.AffinityAssistantPerWorkspace,
expectedErr: fmt.Errorf("failed to create StatefulSet affinity-assistant-4cf1a1c468: error creating statefulsets"),
expectedReason: ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet,
}, {
name: "affinity assistant creation failed - per pipelinerun",
failureType: "statefulset",
aaBehavior: aa.AffinityAssistantPerPipelineRun,
expectedErr: fmt.Errorf("failed to create StatefulSet affinity-assistant-426b306c50: error creating statefulsets"),
expectedReason: ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet,
}, {
name: "pvc creation failed - per workspace",
failureType: "pvc",
aaBehavior: aa.AffinityAssistantPerWorkspace,
expectedErr: fmt.Errorf("failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims"),
expectedReason: volumeclaim.ReasonCouldntCreateWorkspacePVC,
}, {
name: "pvc creation failed - disabled",
failureType: "pvc",
aaBehavior: aa.AffinityAssistantDisabled,
expectedErr: fmt.Errorf("failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims"),
expectedReason: volumeclaim.ReasonCouldntCreateWorkspacePVC,
}}

for _, tc := range testCases {
ctx := context.Background()
kubeClientSet := fakek8s.NewSimpleClientset()
c := Reconciler{
KubeClientSet: kubeClientSet,
pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()),
}

switch tc.failureType {
case "pvc":
c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims",
func(action testing2.Action) (handled bool, ret runtime.Object, err error) {
return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims")
})
case "statefulset":
c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets",
func(action testing2.Action) (handled bool, ret runtime.Object, err error) {
return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets")
})
}

reason, err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithVolumeClaimTemplate, tc.aaBehavior)

if err == nil {
t.Errorf("expect error from createOrUpdateAffinityAssistantsAndPVCs but got nil")
}
if d := cmp.Diff(tc.expectedErr.Error(), err.Error()); d != "" {
t.Errorf("expected err mismatching: %v", diff.PrintWantGot(d))
}
if d := cmp.Diff(tc.expectedReason, reason); d != "" {
t.Errorf("expected failure reason mismatching: %v", diff.PrintWantGot(d))
}
}
}

// TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and
// can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition
func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) {
Expand Down Expand Up @@ -417,9 +485,9 @@ func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) {
return true, &corev1.Pod{}, errors.New("error listing/deleting pod")
})
}
err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithPVC, aa.AffinityAssistantPerWorkspace)
reason, err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithPVC, aa.AffinityAssistantPerWorkspace)
if !tt.expectedError && err != nil {
t.Errorf("expected no error from createOrUpdateAffinityAssistantsPerWorkspace for the test \"%s\", but got: %v", tt.name, err)
t.Errorf("expected no error from createOrUpdateAffinityAssistantsPerWorkspace for the test \"%s\", but got: %v; reason: %v", tt.name, err, reason)
}
// the affinity assistant pod must have been deleted when it was running on a cordoned node
if tt.validatePodDeletion {
Expand Down
16 changes: 5 additions & 11 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,18 +613,12 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel

switch aaBehavior {
case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled:
if err = c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil {
failReason, err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior)
if err != nil {
logger.Errorf("Failed to create PVC or affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err)
if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace {
pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
} else {
pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC,
"Failed to create PVC for PipelineRun %s/%s Workspaces correctly: %s",
pr.Namespace, pr.Name, err)
}

pr.Status.MarkFailed(failReason,
"Failed to create StatefulSet for PipelineRun %s/%s correctly: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}
case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation:
Expand Down
18 changes: 13 additions & 5 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
errorutils "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes"
corev1Listers "k8s.io/client-go/listers/core/v1"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -484,12 +485,19 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1.TaskRun, rtr *resourc
// Please note that this block is required to run before `applyParamsContextsResultsAndWorkspaces` is called the first time,
// and that `applyParamsContextsResultsAndWorkspaces` _must_ be called on every reconcile.
if pod == nil && tr.HasVolumeClaimTemplate() {
if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, tr.Spec.Workspaces, *kmeta.NewControllerRef(tr), tr.Namespace); err != nil {
logger.Errorf("Failed to create PVC for TaskRun %s: %v", tr.Name, err)
var errs []error
for _, ws := range tr.Spec.Workspaces {
if err := c.pvcHandler.CreatePVCFromVolumeClaimTemplate(ctx, ws, *kmeta.NewControllerRef(tr), tr.Namespace); err != nil {
errs = append(errs, err)
}
}
if errs != nil {
aggregatedErrs := errorutils.NewAggregate(errs)
logger.Errorf("Failed to create PVC for TaskRun %s: %v", tr.Name, aggregatedErrs)
tr.Status.MarkResourceFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC,
fmt.Errorf("Failed to create PVC for TaskRun %s workspaces correctly: %w",
fmt.Sprintf("%s/%s", tr.Namespace, tr.Name), err))
return controller.NewPermanentError(err)
fmt.Errorf("failed to create PVC for TaskRun %s workspaces correctly: %w",
fmt.Sprintf("%s/%s", tr.Namespace, tr.Name), aggregatedErrs))
return controller.NewPermanentError(aggregatedErrs)
}

taskRunWorkspaces := applyVolumeClaimTemplates(tr.Spec.Workspaces, *kmeta.NewControllerRef(tr))
Expand Down
Loading

0 comments on commit 1b38911

Please sign in to comment.