Emm updates (#342)
* changes for vmware certification

* test and cleanup for vmware certification fixes

- satisfied emm interfaces and fixed tests
- added test cases for emm node count
- review fixes

Co-authored-by: John Sanda <john.sanda@gmail.com>
2 people authored and jimdickinson committed Jan 15, 2021
1 parent e68babc commit ddc3a99
Showing 4 changed files with 286 additions and 58 deletions.
152 changes: 116 additions & 36 deletions operator/pkg/psp/emm.go
@@ -3,7 +3,7 @@

// PSP EMM operations are triggered by a user through a UI and appear to the
// operator as taints on k8s nodes. To allow the EMM operation to proceed, all
// pods must be removed from the tainted node. EMM can be cancelled by adding
// a failure annotation to pods on the node.
//
// The way we have implemented EMM here is to only allow an EMM operation to
@@ -62,7 +62,7 @@ const (
// This interface is what, in practice, the ReconciliationContext will need to
// implement. Its intention is to keep coupling with the reconciliation package
// low and also to keep this package relatively sheltered from certain thorny
// details, like determining which pods represent cassandra nodes that have
// bootstrapped at some point.
type EMMSPI interface {
GetAllNodesInDC() ([]*corev1.Node, error)
@@ -77,6 +77,7 @@ type EMMSPI interface {
IsStopped() bool
IsInitialized() bool
GetLogger() logr.Logger
GetAllNodes() ([]*corev1.Node, error)
}

type EMMChecks interface {
@@ -87,6 +88,8 @@ type EMMChecks interface {
getInProgressNodeReplacements() []string
IsStopped() bool
IsInitialized() bool
getNodeNameSet() (utils.StringSet, error)
getPodNameSet() utils.StringSet
}

type EMMOperations interface {
@@ -99,6 +102,7 @@ type EMMOperations interface {
removeNextPodFromEvacuateDataNode() (bool, error)
removeAllPodsFromOnePlannedDowntimeNode() (bool, error)
startNodeReplace(podName string) error
emmFailureStillProcessing() (bool, error)
}

type EMMService interface {
@@ -172,7 +176,7 @@ func (impl *EMMServiceImpl) cleanupEMMAnnotations() (bool, error) {
nodesNoLongerEMM := utils.SubtractStringSet(
nodesWithPodsFailedEMM,
nodes)

podsNoLongerFailed := utils.FilterPodsWithNodeInNameSet(podsFailedEmm, nodesNoLongerEMM)
didUpdate := false
for _, pod := range podsNoLongerFailed {
@@ -186,6 +190,24 @@ func (impl *EMMServiceImpl) cleanupEMMAnnotations() (bool, error) {
return didUpdate, nil
}

func (impl *EMMServiceImpl) emmFailureStillProcessing() (bool, error) {
nodes, err := impl.getEvacuateAllDataNodeNameSet()
if err != nil {
return false, err
}
nodes2, err := impl.getPlannedDownTimeNodeNameSet()
if err != nil {
return false, err
}
nodes = utils.UnionStringSet(nodes, nodes2)

	// Check whether any pods annotated with an EMM failure are still on tainted nodes
podsFailedEmm := impl.getPodsWithAnnotationKey(EMMFailureAnnotation)
nodesWithPodsFailedEMM := utils.GetPodNodeNameSet(podsFailedEmm)

return len(utils.IntersectionStringSet(nodes, nodesWithPodsFailedEMM)) > 0, nil
}

func (impl *EMMServiceImpl) getPlannedDownTimeNodeNameSet() (utils.StringSet, error) {
nodes, err := impl.getNodesWithTaintKeyValueEffect(EMMTaintKey, string(PlannedDowntime), corev1.TaintEffectNoSchedule)
if err != nil {
@@ -216,7 +238,7 @@ func (impl *EMMServiceImpl) removeAllNotReadyPodsOnEMMNodes() (bool, error) {

taintedNodesNameSet := utils.UnionStringSet(plannedDownNameSet, evacuateDataNameSet)
podsNotReadyOnTaintedNodes := utils.FilterPodsWithNodeInNameSet(podsNotReady, taintedNodesNameSet)

if len(podsNotReadyOnTaintedNodes) > 0 {
for _, pod := range podsNotReadyOnTaintedNodes {
err := impl.RemovePod(pod)
@@ -234,14 +256,28 @@ func (impl *EMMServiceImpl) failEMM(nodeName string, failure EMMFailure) (bool,
pods := impl.getPodsForNodeName(nodeName)
didUpdate := false
for _, pod := range pods {
err := impl.addPodAnnotation(pod, EMMFailureAnnotation, string(failure))
added, err := impl.addPodAnnotation(pod, EMMFailureAnnotation, string(failure))
if err != nil {
return false, err
}
didUpdate = didUpdate || added
}
return didUpdate, nil
}

func (impl *EMMServiceImpl) getNodeNameSet() (utils.StringSet, error) {
nodes, err := impl.EMMSPI.GetAllNodes()
if err != nil {
return nil, err
}

return utils.GetNodeNameSet(nodes), nil
}

func (impl *EMMServiceImpl) getPodNameSet() utils.StringSet {
return utils.GetPodNameSet(impl.EMMSPI.GetDCPods())
}

func (impl *EMMServiceImpl) performEvacuateDataPodReplace() (bool, error) {
downPods := impl.GetNotReadyPodsBootstrappedInDC()
if len(downPods) > 0 {
@@ -250,7 +286,7 @@ func (impl *EMMServiceImpl) performEvacuateDataPodReplace() (bool, error) {
return false, err
}

// Check if any of these pods are stuck due to PVC associated to a
// tainted node for evacuate all data. This would happen, for example,
// in the case of local persistent volumes where the volume cannot
// move with the pod.
@@ -265,11 +301,11 @@ func (impl *EMMServiceImpl) performEvacuateDataPodReplace() (bool, error) {
// NOTE: There isn't a great machine readable way to know why
// the pod is unschedulable. The reasons are, unfortunately,
// buried within human readable explanation text. As a result,
			// a pod might not get scheduled due to no nodes having
			// sufficient memory, and then we delete a PVC thinking that
			// the PVC was causing scheduling to fail even though it
// wasn't.

pvcNode, err := impl.getPodPVCSelectedNodeName(pod.Name)
if err != nil {
return false, err
@@ -337,7 +373,6 @@ func (impl *EMMServiceImpl) getLogger() logr.Logger {
return impl.GetLogger()
}


//
// EMMChecks impl
//
@@ -349,7 +384,7 @@ func (impl *EMMServiceImpl) getPodNameSetWithVolumeHealthInaccessiblePVC(rackNam
if err != nil {
return nil, err
}

pvcs = utils.FilterPVCsWithFn(pvcs, func(pvc *corev1.PersistentVolumeClaim) bool {
return pvc.Annotations != nil && pvc.Annotations[VolumeHealthAnnotation] == string(VolumeHealthInaccessible)
})
@@ -386,7 +421,6 @@ func (impl *EMMServiceImpl) getInProgressNodeReplacements() []string {
return impl.GetInProgressNodeReplacements()
}


//
// Helper methods
//
@@ -407,13 +441,16 @@ func (impl *EMMServiceImpl) getPodsWithAnnotationKey(key string) []*corev1.Pod {
return utils.FilterPodsWithAnnotationKey(pods, key)
}

func (impl *EMMServiceImpl) addPodAnnotation(pod *corev1.Pod, key, value string) error {
func (impl *EMMServiceImpl) addPodAnnotation(pod *corev1.Pod, key, value string) (bool, error) {
if pod.ObjectMeta.Annotations == nil {
pod.ObjectMeta.Annotations = map[string]string{}
} else if currentValue, found := pod.Annotations[key]; found {
//check if the value matches, if so then the work is already done
return currentValue == value, nil
}

pod.Annotations[key] = value
return impl.UpdatePod(pod)
return true, impl.UpdatePod(pod)
}

func (impl *EMMServiceImpl) removePodAnnotation(pod *corev1.Pod, key string) error {
@@ -424,7 +461,7 @@ func (impl *EMMServiceImpl) removePodAnnotation(pod *corev1.Pod, key string) err
return nil
}

// Check nodes for vmware PSP draining taints. This function embodies the
// business logic around when EMM operations are executed.
func checkNodeEMM(provider EMMService) result.ReconcileResult {
logger := provider.getLogger()
@@ -440,6 +477,19 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
return result.RequeueSoon(2)
}

// Check if there are still pods annotated with EMM failure.
	// If there are, that means vmware has not yet removed the node
	// taint in response to the failure, so we requeue and wait for the
	// taint to be cleared before proceeding.
stillProcessing, err := provider.emmFailureStillProcessing()
if err != nil {
logger.Error(err, "Failed to check if EMM failures are still processing")
return result.Error(err)
}
if stillProcessing {
return result.RequeueSoon(2)
}

// Do not perform EMM operations while the datacenter is initializing
if !provider.IsInitialized() {
logger.Info("Skipping EMM check as the cluster is not yet initialized")
@@ -459,9 +509,39 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
return result.Error(err)
}

allNodes, err := provider.getNodeNameSet()
if err != nil {
logger.Error(err, "Failed to get node name set")
return result.Error(err)
}

// Fail EMM operations if insufficient nodes for pods
//
// VMWare requires that we perform some kind of check to ensure that there is at least hope that any
// impacted pods from an EMM operation will get rescheduled successfully to some other node. Here
// we do a basic check to ensure that there are at least as many nodes available as we have cassandra
// pods
unavailableNodes := utils.UnionStringSet(plannedDownNodeNameSet, evacuateDataNodeNameSet)
availableNodes := utils.SubtractStringSet(allNodes, unavailableNodes)

if len(provider.getPodNameSet()) > len(availableNodes) {
anyUpdated := false
updated := false
for node := range unavailableNodes {
if updated, err = provider.failEMM(node, NotEnoughResources); err != nil {
logger.Error(err, "Failed to add "+EMMFailureAnnotation, "Node", node)
return result.Error(err)
}
anyUpdated = anyUpdated || updated
}
if anyUpdated {
return result.RequeueSoon(10)
}
}

// Fail any evacuate data EMM operations if the datacenter is stopped
//
// Cassandra must be up and running to rebuild cassandra nodes. Since
// evacuating may entail deleting PVCs, we need to fail these operations
// as we are unlikely to be able to carry them out successfully.
if provider.IsStopped() {
@@ -482,10 +562,10 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {

// NOTE: There might be pods that aren't ready for a variety of reasons,
// however, with respect to data availability, we really only care if a
	// pod representing a cassandra node that is _currently_ bootstrapped to
	// the cluster is down. We do not care if a pod is down for a cassandra node
	// that was never bootstrapped (for example, maybe we are in the middle of
	// scaling up), as such pods are not part of the cluster presently and
// their being down has no impact on data availability. Also, simply
// looking at the pod state label is insufficient here. The pod might be
// brand new, but represents a cassandra node that is already part of the
Expand Down Expand Up @@ -525,7 +605,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {

nodeNameSetForNoPodsInRack := utils.SubtractStringSet(
utils.UnionStringSet(
plannedDownNodeNameSet,
evacuateDataNodeNameSet),
nodeNameSetForDownRack)

@@ -540,7 +620,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
didUpdate = didUpdate || podsUpdated
}
if didUpdate {
return result.RequeueSoon(2)
}
}
}
@@ -552,14 +632,14 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
// For example, if we have a pod on the tainted node that is stuck in
// "Starting", no other pods will have Cassandra started until that is
// resolved. Admittedly, CheckPodsReady() will permit some pods to be
// not ready before starting, but we don't want these two functions
// to become deeply coupled as it makes testing nightmarishly difficult,
// so we just delete all the not ready pods.
//
// Note that, due to earlier checks, we know the only not-ready pods
// are those that have not bootstrapped (so it doesn't matter if we delete
// them) or pods that all belong to the same rack. Consequently, we can
// delete all such pods on the tainted node without impacting
// availability.
didUpdate, err = provider.removeAllNotReadyPodsOnEMMNodes()
if err != nil {
@@ -575,7 +655,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {

// Wait for pods not on tainted nodes to become ready
//
// If we have pods down (for cassandra nodes that have potentially
// If we have pods down (for cassandra nodes that have potentially
// bootstrapped) that are not on the tainted nodes, these are likely pods
// we previously deleted due to the taints. We try to move these pods
// one-at-a-time to spare ourselves unnecessary rebuilds if
@@ -589,35 +669,35 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
return result.RequeueSoon(2)
}

		// Pods are not ready (because downRack isn't the empty string) and
		// there aren't any pods stuck in an unschedulable state with PVCs on
		// nodes marked for evacuate all data, so continue to allow
// cassandra a chance to start on the not ready pods. These not ready
// pods are likely ones we deleted previously when moving them off of
// the tainted node.
//
// TODO: Some of these pods might be from a planned-downtime EMM
// operation and so will not become ready until their node comes back
// online. With the way this logic works, if two nodes are marked for
// planned-downtime, only one node will have its pods deleted, and the
// other will effectively be ignored until the other node is back
// online, even if both nodes belong to the same rack. Revisit whether
// this behaviour is desirable.
return result.Continue()
}

// At this point, we know there are no not-ready pods on the tainted
// nodes and we know there are no down pods that are bootstrapped, so we
// can delete any pod we like without impacting availability.

// Delete a pod for an evacuate data tainted node
//
// We give preference to nodes tainted to evacuate all data mainly to
	// ensure some level of determinism. We could give preference to
	// planned downtime taints. In an ideal world, we'd address tainted
	// nodes in chronological order of when they received a taint, but the
	// operator doesn't track that information (and I'm not inclined to do
	// more bookkeeping) and the node doesn't have this information as
// far as I'm aware.
didUpdate, err = provider.removeNextPodFromEvacuateDataNode()
if err != nil {
@@ -644,7 +724,7 @@ func checkNodeEMM(provider EMMService) result.ReconcileResult {
return result.Continue()
}

// Check for PVCs with health inaccessible. This function embodies the
// business logic around when inaccessible PVCs are replaced.
func checkPVCHealth(provider EMMService) result.ReconcileResult {
logger := provider.getLogger()
@@ -658,7 +738,7 @@ func checkPVCHealth(provider EMMService) result.ReconcileResult {
// nothing to do
return result.Continue()
}

racksWithDownPods := provider.getRacksWithNotReadyPodsBootstrapped()

if len(racksWithDownPods) > 1 {
Expand Down Expand Up @@ -708,4 +788,4 @@ func CheckEMM(spi EMMSPI) result.ReconcileResult {
logger := service.getLogger()
logger.Info("psp::CheckEMM")
return checkNodeEMM(service)
}
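
The main behavioral addition in emm.go is a node-capacity guard: before acting on an EMM taint, checkNodeEMM now compares the number of Cassandra pods in the datacenter (getPodNameSet) against the nodes that remain after subtracting the tainted ones (getNodeNameSet minus the planned-downtime and evacuate-data sets), and fails the operation with NotEnoughResources when the pods could not all be rescheduled. The standalone Go sketch below illustrates that guard in isolation; the stringSet type, the subtract helper, and shouldFailEMM are simplified stand-ins for the operator's utils package and are not part of this commit.

package main

import "fmt"

// stringSet is a simplified stand-in for utils.StringSet.
type stringSet map[string]bool

// subtract returns the members of a that are not present in b.
func subtract(a, b stringSet) stringSet {
	out := stringSet{}
	for k := range a {
		if !b[k] {
			out[k] = true
		}
	}
	return out
}

// shouldFailEMM mirrors the guard added in checkNodeEMM: if the datacenter
// has more Cassandra pods than nodes free of EMM taints, the EMM operation
// cannot leave enough room to reschedule every pod and should be failed
// with NotEnoughResources.
func shouldFailEMM(allNodes, taintedNodes, dcPods stringSet) bool {
	availableNodes := subtract(allNodes, taintedNodes)
	return len(dcPods) > len(availableNodes)
}

func main() {
	allNodes := stringSet{"node1": true, "node2": true, "node3": true}
	tainted := stringSet{"node3": true}
	pods := stringSet{"dc1-r1-sts-0": true, "dc1-r1-sts-1": true, "dc1-r1-sts-2": true}

	// Three Cassandra pods but only two untainted nodes: the guard trips,
	// and the operator would annotate the pods with the EMM failure.
	fmt.Println(shouldFailEMM(allNodes, tainted, pods)) // prints: true
}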
