Skip to content

Commit

Permalink
Fixing too generic Endpoint, DanmEp, Services, and Pod interrogation.
Browse files Browse the repository at this point in the history
When handling an API event related to thes objects, svcwatcher unnecessrily
interrogated ALL existing objects.
It is fixed now by always using the namespace awarre lister client API instead the generic.
  • Loading branch information
Levovar committed Mar 23, 2020
1 parent 677ac86 commit 899ae9f
Showing 1 changed file with 27 additions and 27 deletions.
54 changes: 27 additions & 27 deletions pkg/svccontrol/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
type Controller struct {
kubeclient kubernetes.Interface
danmclient danmclientset.Interface

podLister corelisters.PodLister
podSynced cache.InformerSynced
serviceLister corelisters.ServiceLister
Expand Down Expand Up @@ -212,7 +211,7 @@ func (c *Controller) UpdateEndpointsList(epList []*corev1.Endpoints) error {

func (c *Controller) CreateModifyEndpoints(svc *corev1.Service, doesEpAlreadyExist bool, des []*danmv1.DanmEp) error {
var err error
epNew := c.MakeNewEps(svc, des)
epNew := c.MakeNewEps(svc, des)
if doesEpAlreadyExist {
_, err = c.kubeclient.CoreV1().Endpoints(svc.Namespace).Update(&epNew)
} else {
Expand Down Expand Up @@ -356,6 +355,7 @@ func (c *Controller) MakeNewEps(svc *corev1.Service, des []*danmv1.DanmEp) (core
epNew.Subsets = subsets
return epNew
}

//////////////////////////////
// //
// Danmep change handlers //
Expand All @@ -368,22 +368,22 @@ func (c *Controller) addDanmep(obj interface{}) {
glog.V(5).Infof("addDanmep is called: %s %s", obj.(*danmv1.DanmEp).GetName(), obj.(*danmv1.DanmEp).GetNamespace())
de := obj.(*danmv1.DanmEp)
ipAddr, ip6Addr := getIpsFromDanmEp(de)
sel := labels.Everything()
servicesList, err := c.serviceLister.List(sel)
svcNamespaceLister := c.serviceLister.Services(de.ObjectMeta.Namespace)
svcList, err := svcNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("addDanmEp: get services: %s", err)
return
}
svcList := MatchExistingSvc(de, servicesList)
if len(svcList) > 0 {
for _, svc := range svcList {
pod, err := c.podLister.Pods(de.Namespace).Get(de.Spec.Pod)
matchedSvcList := MatchExistingSvc(de, svcList)
if len(matchedSvcList) > 0 {
for _, svc := range matchedSvcList {
pod, err := c.podLister.Pods(de.ObjectMeta.Namespace).Get(de.Spec.Pod)
if err != nil {
glog.Errorf("addDanmEp: get pod %s", err)
continue
}
for i := 0; i < MaxUpdateRetry; i++ {
eps, err := c.epsLister.Endpoints(svc.Namespace).Get(svc.Name)
eps, err := c.epsLister.Endpoints(svc.ObjectMeta.Namespace).Get(svc.ObjectMeta.Name)
if err != nil && !errors.IsNotFound(err) {
glog.Errorf("addDanmEp: get ep %s", err)
break
Expand Down Expand Up @@ -432,11 +432,10 @@ func (c *Controller) delDanmep(obj interface{}) {
glog.V(5).Infof("delDanmep is called: %s %s", obj.(*danmv1.DanmEp).GetName(), obj.(*danmv1.DanmEp).GetNamespace())
de := obj.(*danmv1.DanmEp)
ipAddr, ip6Addr := getIpsFromDanmEp(de)
deNs := de.Namespace
var epList []*corev1.Endpoints
sel := labels.Everything()
for i := 0; i < MaxUpdateRetry; i++ {
epsList, err := c.epsLister.List(sel)
epNamespaceLister := c.epsLister.Endpoints(de.ObjectMeta.Namespace)
epsList, err := epNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("delDanmep: get epslist: %s", err)
return
Expand All @@ -452,11 +451,10 @@ func (c *Controller) delDanmep(obj interface{}) {
glog.Errorf("delDanmEp: selector %s", err)
return
}
if len(selectorMap) == 0 || !isDepSelectedBySvc(de, svcNets) || epNew.Namespace != deNs {
if len(selectorMap) == 0 || !isDepSelectedBySvc(de, svcNets) || epNew.Namespace != de.ObjectMeta.Namespace {
continue
}
deMap := de.GetLabels()
deFit := IsContain(deMap, selectorMap)
deFit := IsContain(de.GetLabels(), selectorMap)
if !deFit {
continue
}
Expand Down Expand Up @@ -495,15 +493,14 @@ func (c *Controller) updatePod(old, new interface{}) {
if oldPod.ResourceVersion == newPod.ResourceVersion || newPod.ObjectMeta.DeletionTimestamp != nil {
return
}

labelChange := PodLabelChanged(oldPod, newPod)
oldReady := PodReady(oldPod)
newReady := PodReady(newPod)
sel := labels.Everything()
if oldReady == newReady && !labelChange {
// nothing is changed just resource version. endpoints targetref need to be updated
// nothing is changed just resource version. endpoints targetref need to be updated
for i := 0; i < MaxUpdateRetry; i++ {
epsList, err := c.epsLister.List(sel)
epNamespaceLister := c.epsLister.Endpoints(newPod.ObjectMeta.Namespace)
epsList, err := epNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("updatePod: get eps %s", err)
return
Expand All @@ -527,7 +524,8 @@ func (c *Controller) updatePod(old, new interface{}) {
// first we need to reflect status change
if oldReady != newReady {
for i := 0; i < MaxUpdateRetry; i++ {
epsList, err := c.epsLister.List(sel)
epNamespaceLister := c.epsLister.Endpoints(newPod.ObjectMeta.Namespace)
epsList, err := epNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("updatePod: get eps %s", err)
return
Expand All @@ -552,7 +550,8 @@ func (c *Controller) updatePod(old, new interface{}) {
// label change
podName := newPod.Name
podNs := newPod.Namespace
desList, err := c.danmepLister.List(sel)
depNamespaceLister := c.danmepLister.DanmEps(newPod.ObjectMeta.Namespace)
desList, err := depNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("updatePod: get danmep %s", err)
return
Expand Down Expand Up @@ -592,20 +591,21 @@ func (c *Controller) addSvc(obj interface{}) {
}
if len(selectorMap) > 0 && len(svcNets) > 0 {
for i := 0; i < MaxUpdateRetry; i++ {
sel := labels.Everything()
d, err := c.danmepLister.List(sel)
depNamespaceLister := c.danmepLister.DanmEps(svcNs)
desList, err := depNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("addSvc: get danmep %s", err)
return
}
deList := SelectDesMatchLabels(d, selectorMap, svcNets, svcNs)
e, err := c.epsLister.List(sel)
matchingDesList := SelectDesMatchLabels(desList, selectorMap, svcNets, svcNs)
epNamespaceLister := c.epsLister.Endpoints(svcNs)
epsList, err := epNamespaceLister.List(labels.Everything())
if err != nil {
glog.Errorf("addSvc: get eps %s", err)
return
}
epFound := FindEpsForSvc(e, svcName, svcNs)
err = c.CreateModifyEndpoints(svc, epFound, deList)
epFound := FindEpsForSvc(epsList, svcName, svcNs)
err = c.CreateModifyEndpoints(svc, epFound, matchingDesList)
if err != nil {
if strings.Contains(err.Error(), datastructs.OptimisticLockErrorMsg) {
time.Sleep(RetryInterval * time.Millisecond)
Expand Down

0 comments on commit 899ae9f

Please sign in to comment.