Skip to content

Commit

Permalink
TEP-0135: Refactor Affinity Assistant PVC creation
Browse files Browse the repository at this point in the history
Part of [#6740][#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator
to ensure that all of a PipelineRun's pods are scheduled to the same node.

Before this commit, the PipelineRun reconciler creates PVC for each `VolumeClaimTemplate` backed workspace,
and mount the PVCs to the AA to avoid PV availability zone conflict.
This implementation works for `AffinityAssistantPerWorkspace` but introduces availability zone conflict
issue in the `AffinityAssistantPerPipelineRun` mode since we cannot enforce all the PVC are created in the same availability zone.

Instead of directly creating a PVC for each PipelineRun workspace backed by a VolumeClaimTemplate,
this commit sets one VolumeClaimTemplate per PVC workspace in the affinity assistant StatefulSet spec,
which enforces all VolumeClaimTemplates in StatefulSets are all provisioned on the same node/availability zone.

This commit just refactors the current implementation in favor of the `AffinityAssistantPerPipelineRun` feature.
There is no functionality change. The `AffinityAssistantPerPipelineRun` feature will be added in the follow up PRs.

[#6740]: #6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed May 30, 2023
1 parent 264476b commit 3985e3f
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 83 deletions.
75 changes: 49 additions & 26 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,24 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
var errs []error
var unschedulableNodes sets.Set[string] = nil
for _, w := range wb {
var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
if w.PersistentVolumeClaim != nil {
claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
} else if w.VolumeClaimTemplate != nil {
claimTemplate := getVolumeClaimTemplate(w, pr)
claimTemplates = append(claimTemplates, *claimTemplate)
} else {
continue
}

if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
Expand Down Expand Up @@ -113,14 +123,10 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
return errorutils.NewAggregate(errs)
}

func getClaimName(w v1beta1.WorkspaceBinding, ownerReference metav1.OwnerReference) string {
if w.PersistentVolumeClaim != nil {
return w.PersistentVolumeClaim.ClaimName
} else if w.VolumeClaimTemplate != nil {
return volumeclaim.GetPersistentVolumeClaimName(w.VolumeClaimTemplate, w, ownerReference)
}

return ""
func getVolumeClaimTemplate(wb v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun) *corev1.PersistentVolumeClaim {
claimTemplate := wb.VolumeClaimTemplate.DeepCopy()
claimTemplate.Name = volumeclaim.GetPersistentVolumeClaimName(wb.VolumeClaimTemplate, wb, *kmeta.NewControllerRef(pr))
return claimTemplate
}

func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error {
Expand Down Expand Up @@ -162,7 +168,7 @@ func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string)
return labels
}

func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimName string, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// We want a singleton pod
replicas := int32(1)

Expand All @@ -172,6 +178,11 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
tpl = pod.MergeAAPodTemplateWithDefault(pr.Spec.PodTemplate.ToAffinityAssistantTemplate(), defaultAATpl)
}

var mounts []corev1.VolumeMount
for _, claimTemplate := range claimTemplates {
mounts = append(mounts, corev1.VolumeMount{Name: claimTemplate.Name, MountPath: claimTemplate.Name})
}

containers := []corev1.Container{{
Name: "affinity-assistant",
Image: affinityAssistantImage,
Expand All @@ -190,8 +201,32 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
"memory": resource.MustParse("100Mi"),
},
},
VolumeMounts: mounts,
}}

// TODO(QuanZhang-William): when introducing `coscheduling` flag with values `coschedule-pipelineruns` or `isolate-pipelineruns
// (https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md#configuration),
// it may require a StorageClass with VolumeBindingMode: WaitForFirstConsumer when WS specifies PersistentVolumeClaims to avoid Availability Zone conflict
// (discussed in https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md#co-locating-volumes).
// We need to update the TEP and documentation for this limitation if it is the case.
var volumes []corev1.Volume
for i, claim := range claims {
volumes = append(volumes, corev1.Volume{
Name: fmt.Sprintf("workspace-%d", i),
VolumeSource: corev1.VolumeSource{
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
// volumeBindingMode: Immediate
// the PV is allocated on a Node first, and then the pod need to be
// scheduled to that node.
// To support those PVCs, the Affinity Assistant must also mount the
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
// pod is scheduled to the same Availability Zone as the PV, when using
// a regional cluster. This is called VolumeScheduling.
PersistentVolumeClaim: claim.DeepCopy(),
},
})
}

return &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
Expand All @@ -207,6 +242,8 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
Selector: &metav1.LabelSelector{
MatchLabels: getStatefulSetLabels(pr, name),
},
//by setting VolumeClaimTemplates from StatefulSet, all the PVs are scheduled to the same Availability Zone as the StatefulSet
VolumeClaimTemplates: claimTemplates,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: getStatefulSetLabels(pr, name),
Expand All @@ -219,21 +256,7 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
Volumes: []corev1.Volume{{
Name: "workspace",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
// volumeBindingMode: Immediate
// the PV is allocated on a Node first, and then the pod need to be
// scheduled to that node.
// To support those PVCs, the Affinity Assistant must also mount the
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
// pod is scheduled to the same Availability Zone as the PV, when using
// a regional cluster. This is called VolumeScheduling.
ClaimName: claimName,
}},
}},
Volumes: volumes,
},
},
},
Expand Down
130 changes: 101 additions & 29 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,108 @@ var testPipelineRun = &v1beta1.PipelineRun{
}

// TestCreateAndDeleteOfAffinityAssistant tests to create and delete an Affinity Assistant
// for a given PipelineRun with a PVC workspace
// for a given PipelineRun
func TestCreateAndDeleteOfAffinityAssistant(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(),
Images: pipeline.Images{},
tests := []struct {
name string
pr *v1beta1.PipelineRun
}{{
name: "PersistentVolumeClaim Workspace type",
pr: &v1beta1.PipelineRun{
Spec: v1beta1.PipelineRunSpec{
Workspaces: []v1beta1.WorkspaceBinding{{
Name: "PersistentVolumeClaim Workspace",
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: "myclaim",
},
}},
},
},
}, {
name: "VolumeClaimTemplate Workspace type",
pr: &v1beta1.PipelineRun{
Spec: v1beta1.PipelineRunSpec{
Workspaces: []v1beta1.WorkspaceBinding{
{
Name: "VolumeClaimTemplate Workspace",
VolumeClaimTemplate: &corev1.PersistentVolumeClaim{},
}},
},
},
},
{
name: "other Workspace type",
pr: &v1beta1.PipelineRun{
Spec: v1beta1.PipelineRunSpec{
Workspaces: []v1beta1.WorkspaceBinding{
{
Name: "EmptyDir Workspace",
EmptyDir: &corev1.EmptyDirVolumeSource{},
}},
},
},
},
}

err := c.createOrUpdateAffinityAssistants(ctx, testPipelineRun.Spec.Workspaces, testPipelineRun, testPipelineRun.Namespace)
if err != nil {
t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err)
}
for _, tc := range tests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

expectedAffinityAssistantName := getAffinityAssistantName(workspaceName, testPipelineRun.Name)
_, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{})
if err != nil {
t.Errorf("unexpected error when retrieving StatefulSet: %v", err)
}
c := Reconciler{
KubeClientSet: fakek8s.NewSimpleClientset(),
Images: pipeline.Images{},
}

err = c.cleanupAffinityAssistants(ctx, testPipelineRun)
if err != nil {
t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err)
}
err := c.createOrUpdateAffinityAssistants(ctx, tc.pr.Spec.Workspaces, tc.pr, tc.pr.Namespace)
if err != nil {
t.Errorf("unexpected error from createOrUpdateAffinityAssistants: %v", err)
}

expectedAAName := getAffinityAssistantName(tc.pr.Spec.Workspaces[0].Name, tc.pr.Name)
aa, err := c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{})
if err != nil {
if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim == nil && tc.pr.Spec.Workspaces[0].VolumeClaimTemplate == nil {
if !apierrors.IsNotFound(err) {
t.Errorf("unexpected error when retrieving StatefulSet: %v", err)
}
} else {
t.Errorf("unexpected error when retrieving StatefulSet: %v", err)
}
}

// validate PVs from Affinity Assistant
if tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil {
if len(aa.Spec.VolumeClaimTemplates) != 1 {
t.Errorf("unexpected VolumeClaimTemplate count, expect 1 but got %v", len(aa.Spec.VolumeClaimTemplates))
}
if d := cmp.Diff(*getVolumeClaimTemplate(tc.pr.Spec.Workspaces[0], tc.pr), aa.Spec.VolumeClaimTemplates[0]); d != "" {
t.Errorf("VolumeClaimTemplates diff: %s", diff.PrintWantGot(d))
}
} else if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil {
if len(aa.Spec.Template.Spec.Volumes) != 1 {
t.Errorf("unexpected PersistentVolumeClaim count, expect 1 but got %v", len(aa.Spec.Template.Spec.Volumes))
}
if d := cmp.Diff(tc.pr.Spec.Workspaces[0].PersistentVolumeClaim, aa.Spec.Template.Spec.Volumes[0].VolumeSource.PersistentVolumeClaim); d != "" {
t.Errorf("PersistentVolumeClaim diff: %s", diff.PrintWantGot(d))
}
}

// clean up Affinity Assistant
if tc.pr.Spec.Workspaces[0].PersistentVolumeClaim != nil || tc.pr.Spec.Workspaces[0].VolumeClaimTemplate != nil {
err = c.cleanupAffinityAssistants(ctx, tc.pr)
if err != nil {
t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err)
}

_, err = c.KubeClientSet.AppsV1().StatefulSets(testPipelineRun.Namespace).Get(ctx, expectedAffinityAssistantName, metav1.GetOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected a NotFound response, got: %v", err)
_, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectedAAName, metav1.GetOptions{})
if !apierrors.IsNotFound(err) {
t.Errorf("expected a NotFound response, got: %v", err)
}
}
})
}
}

Expand Down Expand Up @@ -239,7 +311,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -277,7 +349,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -323,7 +395,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand Down Expand Up @@ -360,7 +432,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing.
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, "mypvc", "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand All @@ -380,7 +452,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing.
Spec: v1beta1.PipelineRunSpec{},
}

stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, "mypvc", "nginx", nil)
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 {
t.Errorf("unexpected Tolerations in the StatefulSet")
Expand Down
Loading

0 comments on commit 3985e3f

Please sign in to comment.