Skip to content

Commit

Permalink
fix: Refresh VM cache when node is not found
Browse files Browse the repository at this point in the history
(cherry picked from commit 8850c8c)
  • Loading branch information
Cecile Robert-Michon authored and nilo19 committed Jun 18, 2021
1 parent 22d66e7 commit afa2787
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
25 changes: 25 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ type Cloud struct {
ipv6DualStackEnabled bool
// Lock for access to node caches, includes nodeZones, nodeResourceGroups, and unmanagedNodes.
nodeCachesLock sync.RWMutex
// nodeNames holds current nodes for tracking added nodes in VM caches.
nodeNames sets.String
// nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
// it is updated by the nodeInformer
nodeZones map[string]sets.String
Expand Down Expand Up @@ -342,6 +344,7 @@ func NewCloudWithoutFeatureGates(configReader io.Reader) (*Cloud, error) {
}

az := &Cloud{
nodeNames: sets.NewString(),
nodeZones: map[string]sets.String{},
nodeResourceGroups: map[string]string{},
unmanagedNodes: sets.NewString(),
Expand Down Expand Up @@ -782,6 +785,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
defer az.nodeCachesLock.Unlock()

if prevNode != nil {
// Remove from nodeNames cache.
az.nodeNames.Delete(prevNode.ObjectMeta.Name)

// Remove from nodeZones cache.
prevZone, ok := prevNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
if ok && az.isAvailabilityZone(prevZone) {
Expand All @@ -805,6 +811,9 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) {
}

if newNode != nil {
// Add to nodeNames cache.
az.nodeNames.Insert(newNode.ObjectMeta.Name)

// Add to nodeZones cache.
newZone, ok := newNode.ObjectMeta.Labels[LabelFailureDomainBetaZone]
if ok && az.isAvailabilityZone(newZone) {
Expand Down Expand Up @@ -876,6 +885,22 @@ func (az *Cloud) GetNodeResourceGroup(nodeName string) (string, error) {
return az.ResourceGroup, nil
}

// GetNodeNames returns a set of all node names in the k8s cluster.
func (az *Cloud) GetNodeNames() (sets.String, error) {
// Kubelet won't set az.nodeInformerSynced, return nil.
if az.nodeInformerSynced == nil {
return nil, nil
}

az.nodeCachesLock.RLock()
defer az.nodeCachesLock.RUnlock()
if !az.nodeInformerSynced() {
return nil, fmt.Errorf("node informer is not synced when trying to GetNodeNames")
}

return sets.NewString(az.nodeNames.List()...), nil
}

// GetResourceGroups returns a set of resource groups that all nodes are running on.
func (az *Cloud) GetResourceGroups() (sets.String, error) {
// Kubelet won't set az.nodeInformerSynced, always return configured resourceGroup.
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/legacy-cloud-providers/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3244,6 +3244,7 @@ func TestUpdateNodeCaches(t *testing.T) {
az.nodeZones = map[string]sets.String{zone: nodesInZone}
az.nodeResourceGroups = map[string]string{"prevNode": "rg"}
az.unmanagedNodes = sets.NewString("prevNode")
az.nodeNames = sets.NewString("prevNode")

prevNode := v1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -3260,6 +3261,7 @@ func TestUpdateNodeCaches(t *testing.T) {
assert.Equal(t, 0, len(az.nodeZones[zone]))
assert.Equal(t, 0, len(az.nodeResourceGroups))
assert.Equal(t, 0, len(az.unmanagedNodes))
assert.Equal(t, 0, len(az.nodeNames))

newNode := v1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -3276,6 +3278,7 @@ func TestUpdateNodeCaches(t *testing.T) {
assert.Equal(t, 1, len(az.nodeZones[zone]))
assert.Equal(t, 1, len(az.nodeResourceGroups))
assert.Equal(t, 1, len(az.unmanagedNodes))
assert.Equal(t, 1, len(az.nodeNames))
}

func TestGetActiveZones(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ type scaleSet struct {
*Cloud

// availabilitySet is also required for scaleSet because some instances
// (e.g. master nodes) may not belong to any scale sets.
// (e.g. control plane nodes) may not belong to any scale sets.
// this also allows for clusters with both VM and VMSS nodes.
availabilitySet VMSet

vmssCache *azcache.TimedCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type vmssEntry struct {
lastUpdate time.Time
}

// availabilitySetEntry is the cached value stored for the availability-set
// nodes cache: the availability-set (non-VMSS) VM names discovered in the
// cluster's resource groups, together with the cluster node names known at
// the time the cache entry was built.
type availabilitySetEntry struct {
// vmNames holds the names of VMs listed from the resource groups.
vmNames sets.String
// nodeNames holds all cluster node names when this entry was created;
// a node missing from it is assumed to have joined after the last
// refresh, which triggers a forced cache refresh.
nodeNames sets.String
}

func (ss *scaleSet) newVMSSCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) {
localCache := &sync.Map{} // [vmssName]*vmssEntry
Expand Down Expand Up @@ -278,7 +283,7 @@ func (ss *scaleSet) deleteCacheForNode(nodeName string) error {

func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error) {
getter := func(key string) (interface{}, error) {
localCache := sets.NewString()
vmNames := sets.NewString()
resourceGroups, err := ss.GetResourceGroups()
if err != nil {
return nil, err
Expand All @@ -292,11 +297,22 @@ func (ss *scaleSet) newAvailabilitySetNodesCache() (*azcache.TimedCache, error)

for _, vm := range vmList {
if vm.Name != nil {
localCache.Insert(*vm.Name)
vmNames.Insert(*vm.Name)
}
}
}

// store all the node names in the cluster when the cache data was created.
nodeNames, err := ss.GetNodeNames()
if err != nil {
return nil, err
}

localCache := availabilitySetEntry{
vmNames: vmNames,
nodeNames: nodeNames,
}

return localCache, nil
}

Expand All @@ -318,6 +334,16 @@ func (ss *scaleSet) isNodeManagedByAvailabilitySet(nodeName string, crt azcache.
return false, err
}

availabilitySetNodes := cached.(sets.String)
return availabilitySetNodes.Has(nodeName), nil
cachedNodes := cached.(availabilitySetEntry).nodeNames
// if the node is not in the cache, assume the node has joined after the last cache refresh and attempt to refresh the cache.
if !cachedNodes.Has(nodeName) {
klog.V(2).Infof("Node %s has joined the cluster since the last VM cache refresh, refreshing the cache", nodeName)
cached, err = ss.availabilitySetNodesCache.Get(availabilitySetNodesKey, azcache.CacheReadTypeForceRefresh)
if err != nil {
return false, err
}
}

cachedVMs := cached.(availabilitySetEntry).vmNames
return cachedVMs.Has(nodeName), nil
}

0 comments on commit afa2787

Please sign in to comment.