Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEP-0135: Refactor Affinity Assistant PVC creation #6741

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 105 additions & 71 deletions pkg/reconciler/pipelinerun/affinity_assistant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,73 +56,75 @@ func (c *Reconciler) createOrUpdateAffinityAssistants(ctx context.Context, wb []
var errs []error
var unschedulableNodes sets.Set[string] = nil
for _, w := range wb {
if w.PersistentVolumeClaim != nil || w.VolumeClaimTemplate != nil {
affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
claimName := getClaimName(w, *kmeta.NewControllerRef(pr))
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimName, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if w.PersistentVolumeClaim == nil && w.VolumeClaimTemplate == nil {
continue
}

var claimTemplates []corev1.PersistentVolumeClaim
var claims []corev1.PersistentVolumeClaimVolumeSource
if w.PersistentVolumeClaim != nil {
claims = append(claims, *w.PersistentVolumeClaim.DeepCopy())
} else if w.VolumeClaimTemplate != nil {
claimTemplate := w.VolumeClaimTemplate.DeepCopy()
// PVCs Will be created by Affinity Assistant StatefulSet and will follow the naming format `<claimTemplate.Name>-<AffinityAssistantName>-0`
claimTemplate.Name = volumeclaim.GetPVCNameWithoutAffinityAssistant(w.VolumeClaimTemplate.Name, w, *kmeta.NewControllerRef(pr))
claimTemplates = append(claimTemplates, *claimTemplate)
}
QuanZhang-William marked this conversation as resolved.
Show resolved Hide resolved

affinityAssistantName := getAffinityAssistantName(w.Name, pr.Name)
a, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Get(ctx, affinityAssistantName, metav1.GetOptions{})
switch {
// check whether the affinity assistant (StatefulSet) exists or not, create one if it does not exist
case apierrors.IsNotFound(err):
affinityAssistantStatefulSet := affinityAssistantStatefulSet(affinityAssistantName, pr, claimTemplates, claims, c.Images.NopImage, cfg.Defaults.DefaultAAPodTemplate)
_, err := c.KubeClientSet.AppsV1().StatefulSets(namespace).Create(ctx, affinityAssistantStatefulSet, metav1.CreateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("failed to create StatefulSet %s: %w", affinityAssistantName, err))
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
if err == nil {
logger.Infof("Created StatefulSet %s in namespace %s", affinityAssistantName, namespace)
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
// check whether the affinity assistant (StatefulSet) exists and the affinity assistant pod is created
// this check requires the StatefulSet to have the readyReplicas set to 1 to allow for any delay between the StatefulSet creation
// and the necessary pod creation, the delay can be caused by any dependency on PVCs and PVs creation
// this case addresses issues specified in https://github.com/tektoncd/pipeline/issues/6586
case err == nil && a != nil && a.Status.ReadyReplicas == 1:
if unschedulableNodes == nil {
ns, err := c.KubeClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
FieldSelector: "spec.unschedulable=true",
})
if err != nil {
errs = append(errs, fmt.Errorf("could not get the list of nodes, err: %w", err))
}
unschedulableNodes = sets.Set[string]{}
// maintain the list of nodes which are unschedulable
for _, n := range ns.Items {
unschedulableNodes.Insert(n.Name)
}
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
if unschedulableNodes.Len() > 0 {
// get the pod created for a given StatefulSet, pod is assigned ordinal of 0 with the replicas set to 1
p, err := c.KubeClientSet.CoreV1().Pods(pr.Namespace).Get(ctx, a.Name+"-0", metav1.GetOptions{})
// ignore instead of failing if the affinity assistant pod was not found
if err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("could not get the affinity assistant pod for StatefulSet %s: %w", a.Name, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
// check the node which hosts the affinity assistant pod if it is unschedulable or cordoned
if p != nil && unschedulableNodes.Has(p.Spec.NodeName) {
// if the node is unschedulable, delete the affinity assistant pod such that a StatefulSet can recreate the same pod on a different node
err = c.KubeClientSet.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("error deleting affinity assistant pod %s in ns %s: %w", p.Name, p.Namespace, err))
}
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
case err != nil:
errs = append(errs, fmt.Errorf("failed to retrieve StatefulSet %s: %w", affinityAssistantName, err))
}
}
return errorutils.NewAggregate(errs)
}

func getClaimName(w v1beta1.WorkspaceBinding, ownerReference metav1.OwnerReference) string {
if w.PersistentVolumeClaim != nil {
return w.PersistentVolumeClaim.ClaimName
} else if w.VolumeClaimTemplate != nil {
return volumeclaim.GetPersistentVolumeClaimName(w.VolumeClaimTemplate, w, ownerReference)
}

return ""
}

func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.PipelineRun) error {
// omit cleanup if the feature is disabled
if c.isAffinityAssistantDisabled(ctx) {
Expand All @@ -136,11 +138,28 @@ func (c *Reconciler) cleanupAffinityAssistants(ctx context.Context, pr *v1beta1.
if err := c.KubeClientSet.AppsV1().StatefulSets(pr.Namespace).Delete(ctx, affinityAssistantStsName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete StatefulSet %s: %w", affinityAssistantStsName, err))
}

// cleanup PVCs created by Affinity Assistants
if w.VolumeClaimTemplate != nil {
pvcName := getPersistentVolumeClaimNameWithAffinityAssistant(w.Name, pr.Name, w, *kmeta.NewControllerRef(pr))
if err := c.KubeClientSet.CoreV1().PersistentVolumeClaims(pr.Namespace).Delete(ctx, pvcName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("failed to delete PersistentVolumeClaim %s: %w", pvcName, err))
}
}
}
}
return errorutils.NewAggregate(errs)
}

// getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is
// created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled.
// The PVCs created by StatefulSet VolumeClaimTemplates follow the format `<pvcName>-<affinityAssistantName>-0`
func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1beta1.WorkspaceBinding, owner metav1.OwnerReference) string {
pvcName := volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner)
affinityAssistantName := getAffinityAssistantName(pipelineWorkspaceName, prName)
return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName)
}

func getAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string {
hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName))
hashString := fmt.Sprintf("%x", hashBytes)
Expand All @@ -162,7 +181,10 @@ func getStatefulSetLabels(pr *v1beta1.PipelineRun, affinityAssistantName string)
return labels
}

func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimName string, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
// affinityAssistantStatefulSet returns an Affinity Assistant as a StatefulSet with the given AffinityAssistantTemplate applied to the StatefulSet PodTemplateSpec.
// The VolumeClaimTemplates and Volume of StatefulSet reference the PipelineRun WorkspaceBinding VolumeClaimTempalte and the PVCs respectively.
// The PVs created by the StatefulSet are scheduled to the same availability zone which avoids PV scheduling conflict.
func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimTemplates []corev1.PersistentVolumeClaim, claims []corev1.PersistentVolumeClaimVolumeSource, affinityAssistantImage string, defaultAATpl *pod.AffinityAssistantTemplate) *appsv1.StatefulSet {
QuanZhang-William marked this conversation as resolved.
Show resolved Hide resolved
// We want a singleton pod
replicas := int32(1)

Expand All @@ -172,6 +194,11 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
tpl = pod.MergeAAPodTemplateWithDefault(pr.Spec.PodTemplate.ToAffinityAssistantTemplate(), defaultAATpl)
}

var mounts []corev1.VolumeMount
for _, claimTemplate := range claimTemplates {
mounts = append(mounts, corev1.VolumeMount{Name: claimTemplate.Name, MountPath: claimTemplate.Name})
}

containers := []corev1.Container{{
Name: "affinity-assistant",
Image: affinityAssistantImage,
Expand All @@ -190,8 +217,27 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
"memory": resource.MustParse("100Mi"),
},
},
VolumeMounts: mounts,
}}

var volumes []corev1.Volume
for i, claim := range claims {
volumes = append(volumes, corev1.Volume{
Name: fmt.Sprintf("workspace-%d", i),
VolumeSource: corev1.VolumeSource{
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
// volumeBindingMode: Immediate
// the PV is allocated on a Node first, and then the pod need to be
// scheduled to that node.
// To support those PVCs, the Affinity Assistant must also mount the
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
// pod is scheduled to the same Availability Zone as the PV, when using
// a regional cluster. This is called VolumeScheduling.
PersistentVolumeClaim: claim.DeepCopy(),
},
})
}

return &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
Expand All @@ -207,6 +253,8 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
Selector: &metav1.LabelSelector{
MatchLabels: getStatefulSetLabels(pr, name),
},
// by setting VolumeClaimTemplates from StatefulSet, all the PVs are scheduled to the same Availability Zone as the StatefulSet
VolumeClaimTemplates: claimTemplates,
QuanZhang-William marked this conversation as resolved.
Show resolved Hide resolved
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: getStatefulSetLabels(pr, name),
Expand All @@ -219,21 +267,7 @@ func affinityAssistantStatefulSet(name string, pr *v1beta1.PipelineRun, claimNam
ImagePullSecrets: tpl.ImagePullSecrets,

Affinity: getAssistantAffinityMergedWithPodTemplateAffinity(pr),
Volumes: []corev1.Volume{{
Name: "workspace",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
// A Pod mounting a PersistentVolumeClaim that has a StorageClass with
// volumeBindingMode: Immediate
// the PV is allocated on a Node first, and then the pod need to be
// scheduled to that node.
// To support those PVCs, the Affinity Assistant must also mount the
// same PersistentVolumeClaim - to be sure that the Affinity Assistant
// pod is scheduled to the same Availability Zone as the PV, when using
// a regional cluster. This is called VolumeScheduling.
ClaimName: claimName,
}},
}},
Volumes: volumes,
},
},
},
Expand Down
Loading