diff --git a/operator/pkg/psp/emm.go b/operator/pkg/psp/emm.go index 704edf31f..cfc79b3b7 100644 --- a/operator/pkg/psp/emm.go +++ b/operator/pkg/psp/emm.go @@ -3,7 +3,7 @@ // PSP EMM operations are triggered by a user through a UI and appear to the // operator as taints on k8s nodes. To allow the EMM operation to proceed, all -// pods must be removed from the tainted node. EMM can be cancelled by adding +// pods must be removed from the tainted node. EMM can be cancelled by adding // a failure annotation to pods on the node. // // The way we have implemented EMM here is to only allow an EMM operation to @@ -62,7 +62,7 @@ const ( // This interface is what, in practice, the ReconciliationContext will need to // implement. Its intention is to keep coupling with the reconciliation package // low and also to keep this package relatively sheltered from certain thorny -// details, like determining which pods represent cassandra nodes that have +// details, like determining which pods represent cassandra nodes that have // bootstrapped at some point. type EMMSPI interface { GetAllNodesInDC() ([]*corev1.Node, error) @@ -77,6 +77,7 @@ type EMMSPI interface { IsStopped() bool IsInitialized() bool GetLogger() logr.Logger + GetAllNodes() ([]*corev1.Node, error) } type EMMChecks interface { @@ -87,6 +88,8 @@ type EMMChecks interface { getInProgressNodeReplacements() []string IsStopped() bool IsInitialized() bool + getNodeNameSet() (utils.StringSet, error) + getPodNameSet() utils.StringSet } type EMMOperations interface { @@ -99,6 +102,7 @@ type EMMOperations interface { removeNextPodFromEvacuateDataNode() (bool, error) removeAllPodsFromOnePlannedDowntimeNode() (bool, error) startNodeReplace(podName string) error + emmFailureStillProcessing() (bool, error) } type EMMService interface { @@ -172,7 +176,7 @@ func (impl *EMMServiceImpl) cleanupEMMAnnotations() (bool, error) { nodesNoLongerEMM := utils.SubtractStringSet( nodesWithPodsFailedEMM, nodes) - + podsNoLongerFailed := utils.FilterPodsWithNodeInNameSet(podsFailedEmm, nodesNoLongerEMM) didUpdate := false for _, pod := range podsNoLongerFailed { @@ -186,6 +190,24 @@ func (impl *EMMServiceImpl) cleanupEMMAnnotations() (bool, error) { return didUpdate, nil } +func (impl *EMMServiceImpl) emmFailureStillProcessing() (bool, error) { + nodes, err := impl.getEvacuateAllDataNodeNameSet() + if err != nil { + return false, err + } + nodes2, err := impl.getPlannedDownTimeNodeNameSet() + if err != nil { + return false, err + } + nodes = utils.UnionStringSet(nodes, nodes2) + + // Strip EMM failure annotation from pods where node is no longer tainted + podsFailedEmm := impl.getPodsWithAnnotationKey(EMMFailureAnnotation) + nodesWithPodsFailedEMM := utils.GetPodNodeNameSet(podsFailedEmm) + + return len(utils.IntersectionStringSet(nodes, nodesWithPodsFailedEMM)) > 0, nil +} + func (impl *EMMServiceImpl) getPlannedDownTimeNodeNameSet() (utils.StringSet, error) { nodes, err := impl.getNodesWithTaintKeyValueEffect(EMMTaintKey, string(PlannedDowntime), corev1.TaintEffectNoSchedule) if err != nil { @@ -216,7 +238,7 @@ func (impl *EMMServiceImpl) removeAllNotReadyPodsOnEMMNodes() (bool, error) { taintedNodesNameSet := utils.UnionStringSet(plannedDownNameSet, evacuateDataNameSet) podsNotReadyOnTaintedNodes := utils.FilterPodsWithNodeInNameSet(podsNotReady, taintedNodesNameSet) - + if len(podsNotReadyOnTaintedNodes) > 0 { for _, pod := range podsNotReadyOnTaintedNodes { err := impl.RemovePod(pod) @@ -234,14 +256,28 @@ func (impl *EMMServiceImpl) failEMM(nodeName string, 
failure EMMFailure) (bool, pods := impl.getPodsForNodeName(nodeName) didUpdate := false for _, pod := range pods { - err := impl.addPodAnnotation(pod, EMMFailureAnnotation, string(failure)) + added, err := impl.addPodAnnotation(pod, EMMFailureAnnotation, string(failure)) if err != nil { return false, err } + didUpdate = didUpdate || added } return didUpdate, nil } +func (impl *EMMServiceImpl) getNodeNameSet() (utils.StringSet, error) { + nodes, err := impl.EMMSPI.GetAllNodes() + if err != nil { + return nil, err + } + + return utils.GetNodeNameSet(nodes), nil +} + +func (impl *EMMServiceImpl) getPodNameSet() utils.StringSet { + return utils.GetPodNameSet(impl.EMMSPI.GetDCPods()) +} + func (impl *EMMServiceImpl) performEvacuateDataPodReplace() (bool, error) { downPods := impl.GetNotReadyPodsBootstrappedInDC() if len(downPods) > 0 { @@ -250,7 +286,7 @@ func (impl *EMMServiceImpl) performEvacuateDataPodReplace() (bool, error) { return false, err } - // Check if any of these pods are stuck due to PVC associated to a + // Check if any of these pods are stuck due to PVC associated to a // tainted node for evacuate all data. This would happen, for example, // in the case of local persistent volumes where the volume cannot // move with the pod. @@ -265,11 +301,11 @@ func (impl *EMMServiceImpl) performEvacuateDataPodReplace() (bool, error) { // NOTE: There isn't a great machine readable way to know why // the pod is unschedulable. The reasons are, unfortunately, // buried within human readable explanation text. As a result, - // a pod might not get scheduled due to no nodes having + // a pod might not get scheduled due to no nodes having // sufficent memory, and then we delete a PVC thinking that - // the PVC was causing scheduling to fail even though it + // the PVC was causing scheduling to fail even though it // wasn't. - + pvcNode, err := impl.getPodPVCSelectedNodeName(pod.Name) if err != nil { return false, err @@ -337,7 +373,6 @@ func (impl *EMMServiceImpl) getLogger() logr.Logger { return impl.GetLogger() } - // // EMMChecks impl // @@ -349,7 +384,7 @@ func (impl *EMMServiceImpl) getPodNameSetWithVolumeHealthInaccessiblePVC(rackNam if err != nil { return nil, err } - + pvcs = utils.FilterPVCsWithFn(pvcs, func(pvc *corev1.PersistentVolumeClaim) bool { return pvc.Annotations != nil && pvc.Annotations[VolumeHealthAnnotation] == string(VolumeHealthInaccessible) }) @@ -386,7 +421,6 @@ func (impl *EMMServiceImpl) getInProgressNodeReplacements() []string { return impl.GetInProgressNodeReplacements() } - // // Helper methods // @@ -407,13 +441,16 @@ func (impl *EMMServiceImpl) getPodsWithAnnotationKey(key string) []*corev1.Pod { return utils.FilterPodsWithAnnotationKey(pods, key) } -func (impl *EMMServiceImpl) addPodAnnotation(pod *corev1.Pod, key, value string) error { +func (impl *EMMServiceImpl) addPodAnnotation(pod *corev1.Pod, key, value string) (bool, error) { if pod.ObjectMeta.Annotations == nil { pod.ObjectMeta.Annotations = map[string]string{} + } else if currentValue, found := pod.Annotations[key]; found { + //check if the value matches, if so then the work is already done + return currentValue == value, nil } pod.Annotations[key] = value - return impl.UpdatePod(pod) + return true, impl.UpdatePod(pod) } func (impl *EMMServiceImpl) removePodAnnotation(pod *corev1.Pod, key string) error { @@ -424,7 +461,7 @@ func (impl *EMMServiceImpl) removePodAnnotation(pod *corev1.Pod, key string) err return nil } -// Check nodes for vmware PSP draining taints. 
This function embodies the +// Check nodes for vmware PSP draining taints. This function embodies the // business logic around when EMM operations are executed. func checkNodeEMM(provider EMMService) result.ReconcileResult { logger := provider.getLogger() @@ -440,6 +477,19 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { return result.RequeueSoon(2) } + // Check if there are still pods annotated with EMM failure. + // If there are then that means vmware has not removed the node + // taint in response to a failure. We can requeue or stop + // reconciliation in this situation. + stillProcessing, err := provider.emmFailureStillProcessing() + if err != nil { + logger.Error(err, "Failed to check if EMM failures are still processing") + return result.Error(err) + } + if stillProcessing { + return result.RequeueSoon(2) + } + // Do not perform EMM operations while the datacenter is initializing if !provider.IsInitialized() { logger.Info("Skipping EMM check as the cluster is not yet initialized") @@ -459,9 +509,39 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { return result.Error(err) } + allNodes, err := provider.getNodeNameSet() + if err != nil { + logger.Error(err, "Failed to get node name set") + return result.Error(err) + } + + // Fail EMM operations if insufficient nodes for pods + // + // VMWare requires that we perform some kind of check to ensure that there is at least hope that any + // impacted pods from an EMM operation will get rescheduled successfully to some other node. Here + // we do a basic check to ensure that there are at least as many nodes available as we have cassandra + // pods + unavailableNodes := utils.UnionStringSet(plannedDownNodeNameSet, evacuateDataNodeNameSet) + availableNodes := utils.SubtractStringSet(allNodes, unavailableNodes) + + if len(provider.getPodNameSet()) > len(availableNodes) { + anyUpdated := false + updated := false + for node := range unavailableNodes { + if updated, err = provider.failEMM(node, NotEnoughResources); err != nil { + logger.Error(err, "Failed to add "+EMMFailureAnnotation, "Node", node) + return result.Error(err) + } + anyUpdated = anyUpdated || updated + } + if anyUpdated { + return result.RequeueSoon(10) + } + } + // Fail any evacuate data EMM operations if the datacenter is stopped // - // Cassandra must be up and running to rebuild cassandra nodes. Since + // Cassandra must be up and running to rebuild cassandra nodes. Since // evacuating may entail deleting PVCs, we need to fail these operations // as we are unlikely to be able to carry them out successfully. if provider.IsStopped() { @@ -482,10 +562,10 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { // NOTE: There might be pods that aren't ready for a variety of reasons, // however, with respect to data availability, we really only care if a - // pod representing a cassandra node that is _currently_ bootstrapped to + // pod representing a cassandra node that is _currently_ bootstrapped to // the is down. We do not care if a pod is down for a cassandra node - // that was never bootstrapped (for example, maybe we are in the middle of - // scaling up), as such pods are not part of the cluster presently and + // that was never bootstrapped (for example, maybe we are in the middle of + // scaling up), as such pods are not part of the cluster presently and // their being down has no impact on data availability. Also, simply // looking at the pod state label is insufficient here. 
The pod might be // brand new, but represents a cassandra node that is already part of the @@ -525,7 +605,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { nodeNameSetForNoPodsInRack := utils.SubtractStringSet( utils.UnionStringSet( - plannedDownNodeNameSet, + plannedDownNodeNameSet, evacuateDataNodeNameSet), nodeNameSetForDownRack) @@ -540,7 +620,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { didUpdate = didUpdate || podsUpdated } if didUpdate { - return result.RequeueSoon(2) + return result.RequeueSoon(2) } } } @@ -552,14 +632,14 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { // For example, if we have a pod on the tainted node that is stuck in // "Starting", no other pods will have Cassandra started until that is // resolved. Admittedly, CheckPodsReady() will permit some pods to be - // not ready before starting, but we don't want these two functions + // not ready before starting, but we don't want these two functions // to become deeply coupled as it makes testing nightmarishly difficult, // so we just delete all the not ready pods. // // Note that, due to earlier checks, we know the only not-ready pods // are those that have not bootstrapped (so it doesn't matter if we delete - // them) or pods that all belong to the same rack. Consequently, we can - // delete all such pods on the tainted node without impacting + // them) or pods that all belong to the same rack. Consequently, we can + // delete all such pods on the tainted node without impacting // availability. didUpdate, err = provider.removeAllNotReadyPodsOnEMMNodes() if err != nil { @@ -575,7 +655,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { // Wait for pods not on tainted nodes to become ready // - // If we have pods down (for cassandra nodes that have potentially + // If we have pods down (for cassandra nodes that have potentially // bootstrapped) that are not on the tainted nodes, these are likely pods // we previously deleted due to the taints. We try to move these pods // one-at-a-time to spare ourselves unnecessary rebuilds if @@ -589,35 +669,35 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { return result.RequeueSoon(2) } - // Pods are not ready (because downRack isn't the empty string) and + // Pods are not ready (because downRack isn't the empty string) and // there aren't any pods stuck in an unscheduable state with PVCs on - // on nodes marked for evacuate all data, so continue to allow + // on nodes marked for evacuate all data, so continue to allow // cassandra a chance to start on the not ready pods. These not ready - // pods are likely ones we deleted previously when moving them off of + // pods are likely ones we deleted previously when moving them off of // the tainted node. // - // TODO: Some of these pods might be from a planned-downtime EMM + // TODO: Some of these pods might be from a planned-downtime EMM // operation and so will not become ready until their node comes back // online. With the way this logic works, if two nodes are marked for // planned-downtime, only one node will have its pods deleted, and the - // other will effectively be ignored until the other node is back + // other will effectively be ignored until the other node is back // online, even if both nodes belong to the same rack. Revisit whether // this behaviour is desirable. 
return result.Continue() } - // At this point, we know there are no not-ready pods on the tainted + // At this point, we know there are no not-ready pods on the tainted // nodes and we know there are no down pods that are bootstrapped, so we // can delete any pod we like without impacting availability. // Delete a pod for an evacuate data tainted node // // We give preference to nodes tainted to evacuate all data mainly to - // ensure some level of determinism. We could give preference to + // ensure some level of determinism. We could give preference to // planned downtime taints. In an ideal world, we'd address tainted // nodes in chronilogical order of when they received a taint, but the // operator doesn't track that information (and I'm not inclined to do - // more book keeping) and the node doesn't have this information as + // more book keeping) and the node doesn't have this information as // far as I'm aware. didUpdate, err = provider.removeNextPodFromEvacuateDataNode() if err != nil { @@ -644,7 +724,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult { return result.Continue() } -// Check for PVCs with health inaccessible. This function embodies the +// Check for PVCs with health inaccessible. This function embodies the // business logic around when inaccessible PVCs are replaced. func checkPVCHealth(provider EMMService) result.ReconcileResult { logger := provider.getLogger() @@ -658,7 +738,7 @@ func checkPVCHealth(provider EMMService) result.ReconcileResult { // nothing to do return result.Continue() } - + racksWithDownPods := provider.getRacksWithNotReadyPodsBootstrapped() if len(racksWithDownPods) > 1 { @@ -708,4 +788,4 @@ func CheckEMM(spi EMMSPI) result.ReconcileResult { logger := service.getLogger() logger.Info("psp::CheckEMM") return checkNodeEMM(service) -} \ No newline at end of file +} diff --git a/operator/pkg/psp/emm_test.go b/operator/pkg/psp/emm_test.go index 860a1ea81..95c1534d8 100644 --- a/operator/pkg/psp/emm_test.go +++ b/operator/pkg/psp/emm_test.go @@ -13,7 +13,6 @@ import ( "github.com/go-logr/logr" logrtesting "github.com/go-logr/logr/testing" - "github.com/datastax/cass-operator/operator/internal/result" "github.com/datastax/cass-operator/operator/pkg/utils" @@ -38,6 +37,21 @@ type MockEMMService struct { mock.Mock } +func (m *MockEMMService) getPodNameSet() utils.StringSet { + args := m.Called() + return args.Get(0).(utils.StringSet) +} + +func (m *MockEMMService) getNodeNameSet() (utils.StringSet, error) { + args := m.Called() + return args.Get(0).(utils.StringSet), args.Error(1) +} + +func (m *MockEMMService) emmFailureStillProcessing() (bool, error) { + args := m.Called() + return args.Get(0).(bool), args.Error(1) +} + func (m *MockEMMService) getRacksWithNotReadyPodsBootstrapped() []string { args := m.Called() return args.Get(0).([]string) @@ -132,43 +146,64 @@ func Test_checkNodeEMM(t *testing.T) { testObj.AssertExpectations(t) IsRequeue(t, r, "") - + // When emm failure is still processing then requeue + testObj = &MockEMMService{} + testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(true, nil) + + r = checkNodeEMM(testObj) + testObj.AssertExpectations(t) + IsRequeue(t, r, "") + // When datacenter is not initialized, ignore EMM taints testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(false) r = checkNodeEMM(testObj) testObj.AssertExpectations(t) 
IsContinue(t, r, "") - - // When datacenter is not stopped, fail all EMM operations to evacuate + // When datacenter is not stopped, fail all EMM operations to evacuate // data as cassandra is not running to rebuild the impacted cassandra // nodes testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(true) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{"node1": true, "node2": true}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node3": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("failEMM", "node3", GenericFailure).Return(true, nil) r = checkNodeEMM(testObj) testObj.AssertExpectations(t) - IsRequeue(t, r, "") - + IsRequeue(t, r, "") - // When there are no extraneous EMM failure annotations, and there are + // When there are no extraneous EMM failure annotations, and there are // pods not ready on multiple racks, fail EMM for all nodes tainted for // EMM testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{"rack1", "rack2"}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{"node1": true, "node2": true}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node3": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("failEMM", "node1", TooManyExistingFailures).Return(true, nil) testObj.On("failEMM", "node2", TooManyExistingFailures).Return(true, nil) testObj.On("failEMM", "node3", TooManyExistingFailures).Return(true, nil) @@ -177,18 +212,24 @@ func Test_checkNodeEMM(t *testing.T) { testObj.AssertExpectations(t) IsRequeue(t, r, "") - - // When there are no extraneous EMM failure annotations, and there are - // pods not ready on one rack, fail EMM for all nodes not part of that + // When there are no extraneous EMM failure annotations, and there are + // pods not ready on one rack, fail EMM for all nodes not part of that // rack testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{"rack1"}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{"node1": true, "node2": true}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node3": true}, nil) testObj.On("getRackNodeNameSet", "rack1").Return(utils.StringSet{"node2": true}) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("failEMM", 
"node1", TooManyExistingFailures).Return(true, nil) testObj.On("failEMM", "node3", TooManyExistingFailures).Return(true, nil) @@ -196,18 +237,24 @@ func Test_checkNodeEMM(t *testing.T) { testObj.AssertExpectations(t) IsRequeue(t, r, "") - // When there are no extraneous EMM failure annotations, there are not // multiple racks with pods not ready, and all nodes marked for EMM have pods // for the rack with not ready pods, delete any not ready pods on the nodes // marked for EMM. testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{"rack1"}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{"node1": true, "node2": true}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("getRackNodeNameSet", "rack1").Return(utils.StringSet{"node1": true, "node2": true}) testObj.On("removeAllNotReadyPodsOnEMMNodes").Return(true, nil) @@ -216,17 +263,24 @@ func Test_checkNodeEMM(t *testing.T) { IsRequeue(t, r, "") // When there are no extraneous EMM failure annotations, therea are not - // multiple racks with pods not ready, and all nodes marked for EMM have + // multiple racks with pods not ready, and all nodes marked for EMM have // pods for the rack with not ready pods, there are no not ready pods on // the nodes marked for EMM, perform a replace on any pod with a PVC // associated with a node marked for EMM evacuate data. testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{"rack1"}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node2": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("getRackNodeNameSet", "rack1").Return(utils.StringSet{"node1": true, "node2": true}) testObj.On("removeAllNotReadyPodsOnEMMNodes").Return(false, nil) testObj.On("performEvacuateDataPodReplace").Return(true, nil) @@ -235,20 +289,26 @@ func Test_checkNodeEMM(t *testing.T) { testObj.AssertExpectations(t) IsRequeue(t, r, "") - // When there are no extraneous EMM failure annotations, therea are not - // multiple racks with pods not ready, and all nodes marked for EMM have + // multiple racks with pods not ready, and all nodes marked for EMM have // pods for the rack with not ready pods, there are no not ready pods on // the nodes marked for EMM, and there are pods not ready not on the EMM // nodes that are scheduable, return and continue to allow the pods to // start and--hopefully--become ready. 
testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{"rack1"}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node2": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("getRackNodeNameSet", "rack1").Return(utils.StringSet{"node1": true, "node2": true}) testObj.On("removeAllNotReadyPodsOnEMMNodes").Return(false, nil) testObj.On("performEvacuateDataPodReplace").Return(false, nil) @@ -257,16 +317,22 @@ func Test_checkNodeEMM(t *testing.T) { testObj.AssertExpectations(t) IsContinue(t, r, "should continue reconcile to allow cassandra to be started on not ready pods") - // When there are no pods not ready, remove a pod from an EMM node // marked for evacuate all data. testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node2": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("removeAllNotReadyPodsOnEMMNodes").Return(false, nil) testObj.On("removeNextPodFromEvacuateDataNode").Return(true, nil) @@ -274,16 +340,22 @@ func Test_checkNodeEMM(t *testing.T) { testObj.AssertExpectations(t) IsRequeue(t, r, "") - // When there are no pods not ready and no nodes are marked for evacuate // data, remove all pods for a node marked for planned down time testObj = &MockEMMService{} testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) testObj.On("IsInitialized").Return(true) testObj.On("IsStopped").Return(false) testObj.On("getRacksWithNotReadyPodsBootstrapped").Return([]string{}) testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{}, nil) testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node2": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, "node2": true, + "node3": true, "node4": true, + "node5": true, "node6": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) testObj.On("removeAllNotReadyPodsOnEMMNodes").Return(false, nil) testObj.On("removeNextPodFromEvacuateDataNode").Return(false, nil) testObj.On("removeAllPodsFromOnePlannedDowntimeNode").Return(true, nil) @@ -291,8 +363,47 @@ func Test_checkNodeEMM(t *testing.T) { r = checkNodeEMM(testObj) testObj.AssertExpectations(t) IsRequeue(t, r, "") -} + // When there are no available nodes to evacuate too + // fail emm + testObj = &MockEMMService{} + testObj.On("cleanupEMMAnnotations").Return(false, nil) + 
testObj.On("emmFailureStillProcessing").Return(false, nil) + testObj.On("IsInitialized").Return(true) + testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{}, nil) + testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{"node2": true}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, + "node2": true, + "node3": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) + testObj.On("failEMM", "node2", NotEnoughResources).Return(true, nil) + + r = checkNodeEMM(testObj) + testObj.AssertExpectations(t) + IsRequeue(t, r, "") + + // When there are no available nodes to during a planned downtime + // for a node with a pod then fail emm + testObj = &MockEMMService{} + testObj.On("cleanupEMMAnnotations").Return(false, nil) + testObj.On("emmFailureStillProcessing").Return(false, nil) + testObj.On("IsInitialized").Return(true) + testObj.On("getPlannedDownTimeNodeNameSet").Return(utils.StringSet{"node2": true}, nil) + testObj.On("getEvacuateAllDataNodeNameSet").Return(utils.StringSet{}, nil) + testObj.On("getNodeNameSet").Return(utils.StringSet{ + "node1": true, + "node2": true, + "node3": true, + }, nil) + testObj.On("getPodNameSet").Return(utils.StringSet{"pod1": true, "pod2": true, "pod3": true}) + testObj.On("failEMM", "node2", NotEnoughResources).Return(true, nil) + + r = checkNodeEMM(testObj) + testObj.AssertExpectations(t) + IsRequeue(t, r, "") +} func Test_checkPVCHealth(t *testing.T) { // When no pods have PVCs marked as inaccessible, do nothing and continue @@ -328,6 +439,11 @@ type MockEMMSPI struct { mock.Mock } +func (m *MockEMMSPI) GetAllNodes() ([]*corev1.Node, error) { + args := m.Called() + return args.Get(0).([]*corev1.Node), args.Error(1) +} + func (m *MockEMMSPI) GetAllNodesInDC() ([]*corev1.Node, error) { args := m.Called() return args.Get(0).([]*corev1.Node), args.Error(1) @@ -400,8 +516,8 @@ func evacuateDataNode(name string) *corev1.Node { node.Name = name node.Spec.Taints = []corev1.Taint{ corev1.Taint{ - Key: "node.vmware.com/drain", - Value: "drain", + Key: "node.vmware.com/drain", + Value: "drain", Effect: corev1.TaintEffectNoSchedule, }, } @@ -413,7 +529,7 @@ func Test_removeAllNotReadyPodsOnEMMNodes(t *testing.T) { var testObj *MockEMMSPI testObj = &MockEMMSPI{} - service = &EMMServiceImpl { + service = &EMMServiceImpl{ EMMSPI: testObj, } diff --git a/operator/pkg/reconciliation/check_nodes.go b/operator/pkg/reconciliation/check_nodes.go index 3f654661f..b8877fcd4 100644 --- a/operator/pkg/reconciliation/check_nodes.go +++ b/operator/pkg/reconciliation/check_nodes.go @@ -7,7 +7,9 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/datastax/cass-operator/operator/pkg/utils" @@ -101,7 +103,6 @@ func (rc *ReconciliationContext) getDCPodByName(podName string) *corev1.Pod { return nil } - // // functions to statisfy EMMSPI interface from psp package // @@ -110,7 +111,7 @@ func (rc *ReconciliationContext) GetAllNodesInDC() ([]*corev1.Node, error) { // Get all nodes for datacenter // // We need to check taints for all nodes that this datacenter cares about, - // this includes not just nodes where we have dc pods, but also nodes + // this includes not just nodes where we have dc pods, but also nodes // where we have PVCs, as PVCs might get separated from their pod when a // pod is rescheduled. 
 	pvcs, err := rc.getPodsPVCs(rc.dcPods)
@@ -121,6 +122,28 @@
 	return rc.getNodesForNameSet(nodeNameSet)
 }
 
+func (rc *ReconciliationContext) GetAllNodes() ([]*corev1.Node, error) {
+	// Get all nodes in the cluster
+	//
+	// We need every node, not just datacenter nodes, to judge availability.
+	listOptions := &client.ListOptions{}
+
+	nodeList := &corev1.NodeList{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "Node",
+			APIVersion: "v1",
+		},
+	}
+	if err := rc.Client.List(rc.Ctx, nodeList, listOptions); err != nil {
+		return nil, err
+	}
+	nodes := make([]*corev1.Node, 0, len(nodeList.Items))
+	for i := range nodeList.Items {
+		nodes = append(nodes, &nodeList.Items[i])
+	}
+	return nodes, nil
+}
+
 func (rc *ReconciliationContext) GetDCPods() []*corev1.Pod {
 	return rc.dcPods
 }
diff --git a/operator/pkg/utils/k8s_utils.go b/operator/pkg/utils/k8s_utils.go
index c4be075e7..9f095cb26 100644
--- a/operator/pkg/utils/k8s_utils.go
+++ b/operator/pkg/utils/k8s_utils.go
@@ -30,6 +30,15 @@ func SubtractStringSet(a, b StringSet) StringSet {
 	return result
 }
 
+func IntersectionStringSet(a, b StringSet) StringSet {
+	result := StringSet{}
+	for k, v := range a {
+		if v && b[k] {
+			result[k] = true
+		}
+	}
+	return result
+}
 //
 // k8s Node helper functions
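
The capacity check added to checkNodeEMM is easier to follow in isolation. The sketch below is a minimal re-statement of that logic using plain map[string]bool values in place of utils.StringSet (which the k8s_utils.go helpers imply is a map[string]bool); the union, subtract, and shouldFailEMM names are illustrative and not part of the operator's API.

package main

import "fmt"

// StringSet stands in for utils.StringSet (assumed to be map[string]bool).
type StringSet = map[string]bool

// union returns every key present in either set.
func union(a, b StringSet) StringSet {
	out := StringSet{}
	for k := range a {
		out[k] = true
	}
	for k := range b {
		out[k] = true
	}
	return out
}

// subtract returns the keys of a that are not members of b.
func subtract(a, b StringSet) StringSet {
	out := StringSet{}
	for k := range a {
		if !b[k] {
			out[k] = true
		}
	}
	return out
}

// shouldFailEMM reports whether the EMM operation must be failed because the
// nodes left after removing tainted ones cannot hold all cassandra pods.
func shouldFailEMM(allNodes, plannedDown, evacuateData, pods StringSet) bool {
	unavailable := union(plannedDown, evacuateData)
	available := subtract(allNodes, unavailable)
	return len(pods) > len(available)
}

func main() {
	allNodes := StringSet{"node1": true, "node2": true, "node3": true}
	pods := StringSet{"pod1": true, "pod2": true, "pod3": true}
	// Tainting node2 for evacuate-all-data leaves two schedulable nodes for
	// three pods, so the operation would be failed with NotEnoughResources.
	fmt.Println(shouldFailEMM(allNodes, nil, StringSet{"node2": true}, pods))
}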
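
The change to addPodAnnotation makes the write idempotent: the boolean it now returns means "the requested value is in place", not "an update was performed", and an existing annotation with a different value is left untouched. A standalone sketch of that behaviour, using a hypothetical setAnnotation helper and example key/values rather than the real EMMFailureAnnotation constant:

package main

import "fmt"

// setAnnotation is a hypothetical stand-in for addPodAnnotation. It writes the
// annotation only when the key is absent and reports whether the desired value
// is now (or already was) present; a pre-existing, different value is left
// alone, mirroring the early return in the diff. The map is returned because a
// nil map must be replaced before writing.
func setAnnotation(annotations map[string]string, key, value string) (map[string]string, bool) {
	if annotations == nil {
		annotations = map[string]string{}
	} else if current, found := annotations[key]; found {
		return annotations, current == value
	}
	annotations[key] = value
	return annotations, true
}

func main() {
	// "example.com/emm-failure" is an illustrative key, not the operator's.
	ann, ok := setAnnotation(nil, "example.com/emm-failure", "GenericFailure")
	fmt.Println(ok, ann) // true: annotation written

	_, ok = setAnnotation(ann, "example.com/emm-failure", "NotEnoughResources")
	fmt.Println(ok) // false: a different value already exists and is not overwritten
}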
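
IntersectionStringSet respects the boolean values in its first argument, so a key explicitly set to false is not treated as a member. A small test along these lines (hypothetical, not part of this PR, and assuming utils.StringSet is a map[string]bool as the other k8s_utils.go helpers suggest) would pin that behaviour down:

package utils_test

import (
	"reflect"
	"testing"

	"github.com/datastax/cass-operator/operator/pkg/utils"
)

func Test_IntersectionStringSet(t *testing.T) {
	// node3 is present in both maps, but explicitly false in a, so it is
	// excluded; node1 and node4 appear in only one of the sets.
	a := utils.StringSet{"node1": true, "node2": true, "node3": false}
	b := utils.StringSet{"node2": true, "node3": true, "node4": true}

	got := utils.IntersectionStringSet(a, b)
	want := utils.StringSet{"node2": true}

	if !reflect.DeepEqual(got, want) {
		t.Errorf("IntersectionStringSet() = %v, want %v", got, want)
	}
}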