Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEP-0135] coschedule isolate pipelinerun #6929

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
}
}

switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
Expand Down Expand Up @@ -129,8 +128,12 @@ func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affini
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
aaBehavior, err := aa.GetAffinityAssistantBehavior(ctx)
if err != nil {
return []error{err}
}
affinityAssistantStatefulSet := affinityAssistantStatefulSet(aaBehavior, affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err = c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
Expand Down Expand Up @@ -229,10 +232,11 @@ func getStatefulSetLabels(pr *v1.PipelineRun, affinityAssistantName string) map[
return labels
}

// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet based on the AffinityAssistantBehavior
// with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// The VolumeClaimTemplates and Volume of StatefulSet reference the PipelineRun WorkspaceBinding VolumeClaimTempalte and the PVCs respectively.
// The PVs created by the StatefulSet are scheduled to the same availability zone which avoids PV scheduling conflict.
func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssistantBehavior, name string, pr *v1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// We want a singleton pod
replicas := int32(1)

Expand Down Expand Up @@ -314,7 +318,7 @@ func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplate
NodeSelector: tpl.NodeSelector,
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
QuanZhang-William marked this conversation as resolved.
Show resolved Hide resolved
Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr, aaBehavior),
Volumes: volumes,
},
},
Expand All @@ -334,30 +338,39 @@ func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun) *corev1.Affinity {
// use podAntiAffinity to repel other affinity assistants
repelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
},
}

func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) *corev1.Affinity {
affinityAssistantsAffinity := &corev1.Affinity{}
if pr.Spec.TaskRunTemplate.PodTemplate != nil && pr.Spec.TaskRunTemplate.PodTemplate.Affinity != nil {
affinityAssistantsAffinity = pr.Spec.TaskRunTemplate.PodTemplate.Affinity
}
if affinityAssistantsAffinity.PodAntiAffinity == nil {
affinityAssistantsAffinity.PodAntiAffinity = &corev1.PodAntiAffinity{}
}
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)

repelOtherAffinityAssistantsPodAffinityTerm := corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
}

if aaBehavior == aa.AffinityAssistantPerPipelineRunWithIsolation {
// use RequiredDuringSchedulingIgnoredDuringExecution term to enforce only one pipelinerun can run in a node at a time
affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
repelOtherAffinityAssistantsPodAffinityTerm)
} else {
preferredRepelOtherAffinityAssistantsPodAffinityTerm := corev1.WeightedPodAffinityTerm{
lbernick marked this conversation as resolved.
Show resolved Hide resolved
Weight: 100,
PodAffinityTerm: repelOtherAffinityAssistantsPodAffinityTerm,
}
// use RequiredDuringSchedulingIgnoredDuringExecution term to schedule pipelineruns to different nodes when possible
affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(affinityAssistantsAffinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
preferredRepelOtherAffinityAssistantsPodAffinityTerm)
}

return affinityAssistantsAffinity
}
74 changes: 51 additions & 23 deletions pkg/reconciler/pipelinerun/affinity_assistant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func TestPipelineRunPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -499,7 +499,7 @@ func TestDefaultPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -546,7 +546,7 @@ func TestMergedPodTemplatesArePropagatedToAffinityAssistant(t *testing.T) {
}},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", defaultTpl)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand Down Expand Up @@ -584,7 +584,7 @@ func TestOnlySelectPodTemplateFieldsArePropagatedToAffinityAssistant(t *testing.
},
}

stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 1 {
t.Errorf("expected Tolerations from spec in the StatefulSet")
Expand All @@ -604,7 +604,7 @@ func TestThatTheAffinityAssistantIsWithoutNodeSelectorAndTolerations(t *testing.
Spec: v1.PipelineRunSpec{},
}

stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet("test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)
stsWithoutTolerationsAndNodeSelector := affinityAssistantStatefulSet(aa.AffinityAssistantPerWorkspace, "test-assistant", prWithoutCustomPodTemplate, []corev1.PersistentVolumeClaim{}, []corev1.PersistentVolumeClaimVolumeSource{}, "nginx", nil)

if len(stsWithoutTolerationsAndNodeSelector.Spec.Template.Spec.Tolerations) != 0 {
t.Errorf("unexpected Tolerations in the StatefulSet")
Expand Down Expand Up @@ -802,30 +802,42 @@ func TestDisableAffinityAssistant(t *testing.T) {
}

func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) {
assistantPodAffinityTerm := corev1.WeightedPodAffinityTerm{
labelSelector := &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
}

assistantWeightedPodAffinityTerm := corev1.WeightedPodAffinityTerm{
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
workspace.LabelComponent: workspace.ComponentNameAffinityAssistant,
},
},
TopologyKey: "kubernetes.io/hostname",
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)
affinityWithAssistantAffinity := &corev1.Affinity{
affinityWithAssistantAffinityPreferred := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
}

affinityWithAssistantAffinityRequired := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
LabelSelector: labelSelector,
TopologyKey: "kubernetes.io/hostname",
}},
},
}

prWithEmptyAffinityPodTemplate := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-no-podTemplate
`)

prWithPodTemplatePodAffinity := parse.MustParseV1PipelineRun(t, `
metadata:
name: pr-with-podTemplate-podAffinity
Expand Down Expand Up @@ -861,7 +873,7 @@ spec:
TopologyKey: "kubernetes.io/hostname",
},
},
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
Expand Down Expand Up @@ -895,7 +907,7 @@ spec:
affinityWithPodTemplateNodeAffinity := &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
assistantPodAffinityTerm,
assistantWeightedPodAffinityTerm,
},
},
NodeAffinity: &corev1.NodeAffinity{
Expand All @@ -920,26 +932,42 @@ spec:
for _, tc := range []struct {
description string
pr *v1.PipelineRun
aaBehavior aa.AffinityAssistantBehavior
expect *corev1.Affinity
}{
{
description: "podTemplate affinity is empty",
description: "podTemplate affinity is empty - per workspace",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per pipelineruns",
pr: prWithEmptyAffinityPodTemplate,
aaBehavior: aa.AffinityAssistantPerPipelineRun,
expect: affinityWithAssistantAffinityPreferred,
},
{
description: "podTemplate affinity is empty - per isolate pipelinerun",
pr: prWithEmptyAffinityPodTemplate,
expect: affinityWithAssistantAffinity,
aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation,
expect: affinityWithAssistantAffinityRequired,
},
{
description: "podTemplate with affinity which contains podAntiAffinity",
pr: prWithPodTemplatePodAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplatePodAffinity,
},
{
description: "podTemplate with affinity which contains nodeAntiAffinity",
pr: prWithPodTemplateNodeAffinity,
aaBehavior: aa.AffinityAssistantPerWorkspace,
expect: affinityWithPodTemplateNodeAffinity,
},
} {
t.Run(tc.description, func(t *testing.T) {
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr)
resultAffinity := getAssistantAffinityMergedWithPodTemplateAffinity(tc.pr, tc.aaBehavior)
if d := cmp.Diff(tc.expect, resultAffinity); d != "" {
t.Errorf("affinity diff: %s", diff.PrintWantGot(d))
}
Expand Down