Skip to content

Commit

Permalink
Reduce load on etcd/kube-apiserver on pod eviction (#949)
Browse files Browse the repository at this point in the history
* Reduce etcd traffic by using SharedInformer to list pods for drain logic

* Address review comments
  • Loading branch information
thiyyakat authored Oct 28, 2024
1 parent e91182a commit 31834e2
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/util/provider/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func StartControllers(s *options.MCServer,
targetCoreInformerFactory.Core().V1().PersistentVolumes(),
controlCoreInformerFactory.Core().V1().Secrets(),
targetCoreInformerFactory.Core().V1().Nodes(),
targetCoreInformerFactory.Core().V1().Pods(),
pdbInformer,
targetCoreInformerFactory.Storage().V1().VolumeAttachments(),
machineSharedInformers.MachineClasses(),
Expand Down
39 changes: 27 additions & 12 deletions pkg/util/provider/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"errors"
"fmt"
"io"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"regexp"
"sort"
"strings"
Expand All @@ -39,7 +41,6 @@ import (
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -72,8 +73,10 @@ type Options struct {
pvLister corelisters.PersistentVolumeLister
pdbLister policyv1listers.PodDisruptionBudgetLister
nodeLister corelisters.NodeLister
podLister corelisters.PodLister
volumeAttachmentHandler *VolumeAttachmentHandler
Timeout time.Duration
podSynced cache.InformerSynced
}

// Takes a pod and returns a bool indicating whether or not to operate on the
Expand Down Expand Up @@ -181,7 +184,9 @@ func NewDrainOptions(
pvLister corelisters.PersistentVolumeLister,
pdbLister policyv1listers.PodDisruptionBudgetLister,
nodeLister corelisters.NodeLister,
podLister corelisters.PodLister,
volumeAttachmentHandler *VolumeAttachmentHandler,
podSynced cache.InformerSynced,
) *Options {
return &Options{
client: client,
Expand All @@ -203,7 +208,9 @@ func NewDrainOptions(
pvLister: pvLister,
pdbLister: pdbLister,
nodeLister: nodeLister,
podLister: podLister,
volumeAttachmentHandler: volumeAttachmentHandler,
podSynced: podSynced,
}
}

Expand Down Expand Up @@ -232,20 +239,24 @@ func (o *Options) RunDrain(ctx context.Context) error {
klog.Errorf("Drain Error: Cordoning of node failed with error: %v", err)
return err
}
if !cache.WaitForCacheSync(drainContext.Done(), o.podSynced) {
err := fmt.Errorf("timed out waiting for pod cache to sync")
return err
}

err := o.deleteOrEvictPodsSimple(drainContext)
return err
}

func (o *Options) deleteOrEvictPodsSimple(ctx context.Context) error {
pods, err := o.getPodsForDeletion(ctx)
pods, err := o.getPodsForDeletion()
if err != nil {
return err
}

err = o.deleteOrEvictPods(ctx, pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion(ctx)
pendingPods, newErr := o.getPodsForDeletion()
if newErr != nil {
return newErr
}
Expand Down Expand Up @@ -337,21 +348,25 @@ func (ps podStatuses) Message() string {

// getPodsForDeletion returns all the pods we're going to delete. If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *Options) getPodsForDeletion(ctx context.Context) (pods []corev1.Pod, err error) {
podList, err := o.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeName}).String()})
func (o *Options) getPodsForDeletion() (pods []corev1.Pod, err error) {
podList, err := o.podLister.List(labels.Everything())
if err != nil {
return pods, err
return
}
if len(podList) == 0 {
klog.Infof("no pods found in store")
return
}

ws := podStatuses{}
fs := podStatuses{}

for _, pod := range podList.Items {
for _, pod := range podList {
if pod.Spec.NodeName != o.nodeName {
continue
}
podOk := true
for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
filterOk, w, f := filt(pod)

filterOk, w, f := filt(*pod)
podOk = podOk && filterOk
if w != nil {
ws[w.string] = append(ws[w.string], pod.Name)
Expand All @@ -361,7 +376,7 @@ func (o *Options) getPodsForDeletion(ctx context.Context) (pods []corev1.Pod, er
}
}
if podOk {
pods = append(pods, pod)
pods = append(pods, *pod)
}
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/util/provider/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var _ = Describe("drain", func() {
targetCoreObjects = appendVolumeAttachments(targetCoreObjects, volumeAttachments)
}

fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, pvcSynced, pvSynced, nodeSynced, tracker := createFakeController(
fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, fakePodLister, pvcSynced, pvSynced, nodeSynced, podSynced, tracker := createFakeController(
stop, testNamespace, targetCoreObjects,
)
defer tracker.Stop()
Expand Down Expand Up @@ -159,8 +159,10 @@ var _ = Describe("drain", func() {
pvLister: fakePVLister,
pdbLister: nil,
nodeLister: fakeNodeLister,
podLister: fakePodLister,
Timeout: 2 * time.Minute,
volumeAttachmentHandler: volumeAttachmentHandler,
podSynced: podSynced,
}

// Get the pod directly from the ObjectTracker to avoid locking issues in the Fake object.
Expand Down
7 changes: 6 additions & 1 deletion pkg/util/provider/drain/fake_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func createFakeController(
corelisters.PersistentVolumeLister,
corelisters.PersistentVolumeClaimLister,
corelisters.NodeLister,
corelisters.PodLister,
func() bool,
func() bool,
func() bool,
func() bool,
Expand All @@ -45,14 +47,17 @@ func createFakeController(
pvcs := coreTargetSharedInformers.PersistentVolumeClaims()
pvs := coreTargetSharedInformers.PersistentVolumes()
nodes := coreTargetSharedInformers.Nodes()
pods := coreTargetSharedInformers.Pods()

pvcLister := pvcs.Lister()
pvLister := pvs.Lister()
nodeLister := nodes.Lister()
podLister := pods.Lister()

pvcSynced := pvcs.Informer().HasSynced
pvSynced := pvs.Informer().HasSynced
nodeSynced := nodes.Informer().HasSynced
podSynced := pods.Informer().HasSynced

return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, pvcSynced, pvSynced, nodeSynced, targetCoreObjectTracker
return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, podLister, pvcSynced, pvSynced, nodeSynced, podSynced, targetCoreObjectTracker
}
5 changes: 5 additions & 0 deletions pkg/util/provider/machinecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewController(
pvInformer coreinformers.PersistentVolumeInformer,
secretInformer coreinformers.SecretInformer,
nodeInformer coreinformers.NodeInformer,
podInformer coreinformers.PodInformer,
pdbInformer policyv1informers.PodDisruptionBudgetInformer,
volumeAttachmentInformer storageinformers.VolumeAttachmentInformer,
machineClassInformer machineinformers.MachineClassInformer,
Expand Down Expand Up @@ -124,6 +125,7 @@ func NewController(
controller.machineClassLister = machineClassInformer.Lister()
controller.nodeLister = nodeInformer.Lister()
controller.machineLister = machineInformer.Lister()
controller.podLister = podInformer.Lister()

// Controller syncs
controller.pvcSynced = pvcInformer.Informer().HasSynced
Expand All @@ -133,6 +135,7 @@ func NewController(
controller.machineClassSynced = machineClassInformer.Informer().HasSynced
controller.nodeSynced = nodeInformer.Informer().HasSynced
controller.machineSynced = machineInformer.Informer().HasSynced
controller.podSynced = podInformer.Informer().HasSynced

controller.pdbLister = pdbInformer.Lister()
controller.pdbSynced = pdbInformer.Informer().HasSynced
Expand Down Expand Up @@ -244,6 +247,7 @@ type controller struct {
volumeAttachementLister storagelisters.VolumeAttachmentLister
machineClassLister machinelisters.MachineClassLister
machineLister machinelisters.MachineLister
podLister corelisters.PodLister
// queues
secretQueue workqueue.RateLimitingInterface
nodeQueue workqueue.RateLimitingInterface
Expand All @@ -260,6 +264,7 @@ type controller struct {
nodeSynced cache.InformerSynced
machineClassSynced cache.InformerSynced
machineSynced cache.InformerSynced
podSynced cache.InformerSynced
}

func (c *controller) Run(workers int, stopCh <-chan struct{}) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/provider/machinecontroller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ func createController(
nodes := coreTargetSharedInformers.Nodes()
pvcs := coreTargetSharedInformers.PersistentVolumeClaims()
pvs := coreTargetSharedInformers.PersistentVolumes()
pods := coreTargetSharedInformers.Pods()

coreControlInformerFactory := coreinformers.NewFilteredSharedInformerFactory(
fakeControlCoreClient,
Expand Down Expand Up @@ -593,9 +594,11 @@ func createController(
secretLister: secrets.Lister(),
pvLister: pvs.Lister(),
machineLister: machines.Lister(),
podLister: pods.Lister(),
machineSynced: machines.Informer().HasSynced,
nodeSynced: nodes.Informer().HasSynced,
secretSynced: secrets.Informer().HasSynced,
podSynced: pods.Informer().HasSynced,
machineClassQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineclass"),
secretQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node"),
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/provider/machinecontroller/machine_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,9 @@ func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver
c.pvLister,
c.pdbLister,
c.nodeLister,
c.podLister,
c.volumeAttachmentHandler,
c.podSynced,
)
klog.V(3).Infof("(drainNode) Invoking RunDrain, forceDeleteMachine: %t, forceDeletePods: %t, timeOutDuration: %s", forceDeletePods, forceDeleteMachine, timeOutDuration)
err = drainOptions.RunDrain(ctx)
Expand Down

0 comments on commit 31834e2

Please sign in to comment.