Skip to content

Commit

Permalink
[TEP-0135] coschedule isolate pipelinerun
Browse files Browse the repository at this point in the history
Part of [#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure
that all of a PipelineRun's pods are scheduled to the same node.

This commit implements `coschedule: isolate-pipelinerun` coschedule mode by modifying [PodAntiAffinity](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#inter-pod-affinity-and-anti-affinity)
terms in the `Affinity Assistant StatefulSets`, which enforces only 1 pipelinerun is running in a node at the same time.

/kind feature

[#6740]: #6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jul 17, 2023
1 parent 3b9b351 commit 3afac06
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 49 deletions.
56 changes: 34 additions & 22 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
}
}

switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
Expand Down Expand Up @@ -129,8 +128,12 @@ func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affini
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
aaBehavior, err := aa.GetAffinityAssistantBehavior(ctx)
if err != nil {
return []error{err}
}
affinityAssistantStatefulSet := affinityAssistantStatefulSet(aaBehavior, affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err = c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
Expand Down Expand Up @@ -232,7 +235,7 @@ func getStatefulSetLabels(pr *v1.PipelineRun, affinityAssistantName string) map[
// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// The VolumeClaimTemplates and Volume of StatefulSet reference the PipelineRun WorkspaceBinding VolumeClaimTempalte and the PVCs respectively.
// The PVs created by the StatefulSet are scheduled to the same availability zone which avoids PV scheduling conflict.
func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssitantBehavior, name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// We want a singleton pod
replicas := int32(1)

Expand Down Expand Up @@ -314,7 +317,7 @@ func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplate
NodeSelector: tpl.NodeSelector,
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr, aaBehavior),
Volumes: volumes,
},
},
Expand All @@ -334,30 +337,39 @@ func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun) *corev1.Affinity {
// use podAntiAffinity to repel other affinity assistants
repelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
},
}

func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) *corev1.Affinity {
affinityAssistantsAffinity := &corev1.Affinity{}
if pr.Spec.TaskRunTemplate.PodTemplate != nil && pr.Spec.TaskRunTemplate.PodTemplate.Affinity != nil {
affinityAssistantsAffinity = pr.Spec.TaskRunTemplate.PodTemplate.Affinity
}
if affinityAssistantsAffinity.PodAntiAffinity == nil {
affinityAssistantsAffinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)

repelOtherAffinityAssistantsPodAffinityTerm := corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
}

if aaBehavior == aa.AffinityAssistantPerPipelineRunWithIsolation {
// use RequiredDuringSchedulingIgnoredDuringExecution term to enforce only one pipelinerun can run in a node at a time
affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)
} else {
preferredRepelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: repelOtherAffinityAssistantsPodAffinityTerm,
}
// use RequiredDuringSchedulingIgnoredDuringExecution term to schedule pipelineruns to different nodes when possible
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
preferredRepelOtherAffinityAssistantsPodAffinityTerm)
}

return affinityAssistantsAffinity
}
78 changes: 51 additions & 27 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
},
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -498,8 +497,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
Name: "reg-creds",
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -545,8 +543,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
Name: "reg-creds",
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand Down Expand Up @@ -583,8 +580,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing.
}},
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand All @@ -604,7 +600,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing.
Spec: v1.PipelineRunSpec{},
}

stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 {
t.Errorf("unexpected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -802,30 +798,42 @@ func TestDisableAffinityAssistant(t *testing.T) {
}

func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) {
assistantPodAffinityTerm := corev1.WeightedPodAffinityTerm{
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
}

assistantWeightedPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)
affinityWithAssistantAffinity := &corev1.Affinity{
affinityWithAssistantAffinityPreferred := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
}

affinityWithAssistantAffinityRequired := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
}},
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)

prWithPodTemplatePodAffinity := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-podTemplate-podAffinity
Expand Down Expand Up @@ -861,7 +869,7 @@ spec:
TopologyKey: "kubernetes.io/hostname",
},
},
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
Expand Down Expand Up @@ -895,7 +903,7 @@ spec:
affinityWithPodTemplateNodeAffinity := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -920,26 +928,42 @@ spec:
for _, tc := range []struct {
description string
pr *v1.PipelineRun
aaBehavior aa.AffinityAssitantBehavior
expect *corev1.Affinity
}{
{
description: "podTemplate affinity is empty",
description: "podTemplate affinity is empty - per workspace",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per pipelineruns",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerPipelineRun,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per isolate pipelinerun",
pr: prWithEmptyAffinityPodTemplate,
expect: affinityWithAssistantAffinity,
aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation,
expect: affinityWithAssistantAffinityRequired,
},
{
description: "podTemplate with affinity which contains podAntiAffinity",
pr: prWithPodTemplatePodAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplatePodAffinity,
},
{
description: "podTemplate with affinity which contains nodeAntiAffinity",
pr: prWithPodTemplateNodeAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplateNodeAffinity,
},
} {
t.Run(tc.description, func(t *testing.T) {
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr)
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr, tc.aaBehavior)
if d := cmp.Diff(tc.expect, resultAffinity); d != "" {
t.Errorf("affinity diff: %s", diff.PrintWantGot(d))
}
Expand Down

0 comments on commit 3afac06

Please sign in to comment.