Skip to content

Commit

Permalink
Reduce etcd traffic by using SharedInformer to list pods for drain logic
Browse files Browse the repository at this point in the history
  • Loading branch information
thiyyakat committed Oct 21, 2024
1 parent 67e4579 commit c84c066
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/util/provider/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func StartControllers(s *options.MCServer,
targetCoreClientBuilder.ClientOrDie("target-core-shared-informers"),
s.MinResyncPeriod.Duration,
)

var (
pdbV1Informer policyv1informers.PodDisruptionBudgetInformer
pdbV1beta1Informer policyv1beta1informers.PodDisruptionBudgetInformer
Expand Down Expand Up @@ -292,6 +291,7 @@ func StartControllers(s *options.MCServer,
targetCoreInformerFactory.Storage().V1().VolumeAttachments(),
machineSharedInformers.MachineClasses(),
machineSharedInformers.Machines(),
targetCoreInformerFactory.Core().V1().Pods(),
recorder,
s.SafetyOptions,
s.NodeConditions,
Expand Down
58 changes: 37 additions & 21 deletions pkg/util/provider/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"errors"
"fmt"
"io"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"regexp"
"sort"
"strings"
Expand All @@ -39,7 +41,6 @@ import (
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -75,8 +76,10 @@ type Options struct {
pdbV1beta1Lister policyv1beta1listers.PodDisruptionBudgetLister
pdbV1Lister policyv1listers.PodDisruptionBudgetLister
nodeLister corelisters.NodeLister
podLister corelisters.PodLister
volumeAttachmentHandler *VolumeAttachmentHandler
Timeout time.Duration
podSynced cache.InformerSynced
}

// Takes a pod and returns a bool indicating whether or not to operate on the
Expand Down Expand Up @@ -185,7 +188,9 @@ func NewDrainOptions(
pdbV1beta1Lister policyv1beta1listers.PodDisruptionBudgetLister,
pdbV1Lister policyv1listers.PodDisruptionBudgetLister,
nodeLister corelisters.NodeLister,
podLister corelisters.PodLister,
volumeAttachmentHandler *VolumeAttachmentHandler,
podSynced cache.InformerSynced,
) *Options {
return &Options{
client: client,
Expand All @@ -208,7 +213,9 @@ func NewDrainOptions(
pdbV1beta1Lister: pdbV1beta1Lister,
pdbV1Lister: pdbV1Lister,
nodeLister: nodeLister,
podLister: podLister,
volumeAttachmentHandler: volumeAttachmentHandler,
podSynced: podSynced,
}
}

Expand Down Expand Up @@ -237,20 +244,26 @@ func (o *Options) RunDrain(ctx context.Context) error {
klog.Errorf("Drain Error: Cordoning of node failed with error: %v", err)
return err
}
stopCh := make(chan struct{})
defer close(stopCh)
if !cache.WaitForCacheSync(stopCh, o.podSynced) {
err := fmt.Errorf("timed out waiting for pod cache to sync")
return err
}

err := o.deleteOrEvictPodsSimple(drainContext)
return err
}

func (o *Options) deleteOrEvictPodsSimple(ctx context.Context) error {
pods, err := o.getPodsForDeletion(ctx)
pods, err := o.getPodsForDeletion()
if err != nil {
return err
}

err = o.deleteOrEvictPods(ctx, pods)
if err != nil {
pendingPods, newErr := o.getPodsForDeletion(ctx)
pendingPods, newErr := o.getPodsForDeletion()
if newErr != nil {
return newErr
}
Expand Down Expand Up @@ -342,32 +355,35 @@ func (ps podStatuses) Message() string {

// getPodsForDeletion returns all the pods we're going to delete. If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *Options) getPodsForDeletion(ctx context.Context) (pods []corev1.Pod, err error) {
podList, err := o.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeName}).String()})
func (o *Options) getPodsForDeletion() (pods []corev1.Pod, err error) {
podList, err := o.podLister.List(labels.Everything())
if err != nil {
return pods, err
return nil, err
}
if len(podList) == 0 {
klog.Infof("no pods found in store")
return
}

ws := podStatuses{}
fs := podStatuses{}

for _, pod := range podList.Items {
podOk := true
for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
filterOk, w, f := filt(pod)

podOk = podOk && filterOk
if w != nil {
ws[w.string] = append(ws[w.string], pod.Name)
for _, pod := range podList {
if pod.Spec.NodeName == o.nodeName {
podOk := true
for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
filterOk, w, f := filt(*pod)
podOk = podOk && filterOk
if w != nil {
ws[w.string] = append(ws[w.string], pod.Name)
}
if f != nil {
fs[f.string] = append(fs[f.string], pod.Name)
}
}
if f != nil {
fs[f.string] = append(fs[f.string], pod.Name)
if podOk {
pods = append(pods, *pod)
}
}
if podOk {
pods = append(pods, pod)
}
}

if len(fs) > 0 {
Expand Down
4 changes: 3 additions & 1 deletion pkg/util/provider/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ var _ = Describe("drain", func() {
targetCoreObjects = appendVolumeAttachments(targetCoreObjects, volumeAttachments)
}

fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, pvcSynced, pvSynced, nodeSynced, tracker := createFakeController(
fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, fakePodLister, pvcSynced, pvSynced, nodeSynced, podSynced, tracker := createFakeController(
stop, testNamespace, targetCoreObjects,
)
defer tracker.Stop()
Expand Down Expand Up @@ -160,8 +160,10 @@ var _ = Describe("drain", func() {
pdbV1beta1Lister: nil,
pdbV1Lister: nil,
nodeLister: fakeNodeLister,
podLister: fakePodLister,
Timeout: 2 * time.Minute,
volumeAttachmentHandler: volumeAttachmentHandler,
podSynced: podSynced,
}

// Get the pod directly from the ObjectTracker to avoid locking issues in the Fake object.
Expand Down
7 changes: 6 additions & 1 deletion pkg/util/provider/drain/fake_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func createFakeController(
corelisters.PersistentVolumeLister,
corelisters.PersistentVolumeClaimLister,
corelisters.NodeLister,
corelisters.PodLister,
func() bool,
func() bool,
func() bool,
func() bool,
Expand All @@ -45,14 +47,17 @@ func createFakeController(
pvcs := coreTargetSharedInformers.PersistentVolumeClaims()
pvs := coreTargetSharedInformers.PersistentVolumes()
nodes := coreTargetSharedInformers.Nodes()
pods := coreTargetSharedInformers.Pods()

pvcLister := pvcs.Lister()
pvLister := pvs.Lister()
nodeLister := nodes.Lister()
podLister := pods.Lister()

pvcSynced := pvcs.Informer().HasSynced
pvSynced := pvs.Informer().HasSynced
nodeSynced := nodes.Informer().HasSynced
podSynced := pods.Informer().HasSynced

return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, pvcSynced, pvSynced, nodeSynced, targetCoreObjectTracker
return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, podLister, pvcSynced, pvSynced, nodeSynced, podSynced, targetCoreObjectTracker
}
5 changes: 5 additions & 0 deletions pkg/util/provider/machinecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewController(
volumeAttachmentInformer storageinformers.VolumeAttachmentInformer,
machineClassInformer machineinformers.MachineClassInformer,
machineInformer machineinformers.MachineInformer,
podInformer coreinformers.PodInformer,
recorder record.EventRecorder,
safetyOptions options.SafetyOptions,
nodeConditions string,
Expand Down Expand Up @@ -127,6 +128,7 @@ func NewController(
controller.machineClassLister = machineClassInformer.Lister()
controller.nodeLister = nodeInformer.Lister()
controller.machineLister = machineInformer.Lister()
controller.podLister = podInformer.Lister()

// Controller syncs
controller.pvcSynced = pvcInformer.Informer().HasSynced
Expand All @@ -136,6 +138,7 @@ func NewController(
controller.machineClassSynced = machineClassInformer.Informer().HasSynced
controller.nodeSynced = nodeInformer.Informer().HasSynced
controller.machineSynced = machineInformer.Informer().HasSynced
controller.podSynced = podInformer.Informer().HasSynced

if k8sutils.ConstraintK8sGreaterEqual121.Check(targetKubernetesVersion) {
controller.pdbV1Lister = pdbV1Informer.Lister()
Expand Down Expand Up @@ -253,6 +256,7 @@ type controller struct {
volumeAttachementLister storagelisters.VolumeAttachmentLister
machineClassLister machinelisters.MachineClassLister
machineLister machinelisters.MachineLister
podLister corelisters.PodLister
// queues
secretQueue workqueue.RateLimitingInterface
nodeQueue workqueue.RateLimitingInterface
Expand All @@ -270,6 +274,7 @@ type controller struct {
nodeSynced cache.InformerSynced
machineClassSynced cache.InformerSynced
machineSynced cache.InformerSynced
podSynced cache.InformerSynced
}

func (c *controller) Run(workers int, stopCh <-chan struct{}) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/provider/machinecontroller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ func createController(
nodes := coreTargetSharedInformers.Nodes()
pvcs := coreTargetSharedInformers.PersistentVolumeClaims()
pvs := coreTargetSharedInformers.PersistentVolumes()
pods := coreTargetSharedInformers.Pods()

coreControlInformerFactory := coreinformers.NewFilteredSharedInformerFactory(
fakeControlCoreClient,
Expand Down Expand Up @@ -593,9 +594,11 @@ func createController(
secretLister: secrets.Lister(),
pvLister: pvs.Lister(),
machineLister: machines.Lister(),
podLister: pods.Lister(),
machineSynced: machines.Informer().HasSynced,
nodeSynced: nodes.Informer().HasSynced,
secretSynced: secrets.Informer().HasSynced,
podSynced: pods.Informer().HasSynced,
machineClassQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineclass"),
secretQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node"),
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/provider/machinecontroller/machine_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,9 @@ func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver
c.pdbV1beta1Lister,
c.pdbV1Lister,
c.nodeLister,
c.podLister,
c.volumeAttachmentHandler,
c.podSynced,
)
klog.V(3).Infof("(drainNode) Invoking RunDrain, forceDeleteMachine: %t, forceDeletePods: %t, timeOutDuration: %s", forceDeletePods, forceDeleteMachine, timeOutDuration)
err = drainOptions.RunDrain(ctx)
Expand Down

0 comments on commit c84c066

Please sign in to comment.