Skip to content

Commit

Permalink
[TEP-0135] Coschedule per PipelineRun E2E support
Browse files Browse the repository at this point in the history
Part of [#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator to ensure
that all of a PipelineRun's pods are scheduled to the same node.

This commit consumes the functions added in [#6819] and implements end to end support of `Coschedule:PipelineRuns` coschedule mode,
where all the `PipelineRun pods` are scheduled to the same node.

/kind feature

[#6819]: #6819
[#6740]: #6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
  • Loading branch information
QuanZhang-William committed Jul 13, 2023
1 parent 3b9b351 commit 200b7b0
Show file tree
Hide file tree
Showing 6 changed files with 485 additions and 189 deletions.
97 changes: 60 additions & 37 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package pipelinerun
import (
"context"
"crypto/sha256"
"errors"
"fmt"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
"github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/workspace"
Expand All @@ -40,21 +42,25 @@ import (
)

const (
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
// as a volume source and expect an Assistant StatefulSet in AffinityAssistantPerWorkspace behavior, but couldn't create a StatefulSet.
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"
ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet"

featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
)

var (
ErrPvcCreationFailed = errors.New("PVC creation error")
ErrAffinityAssistantCreationFailed = errors.New("Affinity Assistant creation error")
)

// createOrUpdateAffinityAssistantsAndPVCs creates Affinity Assistant StatefulSets and PVCs based on AffinityAssistantBehavior.
// This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for
// every taskrun in the pipelinerun that use the same PVC based volume.
// If the AffinityAssitantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation,
// it creates one Affinity Assistant for the pipelinerun.
func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssitantBehavior) error {
var errs []error
var unschedulableNodes sets.Set[string] = nil

var claimTemplates []corev1.PersistentVolumeClaim
Expand Down Expand Up @@ -82,39 +88,40 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context
// affinity assistant so that the OwnerReference of the PVCs are the pipelineruns, which is used to achieve PVC auto deletion at PipelineRun deletion time
if (aaBehavior == aa.AffinityAssistantPerWorkspace || aaBehavior == aa.AffinityAssistantDisabled) && pr.HasVolumeClaimTemplate() {
if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil {
return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err)
return fmt.Errorf("%w: %v", ErrPvcCreationFailed, err) //nolint:errorlint
}
}

switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for claim, workspaceName := range claimToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes)
errs = append(errs, err...)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
}
for claimTemplate, workspaceName := range claimTemplatesToWorkspace {
aaName := GetAffinityAssistantName(workspaceName, pr.Name)
// To support PVC auto deletion at pipelinerun deletion time, the OwnerReference of the PVCs should be set to the owning pipelinerun.
// In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point,
// so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling.
// If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun.
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes)
errs = append(errs, err...)
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
if claims != nil || claimTemplates != nil {
aaName := GetAffinityAssistantName("", pr.Name)
// In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling.
// PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time,
// so we don't need to worry the OwnerReference of the PVCs
err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes)
errs = append(errs, err...)
aaName := GetAffinityAssistantName("", pr.Name)
// In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling.
// PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time,
// so we don't need to worry the OwnerReference of the PVCs
if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes); err != nil {
return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err)
}
case aa.AffinityAssistantDisabled:
}

return errorutils.NewAggregate(errs)
return nil
}

// createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information.
Expand Down Expand Up @@ -178,35 +185,62 @@ func (c *Reconciler) createOrUpdateAffinityAssistant(ctx context.Context, affini
return errs
}

// TODO(#6740)(WIP) implement cleanupAffinityAssistants for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation affinity assistant modes
// cleanupAffinityAssistants deletes Affinity Assistant StatefulSets
func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1.PipelineRun) error {
// omit cleanup if the feature is disabled
if c.isAffinityAssistantDisabled(ctx) {
return nil
aaBehavior, err := aa.GetAffinityAssistantBehavior(ctx)
if err != nil {
return err
}

var errs []error
for _, w := range pr.Spec.Workspaces {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantStsName := GetAffinityAssistantName(w.Name, pr.Name)
if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantStsName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantStsName, err))
switch aaBehavior {
case aa.AffinityAssistantPerWorkspace:
for _, w := range pr.Spec.Workspaces {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := GetAffinityAssistantName(w.Name, pr.Name)
if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantName, err))
}
}
}
case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation:
affinityAssistantName := GetAffinityAssistantName("", pr.Name)
if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantName, err))
}
case aa.AffinityAssistantDisabled:
return nil
}

return errorutils.NewAggregate(errs)
}

// getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is
// created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled.
// The PVCs created by StatefulSet VolumeClaimTemplates follow the format `<pvcName>-<affinityAssistantName>-0`
// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode
func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string {
pvcName := volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner)
affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName)
return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName)
}

// getAffinityAssistantAnnotationVal generates and returns the value for `pipeline.tekton.dev/affinity-assistant` annotation
// based on aaBehavior, pipelinePVCWorkspaceName and prName
func getAffinityAssistantAnnotationVal(aaBehavior affinityassistant.AffinityAssitantBehavior, pipelinePVCWorkspaceName string, prName string) string {
switch aaBehavior {
case affinityassistant.AffinityAssistantPerWorkspace:
if pipelinePVCWorkspaceName != "" {
return GetAffinityAssistantName(pipelinePVCWorkspaceName, prName)
}
case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation:
return GetAffinityAssistantName("", prName)

case affinityassistant.AffinityAssistantDisabled:
}

return ""
}

// GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName
func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
Expand Down Expand Up @@ -322,17 +356,6 @@ func affinityAssistantStatefulSet(name string, pr *v1.PipelineRun, claimTemplate
}
}

// isAffinityAssistantDisabled returns a bool indicating whether an Affinity Assistant should
// be created for each PipelineRun that use workspaces with PersistentVolumeClaims
// as volume source. The default behaviour is to enable the Affinity Assistant to
// provide Node Affinity for TaskRuns that share a PVC workspace.
//
// TODO(#6740)(WIP): replace this function with GetAffinityAssistantBehavior
func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool {
cfg := config.FromContextOrDefaults(ctx)
return cfg.FeatureFlags.DisableAffinityAssistant
}

// getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity.
func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun) *corev1.Affinity {
// use podAntiAffinity to repel other affinity assistants
Expand Down
Loading

0 comments on commit 200b7b0

Please sign in to comment.