diff --git a/pkg/util/provider/app/app.go b/pkg/util/provider/app/app.go index b9781634e..904a986c5 100644 --- a/pkg/util/provider/app/app.go +++ b/pkg/util/provider/app/app.go @@ -275,6 +275,7 @@ func StartControllers(s *options.MCServer, targetCoreInformerFactory.Core().V1().PersistentVolumes(), controlCoreInformerFactory.Core().V1().Secrets(), targetCoreInformerFactory.Core().V1().Nodes(), + targetCoreInformerFactory.Core().V1().Pods(), pdbInformer, targetCoreInformerFactory.Storage().V1().VolumeAttachments(), machineSharedInformers.MachineClasses(), diff --git a/pkg/util/provider/drain/drain.go b/pkg/util/provider/drain/drain.go index a711fdcff..dcf660851 100644 --- a/pkg/util/provider/drain/drain.go +++ b/pkg/util/provider/drain/drain.go @@ -27,6 +27,8 @@ import ( "errors" "fmt" "io" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" "regexp" "sort" "strings" @@ -39,7 +41,6 @@ import ( storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -72,8 +73,10 @@ type Options struct { pvLister corelisters.PersistentVolumeLister pdbLister policyv1listers.PodDisruptionBudgetLister nodeLister corelisters.NodeLister + podLister corelisters.PodLister volumeAttachmentHandler *VolumeAttachmentHandler Timeout time.Duration + podSynced cache.InformerSynced } // Takes a pod and returns a bool indicating whether or not to operate on the @@ -181,7 +184,9 @@ func NewDrainOptions( pvLister corelisters.PersistentVolumeLister, pdbLister policyv1listers.PodDisruptionBudgetLister, nodeLister corelisters.NodeLister, + podLister corelisters.PodLister, volumeAttachmentHandler *VolumeAttachmentHandler, + podSynced cache.InformerSynced, ) *Options { return &Options{ client: client, @@ -203,7 +208,9 @@ func NewDrainOptions( pvLister: pvLister, pdbLister: pdbLister, nodeLister: nodeLister, + podLister: podLister, volumeAttachmentHandler: volumeAttachmentHandler, + podSynced: podSynced, } } @@ -232,20 +239,24 @@ func (o *Options) RunDrain(ctx context.Context) error { klog.Errorf("Drain Error: Cordoning of node failed with error: %v", err) return err } + if !cache.WaitForCacheSync(drainContext.Done(), o.podSynced) { + err := fmt.Errorf("timed out waiting for pod cache to sync") + return err + } err := o.deleteOrEvictPodsSimple(drainContext) return err } func (o *Options) deleteOrEvictPodsSimple(ctx context.Context) error { - pods, err := o.getPodsForDeletion(ctx) + pods, err := o.getPodsForDeletion() if err != nil { return err } err = o.deleteOrEvictPods(ctx, pods) if err != nil { - pendingPods, newErr := o.getPodsForDeletion(ctx) + pendingPods, newErr := o.getPodsForDeletion() if newErr != nil { return newErr } @@ -337,21 +348,25 @@ func (ps podStatuses) Message() string { // getPodsForDeletion returns all the pods we're going to delete. If there are // any pods preventing us from deleting, we return that list in an error. -func (o *Options) getPodsForDeletion(ctx context.Context) (pods []corev1.Pod, err error) { - podList, err := o.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ - FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeName}).String()}) +func (o *Options) getPodsForDeletion() (pods []corev1.Pod, err error) { + podList, err := o.podLister.List(labels.Everything()) if err != nil { - return pods, err + return + } + if len(podList) == 0 { + klog.Infof("no pods found in store") + return } - ws := podStatuses{} fs := podStatuses{} - for _, pod := range podList.Items { + for _, pod := range podList { + if pod.Spec.NodeName != o.nodeName { + continue + } podOk := true for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} { - filterOk, w, f := filt(pod) - + filterOk, w, f := filt(*pod) podOk = podOk && filterOk if w != nil { ws[w.string] = append(ws[w.string], pod.Name) @@ -361,7 +376,7 @@ func (o *Options) getPodsForDeletion(ctx context.Context) (pods []corev1.Pod, er } } if podOk { - pods = append(pods, pod) + pods = append(pods, *pod) } } diff --git a/pkg/util/provider/drain/drain_test.go b/pkg/util/provider/drain/drain_test.go index a66aba237..0161cc7fd 100644 --- a/pkg/util/provider/drain/drain_test.go +++ b/pkg/util/provider/drain/drain_test.go @@ -119,7 +119,7 @@ var _ = Describe("drain", func() { targetCoreObjects = appendVolumeAttachments(targetCoreObjects, volumeAttachments) } - fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, pvcSynced, pvSynced, nodeSynced, tracker := createFakeController( + fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, fakePodLister, pvcSynced, pvSynced, nodeSynced, podSynced, tracker := createFakeController( stop, testNamespace, targetCoreObjects, ) defer tracker.Stop() @@ -159,8 +159,10 @@ var _ = Describe("drain", func() { pvLister: fakePVLister, pdbLister: nil, nodeLister: fakeNodeLister, + podLister: fakePodLister, Timeout: 2 * time.Minute, volumeAttachmentHandler: volumeAttachmentHandler, + podSynced: podSynced, } // Get the pod directly from the ObjectTracker to avoid locking issues in the Fake object. diff --git a/pkg/util/provider/drain/fake_controller.go b/pkg/util/provider/drain/fake_controller.go index 550dc775a..44f82e13e 100644 --- a/pkg/util/provider/drain/fake_controller.go +++ b/pkg/util/provider/drain/fake_controller.go @@ -24,6 +24,8 @@ func createFakeController( corelisters.PersistentVolumeLister, corelisters.PersistentVolumeClaimLister, corelisters.NodeLister, + corelisters.PodLister, + func() bool, func() bool, func() bool, func() bool, @@ -45,14 +47,17 @@ func createFakeController( pvcs := coreTargetSharedInformers.PersistentVolumeClaims() pvs := coreTargetSharedInformers.PersistentVolumes() nodes := coreTargetSharedInformers.Nodes() + pods := coreTargetSharedInformers.Pods() pvcLister := pvcs.Lister() pvLister := pvs.Lister() nodeLister := nodes.Lister() + podLister := pods.Lister() pvcSynced := pvcs.Informer().HasSynced pvSynced := pvs.Informer().HasSynced nodeSynced := nodes.Informer().HasSynced + podSynced := pods.Informer().HasSynced - return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, pvcSynced, pvSynced, nodeSynced, targetCoreObjectTracker + return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, podLister, pvcSynced, pvSynced, nodeSynced, podSynced, targetCoreObjectTracker } diff --git a/pkg/util/provider/machinecontroller/controller.go b/pkg/util/provider/machinecontroller/controller.go index 97fa3e8db..7eb26d22f 100644 --- a/pkg/util/provider/machinecontroller/controller.go +++ b/pkg/util/provider/machinecontroller/controller.go @@ -65,6 +65,7 @@ func NewController( pvInformer coreinformers.PersistentVolumeInformer, secretInformer coreinformers.SecretInformer, nodeInformer coreinformers.NodeInformer, + podInformer coreinformers.PodInformer, pdbInformer policyv1informers.PodDisruptionBudgetInformer, volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, machineClassInformer machineinformers.MachineClassInformer, @@ -124,6 +125,7 @@ func NewController( controller.machineClassLister = machineClassInformer.Lister() controller.nodeLister = nodeInformer.Lister() controller.machineLister = machineInformer.Lister() + controller.podLister = podInformer.Lister() // Controller syncs controller.pvcSynced = pvcInformer.Informer().HasSynced @@ -133,6 +135,7 @@ func NewController( controller.machineClassSynced = machineClassInformer.Informer().HasSynced controller.nodeSynced = nodeInformer.Informer().HasSynced controller.machineSynced = machineInformer.Informer().HasSynced + controller.podSynced = podInformer.Informer().HasSynced controller.pdbLister = pdbInformer.Lister() controller.pdbSynced = pdbInformer.Informer().HasSynced @@ -244,6 +247,7 @@ type controller struct { volumeAttachementLister storagelisters.VolumeAttachmentLister machineClassLister machinelisters.MachineClassLister machineLister machinelisters.MachineLister + podLister corelisters.PodLister // queues secretQueue workqueue.RateLimitingInterface nodeQueue workqueue.RateLimitingInterface @@ -260,6 +264,7 @@ type controller struct { nodeSynced cache.InformerSynced machineClassSynced cache.InformerSynced machineSynced cache.InformerSynced + podSynced cache.InformerSynced } func (c *controller) Run(workers int, stopCh <-chan struct{}) { diff --git a/pkg/util/provider/machinecontroller/controller_suite_test.go b/pkg/util/provider/machinecontroller/controller_suite_test.go index 0dd97f5fd..43cfb8e24 100644 --- a/pkg/util/provider/machinecontroller/controller_suite_test.go +++ b/pkg/util/provider/machinecontroller/controller_suite_test.go @@ -539,6 +539,7 @@ func createController( nodes := coreTargetSharedInformers.Nodes() pvcs := coreTargetSharedInformers.PersistentVolumeClaims() pvs := coreTargetSharedInformers.PersistentVolumes() + pods := coreTargetSharedInformers.Pods() coreControlInformerFactory := coreinformers.NewFilteredSharedInformerFactory( fakeControlCoreClient, @@ -593,9 +594,11 @@ func createController( secretLister: secrets.Lister(), pvLister: pvs.Lister(), machineLister: machines.Lister(), + podLister: pods.Lister(), machineSynced: machines.Informer().HasSynced, nodeSynced: nodes.Informer().HasSynced, secretSynced: secrets.Informer().HasSynced, + podSynced: pods.Informer().HasSynced, machineClassQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machineclass"), secretQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"), nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node"), diff --git a/pkg/util/provider/machinecontroller/machine_util.go b/pkg/util/provider/machinecontroller/machine_util.go index b0af4dbdd..a0c10645c 100644 --- a/pkg/util/provider/machinecontroller/machine_util.go +++ b/pkg/util/provider/machinecontroller/machine_util.go @@ -1175,7 +1175,9 @@ func (c *controller) drainNode(ctx context.Context, deleteMachineRequest *driver c.pvLister, c.pdbLister, c.nodeLister, + c.podLister, c.volumeAttachmentHandler, + c.podSynced, ) klog.V(3).Infof("(drainNode) Invoking RunDrain, forceDeleteMachine: %t, forceDeletePods: %t, timeOutDuration: %s", forceDeletePods, forceDeleteMachine, timeOutDuration) err = drainOptions.RunDrain(ctx)