Skip to content

Commit 7875e6a

Browse files
Modify pod annotations update to safely allow concurrent writes
1 parent fbccaad commit 7875e6a

File tree

5 files changed

+138
-67
lines changed

5 files changed

+138
-67
lines changed

pkg/daemon/daemon.go

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,7 @@ func (d *daemon) getIbSriovNetwork(networkID string) (string, *utils.IbSriovCniS
209209
// Check if this network's resource is managed by this daemon
210210
resourceName := netAttInfo.Annotations["k8s.v1.cni.cncf.io/resourceName"]
211211
if resourceName == "" || !d.config.IsManagedResource(resourceName) {
212-
// TODO(Nik) dev qol, check if someone else manages this resource or if it is orphan
213-
// checkResourceOwner(networkNamespace, networkName)
212+
// TODO(Nik) qol, check if someone else manages this resource or if it is orphan
214213
return "", nil, fmt.Errorf("network %s uses resource %s which is not managed by this daemon", networkName, resourceName)
215214
}
216215

@@ -254,8 +253,8 @@ func getPodNetworkInfo(netName string, pod *kapi.Pod, netMap networksMap) (*podN
254253

255254
// addPodFinalizer adds the GUID cleanup finalizer to a pod
256255
func (d *daemon) addPodFinalizer(pod *kapi.Pod, networkName string) error {
256+
podFinalizer := fmt.Sprintf("%s-%s", PodGUIDFinalizer, networkName)
257257
return wait.ExponentialBackoff(backoffValues, func() (bool, error) {
258-
podFinalizer := fmt.Sprintf("%s-%s", PodGUIDFinalizer, networkName)
259258
if err := d.kubeClient.AddFinalizerToPod(pod, podFinalizer); err != nil {
260259
log.Warn().Msgf("failed to add finalizer to pod %s/%s: %v",
261260
pod.Namespace, pod.Name, err)
@@ -267,8 +266,8 @@ func (d *daemon) addPodFinalizer(pod *kapi.Pod, networkName string) error {
267266

268267
// removePodFinalizer removes the GUID cleanup finalizer from a pod
269268
func (d *daemon) removePodFinalizer(pod *kapi.Pod, networkName string) error {
269+
podFinalizer := fmt.Sprintf("%s-%s", PodGUIDFinalizer, networkName)
270270
return wait.ExponentialBackoff(backoffValues, func() (bool, error) {
271-
podFinalizer := fmt.Sprintf("%s-%s", PodGUIDFinalizer, networkName)
272271
if err := d.kubeClient.RemoveFinalizerFromPod(pod, podFinalizer); err != nil {
273272
log.Warn().Msgf("failed to remove finalizer from pod %s/%s: %v",
274273
pod.Namespace, pod.Name, err)
@@ -480,44 +479,104 @@ func syncGUIDPool(smClient plugins.SubnetManagerClient, guidPool guid.Pool) erro
480479

481480
// Update and set Pod's network annotation.
482481
// If failed to update annotation, pod's GUID added into the list to be removed from Pkey.
483-
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr) error {
482+
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr) {
484483
if pi.ibNetwork.CNIArgs == nil {
485484
pi.ibNetwork.CNIArgs = &map[string]interface{}{}
486485
}
487486

488487
(*pi.ibNetwork.CNIArgs)[utils.InfiniBandAnnotation] = utils.ConfiguredInfiniBandPod
489-
netAnnotations, err := json.Marshal(pi.networks)
490-
if err != nil {
491-
return fmt.Errorf("failed to dump networks %+v of pod into json with error: %v", pi.networks, err)
492-
}
493-
494-
pi.pod.Annotations[v1.NetworkAttachmentAnnot] = string(netAnnotations)
495488

496489
// Try to set pod's annotations in backoff loop
497-
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
498-
log.Info().Msgf("updatePodNetworkAnnotation(): Updating pod annotation for pod: %s with anootation: %s", pi.pod.Name, pi.pod.Annotations)
490+
if err := wait.ExponentialBackoff(backoffValues, func() (bool, error) {
491+
492+
// Get latest annotations state to avoid conflicts
493+
latestPodAnnotations, networks, err := d.getLatestPodAnnotations(pi.pod)
494+
if err != nil {
495+
log.Warn().Msgf("failed to get latest pod annotations for %s/%s: %v", pi.pod.Namespace, pi.pod.Name, err)
496+
return false, nil
497+
}
498+
499+
targetNetwork, err := utils.GetPodNetwork(networks, pi.ibNetwork.Name)
500+
if err != nil {
501+
return false, fmt.Errorf("failed to locate network %s in pod %s/%s annotations: %v", pi.ibNetwork.Name, pi.pod.Namespace, pi.pod.Name, err)
502+
}
503+
504+
err = updateInfiniBandNetwork(targetNetwork, pi.ibNetwork)
505+
if err != nil {
506+
return false, fmt.Errorf("failed to update infiniband network for pod %s/%s: %v", pi.pod.Namespace, pi.pod.Name, err)
507+
}
508+
509+
netAnnotations, err := json.Marshal(networks)
510+
if err != nil {
511+
return false, fmt.Errorf("failed to marshal updated networks for pod %s/%s: %v", pi.pod.Namespace, pi.pod.Name, err)
512+
}
513+
514+
if latestPodAnnotations == nil {
515+
return false, fmt.Errorf("latestPodAnnotations is nil for pod %s/%s", pi.pod.Namespace, pi.pod.Name)
516+
}
517+
518+
latestPodAnnotations[v1.NetworkAttachmentAnnot] = string(netAnnotations)
519+
pi.pod.Annotations = latestPodAnnotations
520+
521+
log.Info().Msgf("updatePodNetworkAnnotation(): Updating pod annotation for pod: %s/%s", pi.pod.Namespace, pi.pod.Name)
499522
if err = d.kubeClient.SetAnnotationsOnPod(pi.pod, pi.pod.Annotations); err != nil {
500523
if kerrors.IsNotFound(err) {
501524
return false, err
502525
}
503-
log.Warn().Msgf("failed to update pod annotations with err: %v", err)
526+
if kerrors.IsConflict(err) {
527+
log.Warn().Msgf("conflict while updating pod annotations for %s/%s, will retry", pi.pod.Namespace, pi.pod.Name)
528+
return false, nil
529+
}
530+
log.Warn().Msgf("failed to update pod annotations for %s/%s with err: %v", pi.pod.Namespace, pi.pod.Name, err)
504531
return false, nil
505532
}
506-
log.Info().Msgf("updatePodNetworkAnnotation(): Success on updating pod annotation for pod: %s with anootation: %s", pi.pod.Name, pi.pod.Annotations)
533+
534+
log.Info().Msgf("updatePodNetworkAnnotation(): Success on updating pod annotation for pod: %s/%s with annotations: %s", pi.pod.Namespace, pi.pod.Name, pi.pod.Annotations)
507535
return true, nil
508536
}); err != nil {
509-
log.Error().Msgf("failed to update pod annotations")
537+
log.Error().Msgf("failed to update pod annotations for %s/%s with error: %v", pi.pod.Namespace, pi.pod.Name, err)
510538

511539
if err = d.guidPool.ReleaseGUID(pi.addr.String()); err != nil {
512-
log.Warn().Msgf("failed to release guid \"%s\" from removed pod \"%s\" in namespace "+
513-
"\"%s\" with error: %v", pi.addr.String(), pi.pod.Name, pi.pod.Namespace, err)
540+
log.Warn().Msgf("failed to release guid \"%s\" from removed pod \"%s\" in namespace \"%s\" with error: %v", pi.addr.String(), pi.pod.Name, pi.pod.Namespace, err)
514541
} else {
515542
delete(d.guidPodNetworkMap, pi.addr.String())
516543
}
517544

518545
*removedList = append(*removedList, pi.addr)
519546
}
520547

548+
return
549+
}
550+
551+
// Retrieves the latest annotations for a pod and returns the annotations and the pod networks.
552+
func (d *daemon) getLatestPodAnnotations(pod *kapi.Pod) (map[string]string, []*v1.NetworkSelectionElement, error) {
553+
latestPod, err := d.kubeClient.GetPod(pod.Namespace, pod.Name)
554+
if err != nil {
555+
return nil, nil, err
556+
}
557+
558+
networks, err := netAttUtils.ParsePodNetworkAnnotation(latestPod)
559+
if err != nil {
560+
return nil, nil, err
561+
}
562+
563+
return latestPod.Annotations, networks, nil
564+
}
565+
566+
// Replaces target network with source network, erroring if source is already configured.
567+
func updateInfiniBandNetwork(target *v1.NetworkSelectionElement, source *v1.NetworkSelectionElement) error {
568+
if target == nil || source == nil {
569+
return fmt.Errorf("target or source network is nil")
570+
}
571+
572+
if target.CNIArgs != nil {
573+
if (*target.CNIArgs)[utils.InfiniBandAnnotation] == utils.ConfiguredInfiniBandPod {
574+
return fmt.Errorf("target network is already configured")
575+
}
576+
}
577+
578+
target.InfinibandGUIDRequest = source.InfinibandGUIDRequest
579+
target.CNIArgs = source.CNIArgs
521580
return nil
522581
}
523582

@@ -609,10 +668,7 @@ func (d *daemon) AddPeriodicUpdate() {
609668
var removedGUIDList []net.HardwareAddr
610669
for _, pi := range passedPods {
611670
log.Info().Msgf("Updating annotations for the pod %s, network %s", pi.pod.Name, pi.ibNetwork.Name)
612-
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList)
613-
if err != nil {
614-
log.Error().Msgf("%v", err)
615-
}
671+
d.updatePodNetworkAnnotation(pi, &removedGUIDList)
616672
}
617673

618674
if ibCniSpec.PKey != "" && len(removedGUIDList) != 0 {

pkg/daemon/daemon_e2e_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"github.com/stretchr/testify/mock"
1010
corev1 "k8s.io/api/core/v1"
1111
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12-
"k8s.io/apimachinery/pkg/types"
1312
"k8s.io/client-go/kubernetes"
1413
"k8s.io/client-go/rest"
1514
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -457,6 +456,15 @@ func (c *testK8sClient) GetPods(namespace string) (*corev1.PodList, error) {
457456
return podList, err
458457
}
459458

459+
func (c *testK8sClient) GetPod(namespace, name string) (*corev1.Pod, error) {
460+
pod := &corev1.Pod{}
461+
key := client.ObjectKey{Namespace: namespace, Name: name}
462+
if err := c.client.Get(ctx, key, pod); err != nil {
463+
return nil, err
464+
}
465+
return pod, nil
466+
}
467+
460468
func (c *testK8sClient) SetAnnotationsOnPod(pod *corev1.Pod, annotations map[string]string) error {
461469
pod.Annotations = annotations
462470
return c.client.Update(ctx, pod)
@@ -533,9 +541,3 @@ func (c *testK8sClient) GetRestClient() rest.Interface {
533541
}
534542
return clientset.CoreV1().RESTClient()
535543
}
536-
537-
func (c *testK8sClient) PatchPod(pod *corev1.Pod, patchType types.PatchType, patchData []byte) error {
538-
// Use controller-runtime client patch
539-
patch := client.RawPatch(patchType, patchData)
540-
return c.client.Patch(ctx, pod, patch)
541-
}

pkg/daemon/daemon_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ var _ = Describe("Daemon Finalizer Tests", func() {
209209
kubeClient.On("AddFinalizerToNetworkAttachmentDefinition", "default", "ib-network2", GUIDInUFMFinalizer).Return(nil)
210210

211211
// Mock pod annotation updates
212-
kubeClient.On("SetAnnotationsOnPod", testPod, mock.AnythingOfType("map[string]string")).Return(nil)
212+
kubeClient.On("GetPod", testPod.Namespace, testPod.Name).Return(testPod, nil)
213+
kubeClient.On("SetAnnotationsOnPod", mock.AnythingOfType("*v1.Pod"), mock.AnythingOfType("map[string]string")).Return(nil)
213214

214215
// Mock SM client calls
215216
smClient.On("AddGuidsToPKey", 1, mock.AnythingOfType("[]net.HardwareAddr")).Return(nil)

pkg/k8s-client/client.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@ package k8sclient
22

33
import (
44
"context"
5-
"encoding/json"
65
"fmt"
6+
"reflect"
77
"time"
88

99
netapi "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
1010
netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1"
1111
"github.com/rs/zerolog/log"
1212
kapi "k8s.io/api/core/v1"
1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14-
"k8s.io/apimachinery/pkg/types"
1514
"k8s.io/apimachinery/pkg/util/wait"
1615
"k8s.io/client-go/kubernetes"
1716
"k8s.io/client-go/rest"
@@ -20,8 +19,8 @@ import (
2019

2120
type Client interface {
2221
GetPods(namespace string) (*kapi.PodList, error)
22+
GetPod(namespace, name string) (*kapi.Pod, error)
2323
SetAnnotationsOnPod(pod *kapi.Pod, annotations map[string]string) error
24-
PatchPod(pod *kapi.Pod, patchType types.PatchType, patchData []byte) error
2524
GetNetworkAttachmentDefinition(namespace, name string) (*netapi.NetworkAttachmentDefinition, error)
2625
GetRestClient() rest.Interface
2726
AddFinalizerToNetworkAttachmentDefinition(namespace, name, finalizer string) error
@@ -65,33 +64,38 @@ func (c *client) GetPods(namespace string) (*kapi.PodList, error) {
6564
return c.clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
6665
}
6766

67+
// GetPod obtains a single Pod resource from the kubernetes api server
68+
func (c *client) GetPod(namespace, name string) (*kapi.Pod, error) {
69+
log.Debug().Msgf("getting pod namespace: %s, name: %s", namespace, name)
70+
return c.clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
71+
}
72+
6873
// SetAnnotationsOnPod takes the pod object and map of key/value string pairs to set as annotations
6974
func (c *client) SetAnnotationsOnPod(pod *kapi.Pod, annotations map[string]string) error {
7075
log.Info().Msgf("Setting annotation on pod, namespace: %s, podName: %s, annotations: %v",
7176
pod.Namespace, pod.Name, annotations)
72-
var err error
73-
var patchData []byte
74-
patch := struct {
75-
Metadata map[string]interface{} `json:"metadata"`
76-
}{
77-
Metadata: map[string]interface{}{
78-
"annotations": annotations,
79-
},
80-
}
8177

82-
podDesc := pod.Namespace + "/" + pod.Name
83-
patchData, err = json.Marshal(&patch)
78+
// Get the latest version of the pod
79+
currentPod, err := c.clientset.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
8480
if err != nil {
85-
return fmt.Errorf("failed to set annotations on pod %s: %v", podDesc, err)
81+
return err
8682
}
87-
return c.PatchPod(pod, types.MergePatchType, patchData)
88-
}
8983

90-
// PatchPod applies the patch changes
91-
func (c *client) PatchPod(pod *kapi.Pod, patchType types.PatchType, patchData []byte) error {
92-
log.Debug().Msgf("patch pod, namespace: %s, podName: %s", pod.Namespace, pod.Name)
93-
_, err := c.clientset.CoreV1().Pods(pod.Namespace).Patch(
94-
context.TODO(), pod.Name, patchType, patchData, metav1.PatchOptions{})
84+
// Check if there are any conflicts with the current view of the pod's annotations and the latest version.
85+
if currentPod.Annotations != nil {
86+
if !reflect.DeepEqual(currentPod.Annotations, pod.Annotations) {
87+
return fmt.Errorf("conflict with the current view of the pod's annotations and the latest version")
88+
}
89+
}
90+
91+
currentPod.Annotations = annotations
92+
93+
// Update the pod with retry and backoff
94+
err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
95+
_, err = c.clientset.CoreV1().Pods(pod.Namespace).Update(
96+
context.Background(), currentPod, metav1.UpdateOptions{})
97+
return err == nil, nil
98+
})
9599
return err
96100
}
97101

pkg/k8s-client/mocks/Client.go

Lines changed: 23 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)