Skip to content

Commit

Permalink
Merge pull request kmesh-net#1121 from hzxuzhonghu/manager-controller
Browse files Browse the repository at this point in the history
misc bug fix in manage controller
  • Loading branch information
kmesh-bot authored Dec 23, 2024
2 parents 762ed49 + 5a1a958 commit f92af83
Showing 1 changed file with 70 additions and 81 deletions.
151 changes: 70 additions & 81 deletions pkg/controller/manage/manage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
Expand Down Expand Up @@ -56,8 +57,6 @@ type QueueItem struct {
}

type KmeshManageController struct {
// TODO: share pod informer with bypass?
informerFactory informers.SharedInformerFactory
factory informers.SharedInformerFactory
podInformer cache.SharedIndexInformer
podLister v1.PodLister
Expand Down Expand Up @@ -89,9 +88,7 @@ func NewKmeshManageController(client kubernetes.Interface, sm *kmeshsecurity.Sec
namespaceLister := factory.Core().V1().Namespaces().Lister()

queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())

kmc := &KmeshManageController{
informerFactory: informerFactory,
c := &KmeshManageController{
podInformer: podInformer,
podLister: podLister,
factory: factory,
Expand All @@ -106,76 +103,55 @@ func NewKmeshManageController(client kubernetes.Interface, sm *kmeshsecurity.Sec

if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
kmc.handlePodAddFunc(obj)
c.handlePodUpdate(nil, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
kmc.handlePodUpdateFunc(oldObj, newObj)
c.handlePodUpdate(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
kmc.handlePodDeleteFunc(obj)
c.handlePodDelete(obj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to podInformer: %v", err)
}

if _, err := namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
kmc.handleNamespaceUpdateFunc(oldObj, newObj)
c.handleNamespaceUpdate(oldObj, newObj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to namespaceInformer: %v", err)
}

return kmc, nil
return c, nil
}

func (kmc *KmeshManageController) handlePodAddFunc(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
func (c *KmeshManageController) handlePodUpdate(_, newObj interface{}) {
newPod, ok := newObj.(*corev1.Pod)
if !ok {
log.Errorf("expected *corev1.Pod but got %T", obj)
return
}

namespace, err := kmc.namespaceLister.Get(pod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
return
}

if !utils.ShouldEnroll(pod, namespace) {
if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
kmc.disableKmeshManage(pod)
}
return
}
kmc.enableKmeshManage(pod)
}

func (kmc *KmeshManageController) handlePodUpdateFunc(_, newObj interface{}) {
newPod, okNew := newObj.(*corev1.Pod)
if !okNew {
log.Errorf("expected *corev1.Pod but got %T", newObj)
return
}

namespace, err := kmc.namespaceLister.Get(newPod.Namespace)
namespace, err := c.namespaceLister.Get(newPod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err)
return
}

// enable kmesh manage
if !utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && utils.ShouldEnroll(newPod, namespace) {
kmc.enableKmeshManage(newPod)
c.enableKmeshManage(newPod)
return
}

// disable kmesh manage
if utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && !utils.ShouldEnroll(newPod, namespace) {
kmc.disableKmeshManage(newPod)
c.disableKmeshManage(newPod)
}
}

func (kmc *KmeshManageController) handlePodDeleteFunc(obj interface{}) {
func (c *KmeshManageController) handlePodDelete(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -192,13 +168,13 @@ func (kmc *KmeshManageController) handlePodDeleteFunc(obj interface{}) {

if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName())
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
sendCertRequest(c.sm, pod, kmeshsecurity.DELETE)
// We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns.
// And we have done this in kmesh-cni
}
}

func (kmc *KmeshManageController) handleNamespaceUpdateFunc(oldObj, newObj interface{}) {
func (c *KmeshManageController) handleNamespaceUpdate(oldObj, newObj interface{}) {
oldNS, okOld := oldObj.(*corev1.Namespace)
newNS, okNew := newObj.(*corev1.Namespace)
if !okOld || !okNew {
Expand All @@ -209,17 +185,18 @@ func (kmc *KmeshManageController) handleNamespaceUpdateFunc(oldObj, newObj inter
// Compare labels to check if they have actually changed
if !utils.ShouldEnroll(nil, oldNS) && utils.ShouldEnroll(nil, newNS) {
log.Infof("Enabling Kmesh for all pods in namespace: %s", newNS.Name)
kmc.enableKmeshForPodsInNamespace(newNS)
c.enableKmeshForPodsInNamespace(newNS)
return
}

if utils.ShouldEnroll(nil, oldNS) && !utils.ShouldEnroll(nil, newNS) {
log.Infof("Disabling Kmesh for all pods in namespace: %s", newNS.Name)
kmc.disableKmeshForPodsInNamespace(newNS)
c.disableKmeshForPodsInNamespace(newNS)
}
}

func (kmc *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.ADD)
func (c *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
sendCertRequest(c.sm, pod, kmeshsecurity.ADD)
if !isPodReady(pod) {
log.Debugf("Pod %s/%s is not ready, skipping Kmesh manage enable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -230,12 +207,12 @@ func (kmc *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
log.Errorf("failed to enable Kmesh manage")
return
}
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, kmc.xdpProgFd, kmc.mode)
c.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, c.xdpProgFd, c.mode)
}

func (kmc *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
func (c *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
sendCertRequest(c.sm, pod, kmeshsecurity.DELETE)
if !isPodReady(pod) {
log.Debugf("%s/%s is not ready, skipping Kmesh manage disable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -246,48 +223,53 @@ func (kmc *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
log.Error("failed to disable Kmesh manage")
return
}
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, kmc.mode)
c.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, c.mode)
}

func (kmc *KmeshManageController) enableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := kmc.podLister.Pods(namespace.Name).List(labels.Everything())
func (c *KmeshManageController) enableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := c.podLister.Pods(namespace.Name).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace.Name, err)
return
}

for _, pod := range pods {
if utils.ShouldEnroll(pod, namespace) {
kmc.enableKmeshManage(pod)
c.enableKmeshManage(pod)
}
}
}

func (kmc *KmeshManageController) disableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := kmc.podLister.Pods(namespace.Name).List(labels.Everything())
func (c *KmeshManageController) disableKmeshForPodsInNamespace(namespace *corev1.Namespace) {
pods, err := c.podLister.Pods(namespace.Name).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace.Name, err)
return
}

for _, pod := range pods {
if !utils.ShouldEnroll(pod, namespace) {
kmc.disableKmeshManage(pod)
c.disableKmeshManage(pod)
}
}
}

func (c *KmeshManageController) Run(stopChan <-chan struct{}) {
defer c.queue.ShutDown()
c.informerFactory.Start(stopChan)
go c.podInformer.Run(stopChan)
c.factory.Start(stopChan)
if !cache.WaitForCacheSync(stopChan, c.podInformer.HasSynced, c.namespaceInformer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
log.Error("kmesh manage controller timed out waiting for caches to sync")
return
}
for c.processItems() {
}

go wait.Until(func() {
for c.processItems() {
}
}, 0, stopChan)

<-stopChan
}

func (c *KmeshManageController) processItems() bool {
Expand All @@ -303,27 +285,7 @@ func (c *KmeshManageController) processItems() bool {
return true
}

pod, err := c.podLister.Pods(queueItem.podNs).Get(queueItem.podName)
if err != nil {
if apierrors.IsNotFound(err) {
log.Infof("pod %s/%s has been deleted", queueItem.podNs, queueItem.podName)
return true
}
log.Errorf("failed to get pod %s/%s: %v", queueItem.podNs, queueItem.podName, err)
}
if pod != nil {
// TODO: handle error
namespace, _ := c.namespaceLister.Get(pod.Namespace)
if queueItem.action == ActionAddAnnotation && utils.ShouldEnroll(pod, namespace) {
log.Infof("add annotation for pod %s/%s", pod.Namespace, pod.Name)
err = utils.PatchKmeshRedirectAnnotation(c.client, pod)
} else if queueItem.action == ActionDeleteAnnotation && !utils.ShouldEnroll(pod, namespace) {
log.Infof("delete annotation for pod %s/%s", pod.Namespace, pod.Name)
err = utils.DelKmeshRedirectAnnotation(c.client, pod)
}
}

if err != nil {
if err := c.syncPod(queueItem); err != nil {
if c.queue.NumRequeues(key) < MaxRetries {
log.Errorf("failed to handle pod %s/%s action %s, err: %v, will retry", queueItem.podNs, queueItem.podName, queueItem.action, err)
c.queue.AddRateLimited(key)
Expand All @@ -333,10 +295,37 @@ func (c *KmeshManageController) processItems() bool {
}
return true
}

c.queue.Forget(key)
return true
}

func (c *KmeshManageController) syncPod(key QueueItem) error {
pod, err := c.podLister.Pods(key.podNs).Get(key.podName)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get pod %s/%s: %v", key.podNs, key.podName, err)
}
namespace, err := c.namespaceLister.Get(pod.Namespace)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
}

if key.action == ActionAddAnnotation && utils.ShouldEnroll(pod, namespace) {
log.Infof("add annotation for pod %s/%s", pod.Namespace, pod.Name)
return utils.PatchKmeshRedirectAnnotation(c.client, pod)
} else if key.action == ActionDeleteAnnotation && !utils.ShouldEnroll(pod, namespace) {
log.Infof("delete annotation for pod %s/%s", pod.Namespace, pod.Name)
return utils.DelKmeshRedirectAnnotation(c.client, pod)
}
return nil
}

func sendCertRequest(security *kmeshsecurity.SecretManager, pod *corev1.Pod, op int) {
if security != nil {
Identity := spiffe.Identity{
Expand Down

0 comments on commit f92af83

Please sign in to comment.