
TEP-0135: implement per-pipelinerun coscheduling
Part of [tektoncd#6740][tektoncd#6740]. [TEP-0135][tep-0135] introduces a feature that allows a cluster operator
to ensure that all of a PipelineRun's pods are scheduled to the same node.

This commit implements the `coschedule-pipelineruns` scheduling mode, in which all the Pods of a `PipelineRun` are scheduled to the same node.
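
For background, both modes rely on the existing Affinity Assistant mechanism: a placeholder pod is created per coscheduling unit (per workspace today, per `PipelineRun` in the new mode), and `TaskRun` pods carry inter-pod affinity to that placeholder so the scheduler places them on the same node. The sketch below shows the general shape of such an affinity term; the package name, function name, and label keys/values are illustrative assumptions, not code from this commit:

```go
package sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// coscheduleAffinity builds an inter-pod affinity term that pins a TaskRun pod
// to the node running the Affinity Assistant placeholder pod named aaName.
func coscheduleAffinity(aaName string) *corev1.Affinity {
	return &corev1.Affinity{
		PodAffinity: &corev1.PodAffinity{
			RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{{
				LabelSelector: &metav1.LabelSelector{
					MatchLabels: map[string]string{
						// assumed labels; the real keys and values live in Tekton's pod builder
						"app.kubernetes.io/component": "affinity-assistant",
						"app.kubernetes.io/instance":  aaName,
					},
				},
				// "same node" is expressed through the hostname topology key
				TopologyKey: "kubernetes.io/hostname",
			}},
		},
	}
}
```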
The commit renames the existing `createOrUpdateAffinityAssistants` function to `createOrUpdateAffinityAssistantsPerWorkspace` and adds a new
function, `createOrUpdateAffinityAssistantsPerPipelineRun`, for the `coschedule-pipelineruns` scheduling mode (with some refactoring).

There is no functionality change to the existing `createOrUpdateAffinityAssistants` function (now `createOrUpdateAffinityAssistantsPerWorkspace`).
The `createOrUpdateAffinityAssistantsPerPipelineRun` function is implemented but not yet used; it will be wired into the reconciler in follow-up PRs, roughly as sketched below.
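
As a rough illustration only (not part of this commit), a follow-up change could branch between the two functions based on the configured coscheduling mode. The two per-mode functions below come from this commit; the mode-lookup helper and the wrapper name are hypothetical:

```go
// Hypothetical sketch of the follow-up wiring inside pkg/reconciler/pipelinerun.
func (c *Reconciler) createOrUpdateAffinityAssistantsIfNeeded(ctx context.Context, pr *v1beta1.PipelineRun) error {
	switch coscheduleMode(ctx) { // hypothetical helper returning the configured coschedule mode
	case "coschedule-pipelineruns":
		// one Affinity Assistant StatefulSet for the whole PipelineRun (new in this commit)
		return c.createOrUpdateAffinityAssistantsPerPipelineRun(ctx, pr.Spec.Workspaces, pr, pr.Namespace)
	default:
		// one Affinity Assistant StatefulSet per PVC-backed workspace (existing behavior, now renamed)
		return c.createOrUpdateAffinityAssistantsPerWorkspace(ctx, pr.Spec.Workspaces, pr, pr.Namespace)
	}
}
```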

/kind feature

[tektoncd#6740]: tektoncd#6740
[tep-0135]: https://github.com/tektoncd/community/blob/main/teps/0135-coscheduling-pipelinerun-pods.md
QuanZhang-William committed Jun 12, 2023
1 parent ee3c22c commit 8905234
Showing 3 changed files with 274 additions and 129 deletions.
173 changes: 116 additions & 57 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
@@ -39,20 +39,17 @@ import (
 )
 
 const (
-    // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
+    // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim
     // as a volume source and expect an Assistant StatefulSet, but couldn't create a StatefulSet.
-    ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "CouldntCreateOrUpdateAffinityAssistantstatefulSet"
+    ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace"
 
     featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant"
 )
 
-// createOrUpdateAffinityAssistants creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that
+// createOrUpdateAffinityAssistantsPerWorkspace creates an Affinity Assistant StatefulSet for every workspace in the PipelineRun that
 // use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that
 // share the workspace volume and make it possible for the tasks to execute parallel while sharing volume.
-func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun, namespace string) error {
-    logger := logging.FromContext(ctx)
-    cfg := config.FromContextOrDefaults(ctx)
-
+func (c *Reconciler) createOrUpdateAffinityAssistantsPerWorkspace(ctx context.Context, wb []v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun, namespace string) error {
     var errs []error
     var unschedulableNodes sets.Set[string] = nil
     for _, w := range wb {
@@ -62,67 +59,129 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
 
         var claimTemplates []corev1.PersistentVolumeClaim
         var claims []corev1.PersistentVolumeClaimVolumeSource
-        if w.PersistentVolumeClaim != nil {
-            claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
-        } else if w.VolumeClaimTemplate != nil {
-            claimTemplate := w.VolumeClaimTemplate.DeepCopy()
-            // PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
-            claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
+
+        claimTemplate, claim := getAffinityAssistantPVCandVolumeClaimTemplate(w, pr)
+        if claimTemplate != nil {
             claimTemplates = append(claimTemplates, *claimTemplate)
         }
+        if claim != nil {
+            claims = append(claims, *claim)
+        }
 
-        affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
-        a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
-        switch {
-        // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
-        case apierrors.IsNotFound(err):
-            affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
-            _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
-            if err != nil {
-                errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
-            }
-            if err == nil {
-                logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
-            }
-        // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
-        // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
-        // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
-        // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
-        case err == nil && a != nil && a.Status.ReadyReplicas == 1:
-            if unschedulableNodes == nil {
-                ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
-                    FieldSelector: "spec.unschedulable=true",
-                })
-                if err != nil {
-                    errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
-                }
-                unschedulableNodes = sets.Set[string]{}
-                // maintain the list of nodes which are unschedulable
-                for _, n := range ns.Items {
-                    unschedulableNodes.Insert(n.Name)
-                }
-            }
-            if unschedulableNodes.Len() > 0 {
-                // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
-                p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
-                // ignore instead of failing if the affinity assistant pod was not found
-                if err != nil && !apierrors.IsNotFound(err) {
-                    errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
-                }
-                // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
-                if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
-                    // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
-                    err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
-                    if err != nil {
-                        errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
-                    }
-                }
-            }
-        case err != nil:
-            errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
-        }
+        aaName := getAffinityAssistantName(w.Name, pr.Name)
+        err := c.createOrUpdateAffinityAssistants(ctx, namespace, aaName, pr, claimTemplates, claims, unschedulableNodes)
+        errs = append(errs, err...)
     }
     return errorutils.NewAggregate(errs)
 }
+
+// createOrUpdateAffinityAssistantsPerPipelineRun creates an Affinity Assistant StatefulSet for each PipelineRun that
+// use a PersistentVolumeClaim volume. This is done to achieve Node Affinity for all TaskRuns that
+// in a PipelineRun. It also makes it possible for the tasks to execute parallel while sharing volume.
+// TODO: consume this function in the PipelineRun reconciler after
+// https://github.com/tektoncd/pipeline/pull/6790 is merged
+func (c *Reconciler) createOrUpdateAffinityAssistantsPerPipelineRun(ctx context.Context, wb []v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun, namespace string) error {
+    var errs []error
+    var unschedulableNodes sets.Set[string] = nil
+
+    var claimTemplates []corev1.PersistentVolumeClaim
+    var claims []corev1.PersistentVolumeClaimVolumeSource
+    for _, w := range wb {
+        if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
+            continue
+        }
+
+        claimTemplate, claim := getAffinityAssistantPVCandVolumeClaimTemplate(w, pr)
+        if claimTemplate != nil {
+            claimTemplates = append(claimTemplates, *claimTemplate)
+        }
+        if claim != nil {
+            claims = append(claims, *claim)
+        }
+    }
+
+    // only create affinity assistant when the pr has PVC or VolumeClaimTemplate based Workspace
+    if claims != nil || claimTemplates != nil {
+        // the Affinity Assistant name is only determined by PipelineRun in this mode
+        aaName := getAffinityAssistantName("", pr.Name)
+        errs = append(errs, c.createOrUpdateAffinityAssistants(ctx, namespace, aaName, pr, claimTemplates, claims, unschedulableNodes)...)
+    }
+
+    return errorutils.NewAggregate(errs)
+}
+
+// getAffinityAssistantPVCandVolumeClaimTemplate returns the PVC and VolumeClaimTemplate for Affinity Assistant StatefulSet from PipelineRun WorkspaceBindings
+func getAffinityAssistantPVCandVolumeClaimTemplate(w v1beta1.WorkspaceBinding, pr *v1beta1.PipelineRun) (*corev1.PersistentVolumeClaim, *corev1.PersistentVolumeClaimVolumeSource) {
+    var claimTemplate *corev1.PersistentVolumeClaim
+    var claim *corev1.PersistentVolumeClaimVolumeSource
+
+    if w.PersistentVolumeClaim != nil {
+        claim = w.PersistentVolumeClaim.DeepCopy()
+    } else if w.VolumeClaimTemplate != nil {
+        claimTemplate = w.VolumeClaimTemplate.DeepCopy()
+        // PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
+        claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
+    }
+
+    return claimTemplate, claim
+}
+
+func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, namespace, affinityAssistantName string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, unschedulableNodes sets.Set[string]) []error {
+    logger := logging.FromContext(ctx)
+    cfg := config.FromContextOrDefaults(ctx)
+
+    var errs []error
+    a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
+    switch {
+    // check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
+    case apierrors.IsNotFound(err):
+        affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
+        _, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
+        if err != nil {
+            errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
+        }
+        if err == nil {
+            logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
+        }
+    // check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
+    // this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
+    // and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
+    // this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
+    case err == nil && a != nil && a.Status.ReadyReplicas == 1:
+        if unschedulableNodes == nil {
+            ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
+                FieldSelector: "spec.unschedulable=true",
+            })
+            if err != nil {
+                errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
+            }
+            unschedulableNodes = sets.Set[string]{}
+            // maintain the list of nodes which are unschedulable
+            for _, n := range ns.Items {
+                unschedulableNodes.Insert(n.Name)
+            }
+        }
+        if unschedulableNodes.Len() > 0 {
+            // get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
+            p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
+            // ignore instead of failing if the affinity assistant pod was not found
+            if err != nil && !apierrors.IsNotFound(err) {
+                errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
+            }
+            // check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
+            if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
+                // if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
+                err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
+                if err != nil {
+                    errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
+                }
+            }
+        }
+    case err != nil:
+        errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
+    }
+
+    return errs
+}
 
 func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error {