Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc bug fix in manage controller #1121

Merged
merged 3 commits into from
Dec 23, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 72 additions & 81 deletions pkg/controller/manage/manage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -56,8 +57,6 @@
}

type KmeshManageController struct {
// TODO: share pod informer with bypass?
informerFactory informers.SharedInformerFactory
factory informers.SharedInformerFactory
podInformer cache.SharedIndexInformer
podLister v1.PodLister
Expand Down Expand Up @@ -89,9 +88,7 @@
namespaceLister := factory.Core().V1().Namespaces().Lister()

queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())

kmc := &KmeshManageController{
informerFactory: informerFactory,
c := &KmeshManageController{
podInformer: podInformer,
podLister: podLister,
factory: factory,
Expand All @@ -106,76 +103,55 @@

if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
kmc.handlePodAddFunc(obj)
c.handlePodUpdate(nil, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
kmc.handlePodUpdateFunc(oldObj, newObj)
c.handlePodUpdate(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
kmc.handlePodDeleteFunc(obj)
c.handlePodDelete(obj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to podInformer: %v", err)
}

if _, err := namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
kmc.handleNamespaceUpdateFunc(oldObj, newObj)
c.handleNamespaceUpdate(oldObj, newObj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to namespaceInformer: %v", err)
}

return kmc, nil
return c, nil
}

func (kmc *KmeshManageController) handlePodAddFunc(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
func (c *KmeshManageController) handlePodUpdate(_, newObj interface{}) {
newPod, ok := newObj.(*corev1.Pod)
if !ok {
log.Errorf("expected *corev1.Pod but got %T", obj)
return
}

namespace, err := kmc.namespaceLister.Get(pod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
return
}

if !utils.ShouldEnroll(pod, namespace) {
if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
kmc.disableKmeshManage(pod)
}
return
}
kmc.enableKmeshManage(pod)
}

func (kmc *KmeshManageController) handlePodUpdateFunc(_, newObj interface{}) {
newPod, okNew := newObj.(*corev1.Pod)
if !okNew {
log.Errorf("expected *corev1.Pod but got %T", newObj)
return
}

namespace, err := kmc.namespaceLister.Get(newPod.Namespace)
namespace, err := c.namespaceLister.Get(newPod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err)
return
}

// enable kmesh manage
if !utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && utils.ShouldEnroll(newPod, namespace) {
kmc.enableKmeshManage(newPod)
c.enableKmeshManage(newPod)
return
}

// disable kmesh manage
if utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && !utils.ShouldEnroll(newPod, namespace) {
kmc.disableKmeshManage(newPod)
c.disableKmeshManage(newPod)
}
}

func (kmc *KmeshManageController) handlePodDeleteFunc(obj interface{}) {
func (c *KmeshManageController) handlePodDelete(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -192,13 +168,13 @@

if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName())
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
sendCertRequest(c.sm, pod, kmeshsecurity.DELETE)
// We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns.
// And we have done this in kmesh-cni
}
}

func (kmc *KmeshManageController) handleNamespaceUpdateFunc(oldObj, newObj interface{}) {
func (c *KmeshManageController) handleNamespaceUpdate(oldObj, newObj interface{}) {
oldNS, okOld := oldObj.(*corev1.Namespace)
newNS, okNew := newObj.(*corev1.Namespace)
if !okOld || !okNew {
Expand All @@ -209,17 +185,18 @@
// Compare labels to check if they have actually changed
if !utils.ShouldEnroll(nil, oldNS) && utils.ShouldEnroll(nil, newNS) {
log.Infof("Enabling Kmesh for all pods in namespace: %s", newNS.Name)
kmc.enableKmeshForPodsInNamespace(newNS)
c.enableKmeshForPodsInNamespace(newNS)
return
}

if utils.ShouldEnroll(nil, oldNS) && !utils.ShouldEnroll(nil, newNS) {
log.Infof("Disabling Kmesh for all pods in namespace: %s", newNS.Name)
kmc.disableKmeshForPodsInNamespace(newNS)
c.disableKmeshForPodsInNamespace(newNS)
}
}

func (kmc *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.ADD)
func (c *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
sendCertRequest(c.sm, pod, kmeshsecurity.ADD)
if !isPodReady(pod) {
log.Debugf("Pod %s/%s is not ready, skipping Kmesh manage enable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -230,12 +207,12 @@
log.Errorf("failed to enable Kmesh manage")
return
}
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, kmc.xdpProgFd, kmc.mode)
c.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, c.xdpProgFd, c.mode)
}

func (kmc *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
func (c *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
sendCertRequest(c.sm, pod, kmeshsecurity.DELETE)
if !isPodReady(pod) {
log.Debugf("%s/%s is not ready, skipping Kmesh manage disable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -246,48 +223,53 @@
log.Error("failed to disable Kmesh manage")
return
}
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, kmc.mode)
c.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, c.mode)
}

func (kmc *KmeshManageController) enableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := kmc.podLister.Pods(namespace.Name).List(labels.Everything())
func (c *KmeshManageController) enableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := c.podLister.Pods(namespace.Name).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace.Name, err)
return
}

for _, pod := range pods {
if utils.ShouldEnroll(pod, namespace) {
kmc.enableKmeshManage(pod)
c.enableKmeshManage(pod)
}
}
}

func (kmc *KmeshManageController) disableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := kmc.podLister.Pods(namespace.Name).List(labels.Everything())
func (c *KmeshManageController) disableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := c.podLister.Pods(namespace.Name).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace.Name, err)
return
}

for _, pod := range pods {
if !utils.ShouldEnroll(pod, namespace) {
kmc.disableKmeshManage(pod)
c.disableKmeshManage(pod)
}
}
}

func (c *KmeshManageController) Run(stopChan <-chan struct{}) {
defer c.queue.ShutDown()
c.informerFactory.Start(stopChan)
go c.podInformer.Run(stopChan)
c.factory.Start(stopChan)
if !cache.WaitForCacheSync(stopChan, c.podInformer.HasSynced, c.namespaceInformer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
log.Error("kmesh manage controller timed out waiting for caches to sync")

Check warning on line 263 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L263

Added line #L263 was not covered by tests
return
}
for c.processItems() {
}

go wait.Until(func() {
for c.processItems() {
}

Check warning on line 269 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L269

Added line #L269 was not covered by tests
}, 0, stopChan)

<-stopChan
}

func (c *KmeshManageController) processItems() bool {
Expand All @@ -303,27 +285,7 @@
return true
}

pod, err := c.podLister.Pods(queueItem.podNs).Get(queueItem.podName)
if err != nil {
if apierrors.IsNotFound(err) {
log.Infof("pod %s/%s has been deleted", queueItem.podNs, queueItem.podName)
return true
}
log.Errorf("failed to get pod %s/%s: %v", queueItem.podNs, queueItem.podName, err)
}
if pod != nil {
// TODO: handle error
namespace, _ := c.namespaceLister.Get(pod.Namespace)
if queueItem.action == ActionAddAnnotation && utils.ShouldEnroll(pod, namespace) {
log.Infof("add annotation for pod %s/%s", pod.Namespace, pod.Name)
err = utils.PatchKmeshRedirectAnnotation(c.client, pod)
} else if queueItem.action == ActionDeleteAnnotation && !utils.ShouldEnroll(pod, namespace) {
log.Infof("delete annotation for pod %s/%s", pod.Namespace, pod.Name)
err = utils.DelKmeshRedirectAnnotation(c.client, pod)
}
}

if err != nil {
if err := c.syncPod(queueItem); err != nil {

Check warning on line 288 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L288

Added line #L288 was not covered by tests
if c.queue.NumRequeues(key) < MaxRetries {
log.Errorf("failed to handle pod %s/%s action %s, err: %v, will retry", queueItem.podNs, queueItem.podName, queueItem.action, err)
c.queue.AddRateLimited(key)
Expand All @@ -333,10 +295,39 @@
}
return true
}

c.queue.Forget(key)
return true
}

func (c *KmeshManageController) syncPod(key QueueItem) error {
pod, err := c.podLister.Pods(key.podNs).Get(key.podName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
log.Errorf("failed to get pod %s/%s: %v", key.podNs, key.podName, err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error message is presented on line L290. My suggestion is to delete the log here and put the message in the err return value.

Suggested change
log.Errorf("failed to get pod %s/%s: %v", key.podNs, key.podName, err)
return fmt.Errorf("failed to get pod %s/%s: %v", key.podNs, key.podName, err)

return err

Check warning on line 310 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L303-L310

Added lines #L303 - L310 were not covered by tests
}
namespace, err := c.namespaceLister.Get(pod.Namespace)
if err != nil {
if !apierrors.IsNotFound(err) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if !apierrors.IsNotFound(err) {
if apierrors.IsNotFound(err) {

?

return nil
}
log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
return err

Check warning on line 318 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L312-L318

Added lines #L312 - L318 were not covered by tests
}

if key.action == ActionAddAnnotation && utils.ShouldEnroll(pod, namespace) {
log.Infof("add annotation for pod %s/%s", pod.Namespace, pod.Name)
return utils.PatchKmeshRedirectAnnotation(c.client, pod)
} else if key.action == ActionDeleteAnnotation && !utils.ShouldEnroll(pod, namespace) {
log.Infof("delete annotation for pod %s/%s", pod.Namespace, pod.Name)
return utils.DelKmeshRedirectAnnotation(c.client, pod)
}
return nil

Check warning on line 328 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L321-L328

Added lines #L321 - L328 were not covered by tests
}

func sendCertRequest(security *kmeshsecurity.SecretManager, pod *corev1.Pod, op int) {
if security != nil {
Identity := spiffe.Identity{
Expand Down
Loading