From 98257a88fd8da27ba61d8852caaf1e3310e01f24 Mon Sep 17 00:00:00 2001 From: Robin Deeboonchai Date: Thu, 8 Aug 2024 19:06:51 -0700 Subject: [PATCH] refactor: upstream Azure instance cache refactor --- .../azure/azure_force_delete_scale_set.go | 86 +++ .../azure_force_delete_scale_set_test.go | 79 +++ .../cloudprovider/azure/azure_manager_test.go | 8 +- .../cloudprovider/azure/azure_scale_set.go | 493 ++++++++++-------- .../azure/azure_scale_set_instance_cache.go | 260 +++++++++ .../azure_scale_set_instance_cache_test.go | 55 ++ .../azure/azure_scale_set_test.go | 458 ++++++++++++---- 7 files changed, 1112 insertions(+), 327 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set.go create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set_test.go create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache.go create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache_test.go diff --git a/cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set.go new file mode 100644 index 000000000000..ee83119084e1 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set.go @@ -0,0 +1,86 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "context" + "strings" + + "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" + "github.com/Azure/go-autorest/autorest/azure" + + "k8s.io/klog/v2" + "sigs.k8s.io/cloud-provider-azure/pkg/retry" +) + +// When Azure Dedicated Host is enabled or using isolated vm skus, force deleting a VMSS fails with the following error: +// +// "predominantErrorDetail": { +// "innererror": { +// "internalErrorCode": "OperationNotAllowedOnResourceThatManagesUpdatesWithMaintenanceControl" +// }, +// "code": "OperationNotAllowed", +// "message": "Operation 'ForceDelete' is not allowed on resource 'aks-newnp-11436513-vmss' since it manages updates using maintenance control." +// }, +// +// A programmatically way to determine if a VM size is isolated or not has not been found. The isolated VM documentation: +// https://docs.microsoft.com/en-us/azure/virtual-machines/isolation +// has the current list of isolated VM sizes, but new isolated VM size could be introduced in the future. +// +// As a result of not being able to find out if a VM size is isolated or not, we'll do the following: +// - if scaleSet has isolated vm size or dedicated host, disable forDelete +// - else use forceDelete +// - if new isolated sku were added or dedicatedHost was not updated properly, this forceDelete call will fail with above error. +// In that case, call normal delete (fall-back) + +var isolatedVMSizes = map[string]bool{ + strings.ToLower("Standard_E80ids_v4"): true, + strings.ToLower("Standard_E80is_v4"): true, + strings.ToLower("Standard_E104i_v5"): true, + strings.ToLower("Standard_E104is_v5"): true, + strings.ToLower("Standard_E104id_v5"): true, + strings.ToLower("Standard_E104ids_v5"): true, + strings.ToLower("Standard_M192is_v2"): true, + strings.ToLower("Standard_M192ims_v2"): true, + strings.ToLower("Standard_M192ids_v2"): true, + strings.ToLower("Standard_M192idms_v2"): true, + strings.ToLower("Standard_F72s_v2"): true, + strings.ToLower("Standard_M128ms"): true, +} + +func (scaleSet *ScaleSet) deleteInstances(ctx context.Context, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs, commonAsgId string) (*azure.Future, *retry.Error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + skuName := scaleSet.getSKU() + resourceGroup := scaleSet.manager.config.ResourceGroup + forceDelete := shouldForceDelete(skuName, scaleSet) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeleteInstancesAsync(ctx, resourceGroup, commonAsgId, *requiredIds, forceDelete) + if forceDelete && isOperationNotAllowed(rerr) { + klog.Infof("falling back to normal delete for instances %v for %s", requiredIds.InstanceIds, scaleSet.Name) + return scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeleteInstancesAsync(ctx, resourceGroup, commonAsgId, *requiredIds, false) + } + return future, rerr +} + +func shouldForceDelete(skuName string, scaleSet *ScaleSet) bool { + return scaleSet.enableForceDelete && !isolatedVMSizes[strings.ToLower(skuName)] && !scaleSet.dedicatedHost +} + +func isOperationNotAllowed(rerr *retry.Error) bool { + return rerr != nil && rerr.ServiceErrorCode() == retry.OperationNotAllowed +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set_test.go new file mode 100644 index 000000000000..c491c54c1e23 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_force_delete_scale_set_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "net/http" + "testing" + + "github.com/Azure/go-autorest/autorest/azure" + "github.com/stretchr/testify/assert" + "sigs.k8s.io/cloud-provider-azure/pkg/retry" +) + +func TestShouldForceDelete(t *testing.T) { + skuName := "test-vmssSku" + + t.Run("should return true", func(t *testing.T) { + scaleSet := &ScaleSet{} + scaleSet.enableForceDelete = true + assert.Equal(t, shouldForceDelete(skuName, scaleSet), true) + }) + + t.Run("should return false because of dedicated hosts", func(t *testing.T) { + scaleSet := &ScaleSet{} + scaleSet.enableForceDelete = true + scaleSet.dedicatedHost = true + assert.Equal(t, shouldForceDelete(skuName, scaleSet), false) + }) + + t.Run("should return false because of isolated sku", func(t *testing.T) { + scaleSet := &ScaleSet{} + scaleSet.enableForceDelete = true + skuName = "Standard_F72s_v2" // belongs to the map isolatedVMSizes + assert.Equal(t, shouldForceDelete(skuName, scaleSet), false) + }) + +} + +func TestIsOperationNotAllowed(t *testing.T) { + t.Run("should return false because it's not OperationNotAllowed error", func(t *testing.T) { + error := &retry.Error{ + HTTPStatusCode: http.StatusBadRequest, + } + assert.Equal(t, isOperationNotAllowed(error), false) + }) + + t.Run("should return false because error is nil", func(t *testing.T) { + assert.Equal(t, isOperationNotAllowed(nil), false) + }) + + t.Run("should return true if error is OperationNotAllowed", func(t *testing.T) { + sre := &azure.ServiceError{ + Code: retry.OperationNotAllowed, + Message: "error-message", + } + error := &retry.Error{ + RawError: sre, + } + assert.Equal(t, isOperationNotAllowed(error), false) + }) + + // It is difficult to condition the case where return error matched expected error string for forceDelete and the + // function should return true. + +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go index 942af43a65c4..baddccff26b1 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager_test.go @@ -703,8 +703,8 @@ func TestGetFilteredAutoscalingGroupsVmss(t *testing.T) { enableForceDelete: manager.config.EnableForceDelete, curSize: 3, sizeRefreshPeriod: manager.azureCache.refreshInterval, - instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, - getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second, + getVmssSizeRefreshPeriod: time.Duration(manager.azureCache.refreshInterval) * time.Second, + InstanceCache: InstanceCache{instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod}, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } @@ -751,8 +751,8 @@ func TestGetFilteredAutoscalingGroupsVmssWithConfiguredSizes(t *testing.T) { enableForceDelete: manager.config.EnableForceDelete, curSize: 3, sizeRefreshPeriod: manager.azureCache.refreshInterval, - instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod, - getVmssSizeRefreshPeriod: time.Duration(VmssSizeRefreshPeriodDefault) * time.Second, + getVmssSizeRefreshPeriod: time.Duration(manager.azureCache.refreshInterval) * time.Second, + InstanceCache: InstanceCache{instancesRefreshPeriod: defaultVmssInstancesRefreshPeriod}, }} assert.True(t, assert.ObjectsAreEqualValues(expectedAsgs, asgs), "expected %#v, but found: %#v", expectedAsgs, asgs) } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index 9d2c9fcdcac3..3d03591f5e0c 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -33,11 +33,13 @@ import ( "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" "github.com/Azure/go-autorest/autorest/azure" + "github.com/Azure/go-autorest/autorest/to" ) var ( defaultVmssInstancesRefreshPeriod = 5 * time.Minute vmssContextTimeout = 3 * time.Minute + asyncContextTimeout = 30 * time.Minute vmssSizeMutex sync.Mutex ) @@ -60,30 +62,28 @@ type ScaleSet struct { enableForceDelete bool enableDynamicInstanceList bool + enableDetailedCSEMessage bool + + // Current Size (Number of VMs) // curSize tracks (and caches) the number of VMs in this ScaleSet. // It is periodically updated from vmss.Sku.Capacity, with VMSS itself coming // either from azure.Cache (which periodically does VMSS.List) - // or from direct VMSS.Get (used for Spot). + // or from direct VMSS.Get (always used for Spot). curSize int64 - // lastSizeRefresh is the time curSize was last refreshed from vmss.Sku.Capacity. - // Together with sizeRefreshPeriod, it is used to determine if it is time to refresh curSize. - lastSizeRefresh time.Time // sizeRefreshPeriod is how often curSize is refreshed from vmss.Sku.Capacity. // (Set from azureCache.refreshInterval = VmssCacheTTL or [defaultMetadataCache]refreshInterval = 1min) sizeRefreshPeriod time.Duration - // getVmssSizeRefreshPeriod is how often curSize should be refreshed in case VMSS.Get call is used (only spot instances). - // (Set from GetVmssSizeRefreshPeriod, if specified = get-vmss-size-refresh-period = 30s, - // or override from autoscalerProfile.GetVmssSizeRefreshPeriod) + // lastSizeRefresh is the time curSize was last refreshed from vmss.Sku.Capacity. + // Together with sizeRefreshPeriod, it is used to determine if it is time to refresh curSize. + lastSizeRefresh time.Time + // getVmssSizeRefreshPeriod is how often curSize should be refreshed in case VMSS.Get call is used. + // (Set from GetVmssSizeRefreshPeriod, if specified = get-vmss-size-refresh-period = 30s getVmssSizeRefreshPeriod time.Duration + // sizeMutex protects curSize (the number of VMs in the ScaleSet) from concurrent access + sizeMutex sync.Mutex - instancesRefreshPeriod time.Duration - instancesRefreshJitter int - - sizeMutex sync.Mutex - instanceMutex sync.Mutex - instanceCache []cloudprovider.Instance - lastInstanceRefresh time.Time + InstanceCache // uses Azure Dedicated Host dedicatedHost bool @@ -95,14 +95,20 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64, d azureRef: azureRef{ Name: spec.Name, }, - minSize: spec.MinSize, - maxSize: spec.MaxSize, - manager: az, - curSize: curSize, - sizeRefreshPeriod: az.azureCache.refreshInterval, - enableDynamicInstanceList: az.config.EnableDynamicInstanceList, - instancesRefreshJitter: az.config.VmssVmsCacheJitter, + + minSize: spec.MinSize, + maxSize: spec.MaxSize, + + manager: az, + curSize: curSize, + sizeRefreshPeriod: az.azureCache.refreshInterval, + InstanceCache: InstanceCache{ + instancesRefreshJitter: az.config.VmssVmsCacheJitter, + }, + enableForceDelete: az.config.EnableForceDelete, + enableDynamicInstanceList: az.config.EnableDynamicInstanceList, + enableDetailedCSEMessage: az.config.EnableDetailedCSEMessage, dedicatedHost: dedicatedHost, } @@ -115,7 +121,11 @@ func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager, curSize int64, d if az.config.GetVmssSizeRefreshPeriod != 0 { scaleSet.getVmssSizeRefreshPeriod = time.Duration(az.config.GetVmssSizeRefreshPeriod) * time.Second } else { - scaleSet.getVmssSizeRefreshPeriod = time.Duration(VmssSizeRefreshPeriodDefault) * time.Second + scaleSet.getVmssSizeRefreshPeriod = time.Duration(az.azureCache.refreshInterval) * time.Second + } + + if az.config.EnableDetailedCSEMessage { + klog.V(2).Infof("enableDetailedCSEMessage: %t", scaleSet.enableDetailedCSEMessage) } return scaleSet, nil @@ -183,14 +193,22 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { return -1, err } + // // Remove check for returning in-memory size when VMSS is in updating state + // // If VMSS state is updating, return the currentSize which would've been proactively incremented or decremented by CA + // // unless it's -1. In that case, its better to initialize it. + // if scaleSet.curSize != -1 && set.VirtualMachineScaleSetProperties != nil && + // strings.EqualFold(to.String(set.VirtualMachineScaleSetProperties.ProvisioningState), string(compute.GalleryProvisioningStateUpdating)) { + // klog.V(3).Infof("VMSS %q is in updating state, returning cached size: %d", scaleSet.Name, scaleSet.curSize) + // return scaleSet.curSize, nil + // } + effectiveSizeRefreshPeriod := scaleSet.sizeRefreshPeriod // If the scale set is Spot, we want to have a more fresh view of the Sku.Capacity field. - // This is because evictions can happen at any given point in time, - // even before VMs are materialized as nodes. We should be able to - // react to those and have the autoscaler readjust the goal again to force restoration. - // Taking into account only if orchestrationMode == Uniform because flex mode can have - // combination of spot and regular vms + // This is because evictions can happen + // at any given point in time, even before VMs are materialized as + // nodes. We should be able to react to those and have the autoscaler + // readjust the goal again to force restoration. if isSpot(&set) { effectiveSizeRefreshPeriod = scaleSet.getVmssSizeRefreshPeriod } @@ -206,8 +224,7 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { defer cancel() var rerr *retry.Error - set, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(ctx, scaleSet.manager.config.ResourceGroup, - scaleSet.Name) + set, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name) if rerr != nil { klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, rerr) return -1, err @@ -230,36 +247,25 @@ func (scaleSet *ScaleSet) getCurSize() (int64, error) { return scaleSet.curSize, nil } -func isSpot(vmss *compute.VirtualMachineScaleSet) bool { - return vmss != nil && vmss.VirtualMachineScaleSetProperties != nil && - vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile != nil && - vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile.Priority == compute.Spot -} - -// GetScaleSetSize gets Scale Set size. -func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { - return scaleSet.getCurSize() -} - -func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { - ctx, cancel := getContextWithCancel() - defer cancel() - - klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name) - httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(ctx, future, scaleSet.manager.config.ResourceGroup) - isSuccess, err := isSuccessHTTPResponse(httpResponse, err) - if isSuccess { - klog.V(3).Infof("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name) - return +// getScaleSetSize gets Scale Set size. +func (scaleSet *ScaleSet) getScaleSetSize() (int64, error) { + // First, get the size of the ScaleSet reported by API + // -1 indiciates the ScaleSet hasn't been initialized + size, err := scaleSet.getCurSize() + if size == -1 || err != nil { + klog.V(3).Infof("getScaleSetSize: either size is -1 (actual: %d) or error exists (actual err:%v)", size, err) + return size, err } - klog.Errorf("virtualMachineScaleSetsClient.WaitForDeleteInstancesResult - DeleteInstances for instances %v for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err) + return size, nil } -// updateVMSSCapacity invokes virtualMachineScaleSetsClient to update the capacity for VMSS. -func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { +// waitForCreateOrUpdate waits for the outcome of VMSS capacity update initiated via CreateOrUpdateAsync. +func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(future *azure.Future) { var err error defer func() { + // Invalidate instanceCache on success and failure. Failure might have created a few instances, but it is very rare. + scaleSet.invalidateInstanceCache() if err != nil { klog.Errorf("Failed to update the capacity for vmss %s with error %v, invalidate the cache so as to get the real size from API", scaleSet.Name, err) // Invalidate the VMSS size cache in order to fetch the size from the API. @@ -268,7 +274,7 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { } }() - ctx, cancel := getContextWithCancel() + ctx, cancel := getContextWithTimeout(asyncContextTimeout) defer cancel() klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(%s)", scaleSet.Name) @@ -276,67 +282,41 @@ func (scaleSet *ScaleSet) updateVMSSCapacity(future *azure.Future) { isSuccess, err := isSuccessHTTPResponse(httpResponse, err) if isSuccess { - klog.V(3).Infof("virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(%s) success", scaleSet.Name) - scaleSet.invalidateInstanceCache() + klog.V(3).Infof("waitForCreateOrUpdateInstances(%s) success", scaleSet.Name) return } - klog.Errorf("virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult - updateVMSSCapacity for scale set %q failed: %v", scaleSet.Name, err) + klog.Errorf("waitForCreateOrUpdateInstances(%s) failed, err: %v", scaleSet.Name, err) } -// SetScaleSetSize sets ScaleSet size. -func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { - scaleSet.sizeMutex.Lock() - defer scaleSet.sizeMutex.Unlock() - +// setScaleSetSize sets ScaleSet size. +func (scaleSet *ScaleSet) setScaleSetSize(size int64, delta int) error { vmssInfo, err := scaleSet.getVMSSFromCache() if err != nil { klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) return err } - // Update the new capacity to cache. - vmssSizeMutex.Lock() - vmssInfo.Sku.Capacity = &size - vmssSizeMutex.Unlock() + requiredInstances := delta - // Compose a new VMSS for updating. - op := compute.VirtualMachineScaleSet{ - Name: vmssInfo.Name, - Sku: vmssInfo.Sku, - Location: vmssInfo.Location, - } - - if vmssInfo.ExtendedLocation != nil { - op.ExtendedLocation = &compute.ExtendedLocation{ - Name: vmssInfo.ExtendedLocation.Name, - Type: vmssInfo.ExtendedLocation.Type, + // If after reallocating instances we still need more instances or we're just in Delete mode + // send a scale request + if requiredInstances > 0 { + klog.V(3).Infof("Remaining unsatisfied count is %d. Attempting to increase scale set %q "+ + "capacity", requiredInstances, scaleSet.Name) + err := scaleSet.createOrUpdateInstances(&vmssInfo, size) + if err != nil { + klog.Errorf("Failed to increase capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err) + return err } - - klog.V(3).Infof("Passing ExtendedLocation information if it is not nil, with Edge Zone name:(%s)", *op.ExtendedLocation.Name) - } - - ctx, cancel := getContextWithTimeout(vmssContextTimeout) - defer cancel() - klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdateAsync(%s)", scaleSet.Name) - future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name, op) - if rerr != nil { - klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %v", scaleSet.Name, rerr) - return rerr.Error() } - - // Proactively set the VMSS size so autoscaler makes better decisions. - scaleSet.curSize = size - scaleSet.lastSizeRefresh = time.Now() - - go scaleSet.updateVMSSCapacity(future) return nil } // TargetSize returns the current TARGET size of the node group. It is possible that the // number is different from the number of nodes registered in Kubernetes. func (scaleSet *ScaleSet) TargetSize() (int, error) { - size, err := scaleSet.GetScaleSetSize() + size, err := scaleSet.getScaleSetSize() return int(size), err } @@ -346,7 +326,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { return fmt.Errorf("size increase must be positive") } - size, err := scaleSet.GetScaleSetSize() + size, err := scaleSet.getScaleSetSize() if err != nil { return err } @@ -359,7 +339,7 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, scaleSet.MaxSize()) } - return scaleSet.SetScaleSetSize(size + int64(delta)) + return scaleSet.setScaleSetSize(size+int64(delta), delta) } // AtomicIncreaseSize is not implemented. @@ -369,13 +349,14 @@ func (scaleSet *ScaleSet) AtomicIncreaseSize(delta int) error { // GetScaleSetVms returns list of nodes for the given scale set. func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, *retry.Error) { - klog.V(4).Infof("GetScaleSetVms: starts") ctx, cancel := getContextWithTimeout(vmssContextTimeout) defer cancel() - resourceGroup := scaleSet.manager.config.ResourceGroup - vmList, rerr := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.List(ctx, resourceGroup, scaleSet.Name, "instanceView") + vmList, rerr := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.List(ctx, scaleSet.manager.config.ResourceGroup, + scaleSet.Name, string(compute.InstanceViewTypesInstanceView)) + klog.V(4).Infof("GetScaleSetVms: scaleSet.Name: %s, vmList: %v", scaleSet.Name, vmList) + if rerr != nil { klog.Errorf("VirtualMachineScaleSetVMsClient.List failed for %s: %v", scaleSet.Name, rerr) return nil, rerr @@ -418,7 +399,7 @@ func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error { // VMSS size should be changed automatically after the Node deletion, hence this operation is not required. // To prevent some unreproducible bugs, an extra refresh of cache is needed. scaleSet.invalidateInstanceCache() - _, err := scaleSet.GetScaleSetSize() + _, err := scaleSet.getScaleSetSize() if err != nil { klog.Warningf("DecreaseTargetSize: failed with error: %v", err) } @@ -446,7 +427,53 @@ func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) { return true, nil } -// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. +func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachineScaleSet, newSize int64) error { + if vmssInfo == nil { + return fmt.Errorf("vmssInfo cannot be nil while increating scaleSet capacity") + } + + scaleSet.sizeMutex.Lock() + defer scaleSet.sizeMutex.Unlock() + + // Update the new capacity to cache. + vmssSizeMutex.Lock() + vmssInfo.Sku.Capacity = &newSize + vmssSizeMutex.Unlock() + + // Compose a new VMSS for updating. + op := compute.VirtualMachineScaleSet{ + Name: vmssInfo.Name, + Sku: vmssInfo.Sku, + Location: vmssInfo.Location, + } + + if vmssInfo.ExtendedLocation != nil { + op.ExtendedLocation = &compute.ExtendedLocation{ + Name: vmssInfo.ExtendedLocation.Name, + Type: vmssInfo.ExtendedLocation.Type, + } + + klog.V(3).Infof("Passing ExtendedLocation information if it is not nil, with Edge Zone name:(%s)", *op.ExtendedLocation.Name) + } + + ctx, cancel := getContextWithTimeout(vmssContextTimeout) + defer cancel() + klog.V(3).Infof("Waiting for virtualMachineScaleSetsClient.CreateOrUpdateAsync(%s)", scaleSet.Name) + future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name, op) + if rerr != nil { + klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %+v", scaleSet.Name, rerr) + return rerr.Error() + } + + // Proactively set the VMSS size so autoscaler makes better decisions. + scaleSet.curSize = newSize + scaleSet.lastSizeRefresh = time.Now() + + go scaleSet.waitForCreateOrUpdateInstances(future) + return nil +} + +// DeleteInstances deletes the given instances. All instances must be controlled by the same nodegroup. func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregisteredNodes bool) error { if len(instances) == 0 { return nil @@ -461,16 +488,13 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered instancesToDelete := []*azureRef{} for _, instance := range instances { - asg, err := scaleSet.manager.GetNodeGroupForInstance(instance) + err = scaleSet.verifyNodeGroup(instance, commonAsg.Id()) if err != nil { return err } - if !strings.EqualFold(asg.Id(), commonAsg.Id()) { - return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", instance.Name, commonAsg) - } - - if cpi, found := scaleSet.getInstanceByProviderID(instance.Name); found && cpi.Status != nil && cpi.Status.State == cloudprovider.InstanceDeleting { + if cpi, found, err := scaleSet.getInstanceByProviderID(instance.Name); found && err == nil && cpi.Status != nil && + cpi.Status.State == cloudprovider.InstanceDeleting { klog.V(3).Infof("Skipping deleting instance %s as its current state is deleting", instance.Name) continue } @@ -499,21 +523,10 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered ctx, cancel := getContextWithTimeout(vmssContextTimeout) defer cancel() - resourceGroup := scaleSet.manager.config.ResourceGroup - - scaleSet.instanceMutex.Lock() - klog.V(3).Infof("Calling virtualMachineScaleSetsClient.DeleteInstancesAsync(%v), force delete set to %v", requiredIds.InstanceIds, scaleSet.enableForceDelete) - future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeleteInstancesAsync(ctx, resourceGroup, commonAsg.Id(), *requiredIds, scaleSet.enableForceDelete) - if scaleSet.enableForceDelete && isOperationNotAllowed(rerr) { - klog.Infof("falling back to normal delete for instances %v for %s", requiredIds.InstanceIds, scaleSet.Name) - future, rerr = scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeleteInstancesAsync(ctx, resourceGroup, - commonAsg.Id(), *requiredIds, false) - } - - scaleSet.instanceMutex.Unlock() + future, rerr := scaleSet.deleteInstances(ctx, requiredIds, commonAsg.Id()) if rerr != nil { - klog.Errorf("virtualMachineScaleSetsClient.DeleteInstancesAsync for instances %v failed: %v", requiredIds.InstanceIds, rerr) + klog.Errorf("virtualMachineScaleSetsClient.DeleteInstancesAsync for instances %v for %s failed: %+v", requiredIds.InstanceIds, scaleSet.Name, rerr) return rerr.Error() } @@ -533,14 +546,30 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef, hasUnregistered } go scaleSet.waitForDeleteInstances(future, requiredIds) - return nil } +func (scaleSet *ScaleSet) waitForDeleteInstances(future *azure.Future, requiredIds *compute.VirtualMachineScaleSetVMInstanceRequiredIDs) { + ctx, cancel := getContextWithTimeout(asyncContextTimeout) + + defer cancel() + klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(%v) for %s", requiredIds.InstanceIds, scaleSet.Name) + httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForDeleteInstancesResult(ctx, future, scaleSet.manager.config.ResourceGroup) + isSuccess, err := isSuccessHTTPResponse(httpResponse, err) + if isSuccess { + klog.V(3).Infof(".WaitForDeleteInstancesResult(%v) for %s success", requiredIds.InstanceIds, scaleSet.Name) + // No need to invalidateInstanceCache because instanceStates were proactively set to "deleting" + return + } + // On failure, invalidate the instanceCache - cannot have instances in deletingState + scaleSet.invalidateInstanceCache() + klog.Errorf("WaitForDeleteInstancesResult(%v) for %s failed with error: %v", requiredIds.InstanceIds, scaleSet.Name, err) +} + // DeleteNodes deletes the nodes from the group. func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { klog.V(8).Infof("Delete nodes requested: %q\n", nodes) - size, err := scaleSet.GetScaleSetSize() + size, err := scaleSet.getScaleSetSize() if err != nil { return err } @@ -549,8 +578,11 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { return fmt.Errorf("min size reached, nodes will not be deleted") } + // Distinguish between unregistered node deletion and normal node deletion refs := make([]*azureRef, 0, len(nodes)) hasUnregisteredNodes := false + unregisteredRefs := make([]*azureRef, 0, len(nodes)) + for _, node := range nodes { belongs, err := scaleSet.Belongs(node) if err != nil { @@ -567,7 +599,18 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { ref := &azureRef{ Name: node.Spec.ProviderID, } - refs = append(refs, ref) + + if node.Annotations[cloudprovider.FakeNodeReasonAnnotation] == cloudprovider.FakeNodeUnregistered { + klog.V(5).Infof("Node: %s type is unregistered..Appending to the unregistered list", node.Name) + unregisteredRefs = append(unregisteredRefs, ref) + } else { + refs = append(refs, ref) + } + } + + if len(unregisteredRefs) > 0 { + klog.V(3).Infof("Removing unregisteredNodes: %v", unregisteredRefs) + return scaleSet.DeleteInstances(unregisteredRefs, true) } return scaleSet.DeleteInstances(refs, hasUnregisteredNodes) @@ -591,6 +634,7 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulerframework.NodeInfo, erro } node, err := buildNodeFromTemplate(scaleSet.Name, template, scaleSet.manager, scaleSet.enableDynamicInstanceList) + if err != nil { return nil, err } @@ -602,7 +646,6 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulerframework.NodeInfo, erro // Nodes returns a list of all nodes that belong to this node group. func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) { - klog.V(4).Infof("Nodes: starts, scaleSet.Name: %s", scaleSet.Name) curSize, err := scaleSet.getCurSize() if err != nil { klog.Errorf("Failed to get current size for vmss %q: %v", scaleSet.Name, err) @@ -618,73 +661,84 @@ func (scaleSet *ScaleSet) Nodes() ([]cloudprovider.Instance, error) { return scaleSet.instanceCache, nil } - klog.V(4).Infof("Nodes: starts to get VMSS VMs") - splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1) - lastRefresh := time.Now().Add(-time.Second * time.Duration(splay)) - - orchestrationMode, err := scaleSet.getOrchestrationMode() + // Forcefully updating the instanceCache as the instanceCacheSize didn't match curSize or cache is invalid. + err = scaleSet.updateInstanceCache() if err != nil { - klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err) return nil, err } - klog.V(4).Infof("VMSS: orchestration Mode %s", orchestrationMode) - - if orchestrationMode == compute.Uniform { - err := scaleSet.buildScaleSetCache(lastRefresh) - if err != nil { - return nil, err - } - - } else if orchestrationMode == compute.Flexible { - if scaleSet.manager.config.EnableVmssFlex { - err := scaleSet.buildScaleSetCacheForFlex(lastRefresh) - if err != nil { - return nil, err - } - } else { - return nil, fmt.Errorf("vmss - %q with Flexible orchestration detected but 'enableVmssFlex' feature flag is turned off", scaleSet.Name) - } - - } else { - return nil, fmt.Errorf("Failed to determine orchestration mode for vmss %q", scaleSet.Name) - } - klog.V(4).Infof("Nodes: returns") return scaleSet.instanceCache, nil } -func (scaleSet *ScaleSet) buildScaleSetCache(lastRefresh time.Time) error { - vms, rerr := scaleSet.GetScaleSetVms() +// buildScaleSetCacheForFlex is used by orchestrationMode == compute.Flexible +func (scaleSet *ScaleSet) buildScaleSetCacheForFlex() error { + klog.V(3).Infof("buildScaleSetCacheForFlex: resetting instance Cache for scaleSet %s", + scaleSet.Name) + splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1) + lastRefresh := time.Now().Add(-time.Second * time.Duration(splay)) + + vms, rerr := scaleSet.GetFlexibleScaleSetVms() if rerr != nil { if isAzureRequestsThrottled(rerr) { // Log a warning and update the instance refresh time so that it would retry after cache expiration - klog.Warningf("GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + klog.Warningf("GetFlexibleScaleSetVms() is throttled with message %v, would return the cached instances", rerr) scaleSet.lastInstanceRefresh = lastRefresh return nil } return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + scaleSet.instanceCache = buildInstanceCacheForFlex(vms) scaleSet.lastInstanceRefresh = lastRefresh return nil } -func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error { - vms, rerr := scaleSet.GetFlexibleScaleSetVms() +func (scaleSet *ScaleSet) buildScaleSetCacheForUniform() error { + klog.V(3).Infof("updateInstanceCache: resetting instance Cache for scaleSet %s", + scaleSet.Name) + splay := rand.New(rand.NewSource(time.Now().UnixNano())).Intn(scaleSet.instancesRefreshJitter + 1) + lastRefresh := time.Now().Add(-time.Second * time.Duration(splay)) + vms, rerr := scaleSet.GetScaleSetVms() if rerr != nil { if isAzureRequestsThrottled(rerr) { - // Log a warning and update the instance refresh time so that it would retry after cache expiration - klog.Warningf("GetFlexibleScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + // Log a warning and update the instance refresh time so that it would retry later. + // Ensure to retry no sooner than rerr.RetryAfter + klog.Warningf("updateInstanceCache: GetScaleSetVms() is throttled with message %v, would return the cached instances", rerr) + nextRefresh := lastRefresh.Add(scaleSet.instancesRefreshPeriod) + if nextRefresh.Before(rerr.RetryAfter) { + delay := rerr.RetryAfter.Sub(nextRefresh) + lastRefresh = lastRefresh.Add(delay) + } scaleSet.lastInstanceRefresh = lastRefresh return nil } return rerr.Error() } - scaleSet.instanceCache = buildInstanceCache(vms) + instances := []cloudprovider.Instance{} + // Note that the GetScaleSetVms() results is not used directly because for the List endpoint, + // their resource ID format is not consistent with Get endpoint + for i := range vms { + // The resource ID is empty string, which indicates the instance may be in deleting state. + if *vms[i].ID == "" { + continue + } + resourceID, err := convertResourceGroupNameToLower(*vms[i].ID) + if err != nil { + // This shouldn't happen. Log a warning message for tracking. + klog.Warningf("updateInstanceCache: buildInstanceCache.convertResourceGroupNameToLower failed with error: %v", err) + continue + } + + instances = append(instances, cloudprovider.Instance{ + Id: azurePrefix + resourceID, + Status: scaleSet.instanceStatusFromVM(&vms[i]), + }) + } + + scaleSet.instanceCache = instances scaleSet.lastInstanceRefresh = lastRefresh return nil @@ -692,32 +746,22 @@ func (scaleSet *ScaleSet) buildScaleSetCacheForFlex(lastRefresh time.Time) error // Note that the GetScaleSetVms() results is not used directly because for the List endpoint, // their resource ID format is not consistent with Get endpoint -func buildInstanceCache(vmList interface{}) []cloudprovider.Instance { - instances := []cloudprovider.Instance{} - - switch vms := vmList.(type) { - case []compute.VirtualMachineScaleSetVM: - for _, vm := range vms { - powerState := vmPowerStateRunning - if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { - powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) - } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) - } - case []compute.VirtualMachine: - for _, vm := range vms { - powerState := vmPowerStateRunning - if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { - powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) - } - addInstanceToCache(&instances, vm.ID, vm.ProvisioningState, powerState) +// buildInstanceCacheForFlex used by orchestrationMode == compute.Flexible +func buildInstanceCacheForFlex(vms []compute.VirtualMachine) []cloudprovider.Instance { + var instances []cloudprovider.Instance + for _, vm := range vms { + powerState := vmPowerStateRunning + if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { + powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) } + addVMToCache(&instances, vm.ID, vm.ProvisioningState, powerState) } return instances } -func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisioningState *string, powerState string) { +// addVMToCache used by orchestrationMode == compute.Flexible +func addVMToCache(instances *[]cloudprovider.Instance, id, provisioningState *string, powerState string) { // The resource ID is empty string, which indicates the instance may be in deleting state. if len(*id) == 0 { return @@ -731,41 +775,19 @@ func addInstanceToCache(instances *[]cloudprovider.Instance, id *string, provisi } *instances = append(*instances, cloudprovider.Instance{ - Id: "azure://" + resourceID, + Id: azurePrefix + resourceID, Status: instanceStatusFromProvisioningStateAndPowerState(resourceID, provisioningState, powerState), }) } -func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool) { - scaleSet.instanceMutex.Lock() - defer scaleSet.instanceMutex.Unlock() - for _, instance := range scaleSet.instanceCache { - if instance.Id == providerID { - return instance, true - } - } - return cloudprovider.Instance{}, false -} - -func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, status cloudprovider.InstanceStatus) { - scaleSet.instanceMutex.Lock() - defer scaleSet.instanceMutex.Unlock() - for k, instance := range scaleSet.instanceCache { - if instance.Id == providerID { - klog.V(5).Infof("Setting instance %s status to %v", instance.Id, status) - scaleSet.instanceCache[k].Status = &status - } - } - scaleSet.lastInstanceRefresh = time.Now() -} - -// instanceStatusFromProvisioningStateAndPowerState converts the VM provisioning state and power state to cloudprovider.InstanceStatus -func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { +// instanceStatusFromProvisioningStateAndPowerState converts the VM provisioning state to cloudprovider.InstanceStatus +// instanceStatusFromProvisioningStateAndPowerState used by orchestrationMode == compute.Flexible +func instanceStatusFromProvisioningStateAndPowerState(resourceID string, provisioningState *string, powerState string) *cloudprovider.InstanceStatus { if provisioningState == nil { return nil } - klog.V(5).Infof("Getting vm instance provisioning state %s for %s", *provisioningState, resourceId) + klog.V(5).Infof("Getting vm instance provisioning state %s for %s", *provisioningState, resourceID) status := &cloudprovider.InstanceStatus{} switch *provisioningState { @@ -779,7 +801,7 @@ func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisi // ProvisioningState represents the most recent provisioning state, therefore only report // InstanceCreating errors when the power state indicates the instance has not yet started running if !isRunningVmPowerState(powerState) { - klog.V(4).Infof("VM %s reports failed provisioning state with non-running power state: %s", resourceId, powerState) + klog.V(4).Infof("VM %s reports failed provisioning state with non-running power state: %s", resourceID, powerState) status.State = cloudprovider.InstanceCreating status.ErrorInfo = &cloudprovider.InstanceErrorInfo{ ErrorClass: cloudprovider.OutOfResourcesErrorClass, @@ -787,7 +809,7 @@ func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisi ErrorMessage: "Azure failed to provision a node for this node group", } } else { - klog.V(5).Infof("VM %s reports a failed provisioning state but is running (%s)", resourceId, powerState) + klog.V(5).Infof("VM %s reports a failed provisioning state but is running (%s)", resourceID, powerState) status.State = cloudprovider.InstanceRunning } default: @@ -797,11 +819,10 @@ func instanceStatusFromProvisioningStateAndPowerState(resourceId string, provisi return status } -func (scaleSet *ScaleSet) invalidateInstanceCache() { - scaleSet.instanceMutex.Lock() - // Set the instanceCache as outdated. - scaleSet.lastInstanceRefresh = time.Now().Add(-1 * scaleSet.instancesRefreshPeriod) - scaleSet.instanceMutex.Unlock() +func isSpot(vmss *compute.VirtualMachineScaleSet) bool { + return vmss != nil && vmss.VirtualMachineScaleSetProperties != nil && + vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile != nil && + vmss.VirtualMachineScaleSetProperties.VirtualMachineProfile.Priority == compute.Spot } func (scaleSet *ScaleSet) invalidateLastSizeRefreshWithLock() { @@ -819,6 +840,42 @@ func (scaleSet *ScaleSet) getOrchestrationMode() (compute.OrchestrationMode, err return vmss.OrchestrationMode, nil } -func isOperationNotAllowed(rerr *retry.Error) bool { - return rerr != nil && rerr.ServiceErrorCode() == retry.OperationNotAllowed +func (scaleSet *ScaleSet) cseErrors(extensions *[]compute.VirtualMachineExtensionInstanceView) ([]string, bool) { + var errs []string + failed := false + if extensions != nil { + for _, extension := range *extensions { + if strings.EqualFold(to.String(extension.Name), vmssCSEExtensionName) && extension.Statuses != nil { + for _, status := range *extension.Statuses { + if status.Level == "Error" { + errs = append(errs, to.String(status.Message)) + failed = true + } + } + } + } + } + return errs, failed +} + +func (scaleSet *ScaleSet) getSKU() string { + vmssInfo, err := scaleSet.getVMSSFromCache() + if err != nil { + klog.Errorf("Failed to get information for VMSS (%q): %v", scaleSet.Name, err) + return "" + } + return to.String(vmssInfo.Sku.Name) +} + +func (scaleSet *ScaleSet) verifyNodeGroup(instance *azureRef, commonNgID string) error { + ng, err := scaleSet.manager.GetNodeGroupForInstance(instance) + if err != nil { + return err + } + + if !strings.EqualFold(ng.Id(), commonNgID) { + return fmt.Errorf("cannot delete instance (%s) which don't belong to the same Scale Set (%q)", + instance.Name, commonNgID) + } + return nil } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache.go new file mode 100644 index 000000000000..5b6843caf412 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache.go @@ -0,0 +1,260 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" + "github.com/Azure/go-autorest/autorest/to" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/klog/v2" +) + +/* +- "instanceCache" is included in the scaleSet data structures and holds +status information of the instances / vms. This data is used by the CAS +to make scaleUp / scaleDown decisions based on what is the current state +the cluster without making an api call. +- The time for this cache is represented by "instancesRefreshPeriod" which +by default is defaultVmssInstancesRefreshPeriod ~ 5 mins. +- "lastInstanceRefresh" represents the time when the cache was validated +the last time. +- Following methods are defined related to the instanceCache: + - invalidateInstanceCache() + - validateInstanceCache() + - validateInstanceCacheWithoutLock() + - updateInstanceCache() + - getInstanceByProviderID() + - getInstancesByState() + - getInstanceCacheSize() + - setInstanceStatusByProviderID() + - setInstanceStatusByProviderID() +*/ + +// InstanceCache tracks the VMs in the ScaleSet, in the form of corresponding cloudprovider.Instances. +// This struct also contains related locks and cache interval variables. +type InstanceCache struct { + // instanceCache tracks the VMs in the ScaleSet, in the form of corresponding cloudprovider.Instances. + // instanceCache directly backs the efficient response to NodeGroup.Nodes(), implemented by ScaleSet.Nodes(). + // It is periodially updated from VMSS using virtualMachineScaleSetVMsClient.List(). + instanceCache []cloudprovider.Instance + // instancesRefreshPeriod is how often instance cache is refreshed from VMSS. + // (Set from VmssVmsCacheTTL or defaultVmssInstancesRefreshPeriod = 5min) + instancesRefreshPeriod time.Duration + // lastInstanceRefresh is the time instanceCache was last refreshed from VMSS. + // Together with instancesRefreshPeriod, it is used to determine if it is time to refresh instanceCache. + lastInstanceRefresh time.Time + // instancesRefreshJitter (in seconds) is used to ensure refreshes (which involve expensive List call) + // don't happen at exactly the same time on all ScaleSets + instancesRefreshJitter int + // instanceMutex is used for protecting instance cache from concurrent access + instanceMutex sync.Mutex +} + +// invalidateInstanceCache invalidates the instanceCache by modifying the lastInstanceRefresh. +func (scaleSet *ScaleSet) invalidateInstanceCache() { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + // Set the instanceCache as outdated. + klog.V(3).Infof("invalidating instanceCache for %s", scaleSet.Name) + scaleSet.lastInstanceRefresh = time.Now().Add(-1 * scaleSet.instancesRefreshPeriod) +} + +// validateInstanceCache updates the instanceCache if it has expired. It acquires lock. +func (scaleSet *ScaleSet) validateInstanceCache() error { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + return scaleSet.validateInstanceCacheWithoutLock() +} + +// validateInstanceCacheWithoutLock is used a helper function for validateInstanceCache, get and set methods. +func (scaleSet *ScaleSet) validateInstanceCacheWithoutLock() error { + if scaleSet.lastInstanceRefresh.Add(scaleSet.instancesRefreshPeriod).After(time.Now()) { + klog.V(3).Infof("validateInstanceCacheWithoutLock: no need to reset instance Cache for scaleSet %s", + scaleSet.Name) + return nil + } + + return scaleSet.updateInstanceCache() +} + +// updateInstanceCache forcefully updates the cache without checking the timer - lastInstanceRefresh. +// Caller is responsible for acquiring lock on the instanceCache. +func (scaleSet *ScaleSet) updateInstanceCache() error { + orchestrationMode, err := scaleSet.getOrchestrationMode() + if err != nil { + klog.Errorf("failed to get information for VMSS: %s, error: %v", scaleSet.Name, err) + return err + } + + if orchestrationMode == compute.Flexible { + if scaleSet.manager.config.EnableVmssFlex { + return scaleSet.buildScaleSetCacheForFlex() + } + return fmt.Errorf("vmss - %q with Flexible orchestration detected but 'enableVmssFlex' feature flag is turned off", scaleSet.Name) + } else if orchestrationMode == compute.Uniform { + return scaleSet.buildScaleSetCacheForUniform() + } + + return fmt.Errorf("failed to determine orchestration mode for vmss %q", scaleSet.Name) +} + +// getInstanceByProviderID returns instance from instanceCache if given providerID exists. +func (scaleSet *ScaleSet) getInstanceByProviderID(providerID string) (cloudprovider.Instance, bool, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getInstanceByProviderID: error validating instanceCache for providerID %s for scaleSet %s, err: %v", + providerID, scaleSet.Name, err) + return cloudprovider.Instance{}, false, err + } + + for _, instance := range scaleSet.instanceCache { + if instance.Id == providerID { + return instance, true, nil + } + } + return cloudprovider.Instance{}, false, nil +} + +// getInstancesByState returns list of instances with given state. +func (scaleSet *ScaleSet) getInstancesByState(state cloudprovider.InstanceState) ([]cloudprovider.Instance, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getInstancesByState: error validating instanceCache for state %d for scaleSet %s, "+ + "err: %v", state, scaleSet.Name, err) + return []cloudprovider.Instance{}, err + } + + instances := []cloudprovider.Instance{} + for _, instance := range scaleSet.instanceCache { + if instance.Status != nil && instance.Status.State == state { + instances = append(instances, instance) + } + } + return instances, nil +} + +// getInstanceCacheSize returns the size of the instanceCache. +func (scaleSet *ScaleSet) getInstanceCacheSize() (int64, error) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("getInstanceCacheSize: error validating instanceCache for scaleSet: %s, "+ + "err: %v", scaleSet.Name, err) + return -1, err + } + + return int64(len(scaleSet.instanceCache)), nil +} + +// setInstanceStatusByProviderID sets the status for an instance with given providerID. +// It reset the cache if stale and sets the status by acquiring a lock. +func (scaleSet *ScaleSet) setInstanceStatusByProviderID(providerID string, status cloudprovider.InstanceStatus) { + scaleSet.instanceMutex.Lock() + defer scaleSet.instanceMutex.Unlock() + + err := scaleSet.validateInstanceCacheWithoutLock() + if err != nil { + klog.Errorf("setInstanceStatusByProviderID: error validating instanceCache for providerID %s for "+ + "scaleSet: %s, err: %v", providerID, scaleSet.Name, err) + // return no error because CAS runs with the expectation that future runs will refresh instance Cache + } + + for k, instance := range scaleSet.instanceCache { + if instance.Id == providerID { + klog.V(3).Infof("setInstanceStatusByProviderID: setting instance state for %s for scaleSet "+ + "%s to %d", instance.Id, scaleSet.Name, status.State) + scaleSet.instanceCache[k].Status = &status + break + } + } +} + +// instanceStatusFromVM converts the VM provisioning state to cloudprovider.InstanceStatus. +func (scaleSet *ScaleSet) instanceStatusFromVM(vm *compute.VirtualMachineScaleSetVM) *cloudprovider.InstanceStatus { + // Prefer the proactive cache view of the instance state if we aren't in a terminal state + // This is because the power state may be taking longer to update and we don't want + // an unfortunate VM update (TTL 5 min) to reset that state to running. + if vm.ProvisioningState == nil || *vm.ProvisioningState == string(compute.GalleryProvisioningStateUpdating) { + resourceID, _ := convertResourceGroupNameToLower(*vm.ID) + providerID := azurePrefix + resourceID + for _, instance := range scaleSet.instanceCache { + if instance.Id == providerID { + return instance.Status + } + } + return nil + } + + status := &cloudprovider.InstanceStatus{} + switch *vm.ProvisioningState { + case string(compute.GalleryProvisioningStateDeleting): + status.State = cloudprovider.InstanceDeleting + case string(compute.GalleryProvisioningStateCreating): + status.State = cloudprovider.InstanceCreating + case string(compute.GalleryProvisioningStateFailed): + powerState := vmPowerStateRunning + if vm.InstanceView != nil && vm.InstanceView.Statuses != nil { + powerState = vmPowerStateFromStatuses(*vm.InstanceView.Statuses) + } + + // Provisioning can fail both during instance creation or after the instance is running. + // Per https://learn.microsoft.com/en-us/azure/virtual-machines/states-billing#provisioning-states, + // ProvisioningState represents the most recent provisioning state, therefore only report + // InstanceCreating errors when the power state indicates the instance has not yet started running + if !isRunningVmPowerState(powerState) { + klog.V(4).Infof("VM %s reports failed provisioning state with non-running power state: %s", *vm.ID, powerState) + status.State = cloudprovider.InstanceCreating + status.ErrorInfo = &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OutOfResourcesErrorClass, + ErrorCode: "provisioning-state-failed", + ErrorMessage: "Azure failed to provision a node for this node group", + } + } else { + klog.V(5).Infof("VM %s reports a failed provisioning state but is running (%s)", *vm.ID, powerState) + status.State = cloudprovider.InstanceRunning + } + default: + status.State = cloudprovider.InstanceRunning + } + + // Add vmssCSE Provisioning Failed Message in error info body for vmssCSE Extensions if enableDetailedCSEMessage is true + if scaleSet.enableDetailedCSEMessage && vm.InstanceView != nil { + if err, failed := scaleSet.cseErrors(vm.InstanceView.Extensions); failed { + errorInfo := &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OtherErrorClass, + ErrorCode: vmssExtensionProvisioningFailed, + ErrorMessage: fmt.Sprintf("%s: %v", to.String(vm.Name), err), + } + status.ErrorInfo = errorInfo + } + } + + return status +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache_test.go new file mode 100644 index 000000000000..f491897a8041 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_instance_cache_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + + "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmssvmclient/mockvmssvmclient" +) + +var ( + ctrl *gomock.Controller + currentTime, expiredTime time.Time + provider *AzureCloudProvider + scaleSet *ScaleSet + mockVMSSVMClient *mockvmssvmclient.MockInterface + expectedVMSSVMs []compute.VirtualMachineScaleSetVM + expectedStates []cloudprovider.InstanceState + instanceCache, expectedInstanceCache []cloudprovider.Instance +) + +func testGetInstanceCacheWithStates(t *testing.T, vms []compute.VirtualMachineScaleSetVM, + states []cloudprovider.InstanceState) []cloudprovider.Instance { + assert.Equal(t, len(vms), len(states)) + var instanceCacheTest []cloudprovider.Instance + for i := 0; i < len(vms); i++ { + instanceCacheTest = append(instanceCacheTest, cloudprovider.Instance{ + Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, i), + Status: &cloudprovider.InstanceStatus{State: states[i]}, + }) + } + return instanceCacheTest +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go index cda0c77bb230..f3aa0dc356d8 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go @@ -20,12 +20,14 @@ import ( "fmt" "net/http" "testing" + "time" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2022-08-01/compute" "github.com/Azure/go-autorest/autorest/to" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmclient/mockvmclient" "sigs.k8s.io/cloud-provider-azure/pkg/azureclients/vmssclient/mockvmssclient" @@ -131,7 +133,7 @@ func newApiNode(orchmode compute.OrchestrationMode, vmID int64) *apiv1.Node { node := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure://" + fmt.Sprintf(providerId, vmID), + ProviderID: azurePrefix + fmt.Sprintf(providerId, vmID), }, } return node @@ -167,47 +169,70 @@ func TestTargetSize(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() + orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible} expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", compute.Uniform) spotScaleSet := newTestVMSSList(5, "spot-vmss", "eastus", compute.Uniform)[0] - spotScaleSet.VirtualMachineProfile = &compute.VirtualMachineScaleSetVMProfile{ - Priority: compute.Spot, - } + spotScaleSet.VirtualMachineProfile = &compute.VirtualMachineScaleSetVMProfile{Priority: compute.Spot} expectedScaleSets = append(expectedScaleSets, spotScaleSet) - provider := newTestProvider(t) - mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) - mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() - provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient - mockVMClient := mockvmclient.NewMockInterface(ctrl) - mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes() - provider.azureManager.azClient.virtualMachinesClient = mockVMClient + expectedVMSSVMs := newTestVMSSVMList(3) + expectedVMs := newTestVMList(3) - err := provider.azureManager.forceRefresh() - assert.NoError(t, err) + for _, orchMode := range orchestrationModes { + provider := newTestProvider(t) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() + provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient + mockVMClient := mockvmclient.NewMockInterface(ctrl) + mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes() + provider.azureManager.azClient.virtualMachinesClient = mockVMClient - // non-spot nodepool - registered := provider.azureManager.RegisterNodeGroup( - newTestScaleSet(provider.azureManager, "test-asg")) - assert.True(t, registered) - assert.Equal(t, len(provider.NodeGroups()), 1) + // return a different capacity from GET API + spotScaleSet.Sku.Capacity = to.Int64Ptr(1) + mockVMSSClient.EXPECT().Get(gomock.Any(), provider.azureManager.config.ResourceGroup, "spot-vmss").Return(spotScaleSet, nil).Times(1) + provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient + mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) - targetSize, err := provider.NodeGroups()[0].TargetSize() - assert.NoError(t, err) - assert.Equal(t, 3, targetSize) + mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + err := provider.azureManager.forceRefresh() + assert.NoError(t, err) - // Register a spot nodepool - spotregistered := provider.azureManager.RegisterNodeGroup( - newTestScaleSet(provider.azureManager, "spot-vmss")) - assert.True(t, spotregistered) - assert.Equal(t, len(provider.NodeGroups()), 2) + if orchMode == compute.Uniform { + mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) + mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + } else { + provider.azureManager.config.EnableVmssFlex = true + mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() + } - // mock getvmss call for spotnode pool returning different capacity - spotScaleSet.Sku.Capacity = to.Int64Ptr(1) - mockVMSSClient.EXPECT().Get(gomock.Any(), provider.azureManager.config.ResourceGroup, "spot-vmss").Return(spotScaleSet, nil).Times(1) + err = provider.azureManager.forceRefresh() + assert.NoError(t, err) - targetSize, err = provider.NodeGroups()[1].TargetSize() - assert.NoError(t, err) - assert.Equal(t, 1, targetSize) + registered := provider.azureManager.RegisterNodeGroup( + newTestScaleSet(provider.azureManager, testASG)) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) + + targetSize, err := provider.NodeGroups()[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 3, targetSize) + + targetSize, err = provider.NodeGroups()[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 3, targetSize) + + // With a spot nodegroup + spotNodeGroup := newTestScaleSet(provider.azureManager, "spot-vmss") + registered = provider.azureManager.RegisterNodeGroup(spotNodeGroup) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 2) + + targetSize, err = provider.NodeGroups()[1].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 1, targetSize) + } } func TestIncreaseSize(t *testing.T) { @@ -216,19 +241,25 @@ func TestIncreaseSize(t *testing.T) { orchestrationModes := [2]compute.OrchestrationMode{compute.Uniform, compute.Flexible} - expectedVMSSVMs := newTestVMSSVMList(3) - expectedVMs := newTestVMList(3) - for _, orchMode := range orchestrationModes { - provider := newTestProvider(t) - expectedScaleSets := newTestVMSSList(3, testASG, testLocation, orchMode) + expectedScaleSets := newTestVMSSList(3, testASG, "eastus", orchMode) + expectedVMSSVMs := newTestVMSSVMList(3) + expectedVMs := newTestVMList(3) // Include Edge Zone scenario here, testing scale from 3 to 5 and scale from zero cases. expectedEdgeZoneScaleSets := newTestVMSSListForEdgeZones(3, "edgezone-vmss") expectedEdgeZoneMinZeroScaleSets := newTestVMSSListForEdgeZones(0, "edgezone-minzero-vmss") expectedScaleSets = append(expectedScaleSets, *expectedEdgeZoneScaleSets, *expectedEdgeZoneMinZeroScaleSets) + provider := newTestProvider(t) + // expectedScaleSets := newTestVMSSList(3, testASG, testLocation, orchMode) + + // // Include Edge Zone scenario here, testing scale from 3 to 5 and scale from zero cases. + // expectedEdgeZoneScaleSets := newTestVMSSListForEdgeZones(3, "edgezone-vmss") + // expectedEdgeZoneMinZeroScaleSets := newTestVMSSListForEdgeZones(0, "edgezone-minzero-vmss") + // expectedScaleSets = append(expectedScaleSets, *expectedEdgeZoneScaleSets, *expectedEdgeZoneMinZeroScaleSets) + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) mockVMSSClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), provider.azureManager.config.ResourceGroup, testASG, gomock.Any()).Return(nil, nil) @@ -244,12 +275,11 @@ func TestIncreaseSize(t *testing.T) { if orchMode == compute.Uniform { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) - mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, testASG, gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient } else { - provider.azureManager.config.EnableVmssFlex = true - mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), testASG).Return(expectedVMs, nil).AnyTimes() + mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() } err := provider.azureManager.forceRefresh() assert.NoError(t, err) @@ -320,26 +350,35 @@ func TestIncreaseSize(t *testing.T) { } } +// TestIncreaseSizeOnVMProvisioningFailed has been tweeked only for Uniform Orchestration mode. +// If ProvisioningState == failed and power state is not running, Status.State == InstanceCreating with errorInfo populated. func TestIncreaseSizeOnVMProvisioningFailed(t *testing.T) { testCases := map[string]struct { - expectInstanceRunning bool - isMissingInstanceView bool - statuses []compute.InstanceViewStatus + expectInstanceRunning bool + isMissingInstanceView bool + statuses []compute.InstanceViewStatus + expectErrorInfoPopulated bool }{ - "out of resources when no power state exists": {}, + "out of resources when no power state exists": { + expectErrorInfoPopulated: true, + }, "out of resources when VM is stopped": { - statuses: []compute.InstanceViewStatus{{Code: to.StringPtr(vmPowerStateStopped)}}, + statuses: []compute.InstanceViewStatus{{Code: to.StringPtr(vmPowerStateStopped)}}, + expectErrorInfoPopulated: true, }, "out of resources when VM reports invalid power state": { - statuses: []compute.InstanceViewStatus{{Code: to.StringPtr("PowerState/invalid")}}, + statuses: []compute.InstanceViewStatus{{Code: to.StringPtr("PowerState/invalid")}}, + expectErrorInfoPopulated: true, }, "instance running when power state is running": { - expectInstanceRunning: true, - statuses: []compute.InstanceViewStatus{{Code: to.StringPtr(vmPowerStateRunning)}}, + expectInstanceRunning: true, + statuses: []compute.InstanceViewStatus{{Code: to.StringPtr(vmPowerStateRunning)}}, + expectErrorInfoPopulated: false, }, "instance running if instance view cannot be retrieved": { - expectInstanceRunning: true, - isMissingInstanceView: true, + expectInstanceRunning: true, + isMissingInstanceView: true, + expectErrorInfoPopulated: false, }, } for testName, testCase := range testCases { @@ -352,6 +391,7 @@ func TestIncreaseSizeOnVMProvisioningFailed(t *testing.T) { expectedScaleSets := newTestVMSSList(3, "vmss-failed-upscale", "eastus", compute.Uniform) expectedVMSSVMs := newTestVMSSVMList(3) + // The failed state is important line of code here expectedVMs := newTestVMList(3) expectedVMSSVMs[2].ProvisioningState = to.StringPtr(provisioningStateFailed) if !testCase.isMissingInstanceView { @@ -366,9 +406,11 @@ func TestIncreaseSizeOnVMProvisioningFailed(t *testing.T) { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "vmss-failed-upscale", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + mockVMClient := mockvmclient.NewMockInterface(ctrl) mockVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedVMs, nil).AnyTimes() manager.azClient.virtualMachinesClient = mockVMClient + manager.explicitlyConfigured["vmss-failed-upscale"] = true registered := manager.RegisterNodeGroup(newTestScaleSet(manager, vmssName)) assert.True(t, registered) @@ -388,11 +430,12 @@ func TestIncreaseSizeOnVMProvisioningFailed(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 3, len(nodes)) - if testCase.expectInstanceRunning { - assert.Equal(t, cloudprovider.InstanceRunning, nodes[2].Status.State) - } else { + + assert.Equal(t, testCase.expectErrorInfoPopulated, nodes[2].Status.ErrorInfo != nil) + if testCase.expectErrorInfoPopulated { assert.Equal(t, cloudprovider.InstanceCreating, nodes[2].Status.State) - assert.Equal(t, cloudprovider.OutOfResourcesErrorClass, nodes[2].Status.ErrorInfo.ErrorClass) + } else { + assert.Equal(t, cloudprovider.InstanceRunning, nodes[2].Status.State) } }) } @@ -413,7 +456,7 @@ func TestIncreaseSizeOnVMSSUpdating(t *testing.T) { Capacity: &vmssCapacity, }, VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{ - ProvisioningState: to.StringPtr(provisioningStateUpdating), + ProvisioningState: to.StringPtr(string(compute.GalleryProvisioningStateUpdating)), OrchestrationMode: compute.Uniform, }, }, @@ -422,16 +465,21 @@ func TestIncreaseSizeOnVMSSUpdating(t *testing.T) { mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil) - mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), manager.config.ResourceGroup, vmssName, gomock.Any()).Return(nil, nil) - mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes() + mockVMSSClient.EXPECT().CreateOrUpdateAsync(gomock.Any(), manager.config.ResourceGroup, vmssName, gomock.Any()).Return( + nil, nil) + mockVMSSClient.EXPECT().WaitForCreateOrUpdateResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return( + &http.Response{StatusCode: http.StatusOK}, nil).AnyTimes() manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) - mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "vmss-updating", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "vmss-updating", + gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient manager.explicitlyConfigured["vmss-updating"] = true registered := manager.RegisterNodeGroup(newTestScaleSet(manager, vmssName)) assert.True(t, registered) - manager.Refresh() + + err := manager.Refresh() + assert.NoError(t, err) provider, err := BuildAzureCloudProvider(manager, nil) assert.NoError(t, err) @@ -452,7 +500,6 @@ func TestBelongs(t *testing.T) { expectedVMs := newTestVMList(3) for _, orchMode := range orchestrationModes { - expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", orchMode) provider := newTestProvider(t) mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) @@ -463,32 +510,30 @@ func TestBelongs(t *testing.T) { provider.azureManager.azClient.virtualMachinesClient = mockVMClient if orchMode == compute.Uniform { - mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient - } else { - provider.azureManager.config.EnableVmssFlex = true mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() } registered := provider.azureManager.RegisterNodeGroup( - newTestScaleSet(provider.azureManager, "test-asg")) + newTestScaleSet(provider.azureManager, testASG)) assert.True(t, registered) scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) assert.True(t, ok) provider.azureManager.explicitlyConfigured["test-asg"] = true - provider.azureManager.Refresh() + err := provider.azureManager.Refresh() + assert.NoError(t, err) invalidNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure:///subscriptions/test-subscrition-id/resourcegroups/invalid-asg/providers/microsoft.compute/virtualmachinescalesets/agents/virtualmachines/0", + ProviderID: azurePrefix + "/subscriptions/test-subscrition-id/resourcegroups/invalid-asg/providers/microsoft.compute/virtualmachinescalesets/agents/virtualmachines/0", }, } - _, err := scaleSet.Belongs(invalidNode) + _, err = scaleSet.Belongs(invalidNode) assert.Error(t, err) validNode := newApiNode(orchMode, 0) @@ -496,7 +541,6 @@ func TestBelongs(t *testing.T) { assert.Equal(t, true, belongs) assert.NoError(t, err) } - } func TestDeleteNodes(t *testing.T) { @@ -561,7 +605,7 @@ func TestDeleteNodes(t *testing.T) { } else { manager.config.EnableVmssFlex = true mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() - + manager.azClient.virtualMachinesClient = mockVMClient } err := manager.forceRefresh() @@ -575,8 +619,8 @@ func TestDeleteNodes(t *testing.T) { assert.NoError(t, err) registered := manager.RegisterNodeGroup( - newTestScaleSet(manager, "test-asg")) - manager.explicitlyConfigured["test-asg"] = true + newTestScaleSet(manager, testASG)) + manager.explicitlyConfigured[testASG] = true assert.True(t, registered) err = manager.forceRefresh() @@ -621,12 +665,18 @@ func TestDeleteNodes(t *testing.T) { assert.Equal(t, 1, targetSize) // Ensure that the status for the instances is Deleting - instance0, found := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + // lastInstanceRefresh is set to time.Now() to avoid resetting instanceCache. + scaleSet.lastInstanceRefresh = time.Now() + instance0, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) assert.True(t, found, true) + assert.NoError(t, err) assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) - instance2, found := scaleSet.getInstanceByProviderID(nodesToDelete[1].Spec.ProviderID) + // lastInstanceRefresh is set to time.Now() to avoid resetting instanceCache. + scaleSet.lastInstanceRefresh = time.Now() + instance2, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[1].Spec.ProviderID) assert.True(t, found, true) + assert.NoError(t, err) assert.Equal(t, instance2.Status.State, cloudprovider.InstanceDeleting) } } @@ -681,15 +731,14 @@ func TestDeleteNodeUnregistered(t *testing.T) { mockVMSSClient.EXPECT().WaitForDeleteInstancesResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes() manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient mockVMClient := mockvmclient.NewMockInterface(ctrl) - mockVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes() + mockVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedVMs, nil).AnyTimes() manager.azClient.virtualMachinesClient = mockVMClient - if orchMode == compute.Uniform { + if orchMode == compute.Uniform { mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient } else { - manager.config.EnableVmssFlex = true mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() } @@ -699,18 +748,20 @@ func TestDeleteNodeUnregistered(t *testing.T) { resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) + provider, err := BuildAzureCloudProvider(manager, resourceLimiter) assert.NoError(t, err) registered := manager.RegisterNodeGroup( - newTestScaleSet(manager, "test-asg")) - manager.explicitlyConfigured["test-asg"] = true + newTestScaleSet(manager, testASG)) + manager.explicitlyConfigured[testASG] = true assert.True(t, registered) err = manager.forceRefresh() assert.NoError(t, err) scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) assert.True(t, ok) + scaleSet.instancesRefreshPeriod = defaultVmssInstancesRefreshPeriod targetSize, err := scaleSet.TargetSize() assert.NoError(t, err) @@ -734,10 +785,126 @@ func TestDeleteNodeUnregistered(t *testing.T) { assert.Equal(t, 2, targetSize) // Ensure that the status for the instances is Deleting - instance0, found := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) + // lastInstanceRefresh is set to time.Now() to avoid resetting instanceCache. + scaleSet.lastInstanceRefresh = time.Now() + instance0, found, err := scaleSet.getInstanceByProviderID(nodesToDelete[0].Spec.ProviderID) assert.True(t, found, true) - assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) + assert.NoError(t, err) + assert.Equal(t, cloudprovider.InstanceDeleting, instance0.Status.State) + } +} + +func TestDeleteInstancesWithForceDeleteEnabled(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + manager := newTestAzureManager(t) + // enabling forceDelete + manager.config.EnableForceDelete = true + + vmssName := "test-asg" + var vmssCapacity int64 = 3 + //hostGroupId := "test-hostGroup" + //hostGroup := &compute.SubResource{ + // ID: &hostGroupId, + //} + + expectedScaleSets := []compute.VirtualMachineScaleSet{ + { + Name: &vmssName, + Sku: &compute.Sku{ + Capacity: &vmssCapacity, + }, + VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{ + OrchestrationMode: compute.Uniform, + }, + }, + } + expectedVMSSVMs := newTestVMSSVMList(3) + + mockVMSSClient := mockvmssclient.NewMockInterface(ctrl) + mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).Times(2) + mockVMSSClient.EXPECT().DeleteInstancesAsync(gomock.Any(), manager.config.ResourceGroup, gomock.Any(), gomock.Any(), true).Return(nil, nil) + mockVMSSClient.EXPECT().WaitForDeleteInstancesResult(gomock.Any(), gomock.Any(), manager.config.ResourceGroup).Return(&http.Response{StatusCode: http.StatusOK}, nil).AnyTimes() + manager.azClient.virtualMachineScaleSetsClient = mockVMSSClient + mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) + mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + manager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient + err := manager.forceRefresh() + assert.NoError(t, err) + + resourceLimiter := cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, + map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) + provider, err := BuildAzureCloudProvider(manager, resourceLimiter) + assert.NoError(t, err) + + registered := manager.RegisterNodeGroup( + newTestScaleSet(manager, "test-asg")) + manager.explicitlyConfigured["test-asg"] = true + assert.True(t, registered) + err = manager.forceRefresh() + assert.NoError(t, err) + + scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) + assert.True(t, ok) + + targetSize, err := scaleSet.TargetSize() + assert.NoError(t, err) + assert.Equal(t, 3, targetSize) + + // Perform the delete operation + nodesToDelete := []*apiv1.Node{ + { + Spec: apiv1.NodeSpec{ + ProviderID: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 0), + }, + }, + { + Spec: apiv1.NodeSpec{ + ProviderID: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 2), + }, + }, + } + err = scaleSet.DeleteNodes(nodesToDelete) + assert.NoError(t, err) + vmssCapacity = 1 + expectedScaleSets = []compute.VirtualMachineScaleSet{ + { + Name: &vmssName, + Sku: &compute.Sku{ + Capacity: &vmssCapacity, + }, + VirtualMachineScaleSetProperties: &compute.VirtualMachineScaleSetProperties{ + OrchestrationMode: compute.Uniform, + }, + }, } + mockVMSSClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup).Return(expectedScaleSets, nil).AnyTimes() + expectedVMSSVMs[0].ProvisioningState = to.StringPtr(string(compute.GalleryProvisioningStateDeleting)) + expectedVMSSVMs[2].ProvisioningState = to.StringPtr(string(compute.GalleryProvisioningStateDeleting)) + mockVMSSVMClient.EXPECT().List(gomock.Any(), manager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() + err = manager.forceRefresh() + assert.NoError(t, err) + + // Ensure the the cached size has been proactively decremented by 2 + targetSize, err = scaleSet.TargetSize() + assert.NoError(t, err) + assert.Equal(t, 1, targetSize) + + // Ensure that the status for the instances is Deleting + // lastInstanceRefresh is set to time.Now() to avoid resetting instanceCache. + scaleSet.lastInstanceRefresh = time.Now() + instance0, found, err := scaleSet.getInstanceByProviderID(azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 0)) + assert.True(t, found, true) + assert.NoError(t, err) + assert.Equal(t, instance0.Status.State, cloudprovider.InstanceDeleting) + + // lastInstanceRefresh is set to time.Now() to avoid resetting instanceCache. + scaleSet.lastInstanceRefresh = time.Now() + instance2, found, err := scaleSet.getInstanceByProviderID(azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 2)) + assert.True(t, found, true) + assert.NoError(t, err) + assert.Equal(t, instance2.Status.State, cloudprovider.InstanceDeleting) } @@ -792,7 +959,7 @@ func TestDeleteNoConflictRequest(t *testing.T) { node := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure://" + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 0), + ProviderID: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 0), }, } @@ -830,7 +997,6 @@ func TestScaleSetNodes(t *testing.T) { expectedVMs := newTestVMList(3) for _, orchMode := range orchestrationModes { - expectedScaleSets := newTestVMSSList(3, "test-asg", "eastus", orchMode) provider := newTestProvider(t) @@ -842,7 +1008,6 @@ func TestScaleSetNodes(t *testing.T) { provider.azureManager.azClient.virtualMachinesClient = mockVMClient if orchMode == compute.Uniform { - mockVMSSVMClient := mockvmssvmclient.NewMockInterface(ctrl) mockVMSSVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup, "test-asg", gomock.Any()).Return(expectedVMSSVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachineScaleSetVMsClient = mockVMSSVMClient @@ -855,7 +1020,9 @@ func TestScaleSetNodes(t *testing.T) { registered := provider.azureManager.RegisterNodeGroup( newTestScaleSet(provider.azureManager, "test-asg")) provider.azureManager.explicitlyConfigured["test-asg"] = true - provider.azureManager.Refresh() + err := provider.azureManager.Refresh() + assert.NoError(t, err) + assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) @@ -863,11 +1030,12 @@ func TestScaleSetNodes(t *testing.T) { group, err := provider.NodeGroupForNode(node) assert.NoError(t, err) assert.NotNil(t, group, "Group should not be nil") - assert.Equal(t, group.Id(), "test-asg") + assert.Equal(t, group.Id(), testASG) assert.Equal(t, group.MinSize(), 1) assert.Equal(t, group.MaxSize(), 5) ss, ok := group.(*ScaleSet) + ss.lastInstanceRefresh = time.Now() assert.True(t, ok) assert.NotNil(t, ss) instances, err := group.Nodes() @@ -876,14 +1044,14 @@ func TestScaleSetNodes(t *testing.T) { if orchMode == compute.Uniform { - assert.Equal(t, instances[0], cloudprovider.Instance{Id: "azure://" + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 0)}) - assert.Equal(t, instances[1], cloudprovider.Instance{Id: "azure://" + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 1)}) - assert.Equal(t, instances[2], cloudprovider.Instance{Id: "azure://" + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 2)}) + assert.Equal(t, instances[0], cloudprovider.Instance{Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 0)}) + assert.Equal(t, instances[1], cloudprovider.Instance{Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 1)}) + assert.Equal(t, instances[2], cloudprovider.Instance{Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineScaleSetVMID, 2)}) } else { - assert.Equal(t, instances[0], cloudprovider.Instance{Id: "azure://" + fmt.Sprintf(fakeVirtualMachineVMID, 0)}) - assert.Equal(t, instances[1], cloudprovider.Instance{Id: "azure://" + fmt.Sprintf(fakeVirtualMachineVMID, 1)}) - assert.Equal(t, instances[2], cloudprovider.Instance{Id: "azure://" + fmt.Sprintf(fakeVirtualMachineVMID, 2)}) + assert.Equal(t, instances[0], cloudprovider.Instance{Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineVMID, 0)}) + assert.Equal(t, instances[1], cloudprovider.Instance{Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineVMID, 1)}) + assert.Equal(t, instances[2], cloudprovider.Instance{Id: azurePrefix + fmt.Sprintf(fakeVirtualMachineVMID, 2)}) } } @@ -904,13 +1072,14 @@ func TestEnableVmssFlexFlag(t *testing.T) { provider.azureManager.config.EnableVmssFlex = false provider.azureManager.azClient.virtualMachineScaleSetsClient = mockVMSSClient mockVMClient := mockvmclient.NewMockInterface(ctrl) - mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return([]compute.VirtualMachine{}, nil).AnyTimes() - mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), "test-asg").Return(expectedVMs, nil).AnyTimes() + + mockVMClient.EXPECT().List(gomock.Any(), provider.azureManager.config.ResourceGroup).Return(expectedVMs, nil).AnyTimes() + mockVMClient.EXPECT().ListVmssFlexVMsWithoutInstanceView(gomock.Any(), testASG).Return(expectedVMs, nil).AnyTimes() provider.azureManager.azClient.virtualMachinesClient = mockVMClient provider.azureManager.RegisterNodeGroup( - newTestScaleSet(provider.azureManager, "test-asg")) - provider.azureManager.explicitlyConfigured["test-asg"] = true + newTestScaleSet(provider.azureManager, testASG)) + provider.azureManager.explicitlyConfigured[testASG] = true err := provider.azureManager.Refresh() assert.Error(t, err, "vmss - \"test-asg\" with Flexible orchestration detected but 'enbaleVmssFlex' feature flag is turned off") @@ -939,19 +1108,29 @@ func TestTemplateNodeInfo(t *testing.T) { assert.Equal(t, len(provider.NodeGroups()), 1) asg := ScaleSet{ - manager: provider.azureManager, - minSize: 1, - maxSize: 5, - enableDynamicInstanceList: true, + manager: provider.azureManager, + minSize: 1, + maxSize: 5, } asg.Name = "test-asg" - nodeInfo, err := asg.TemplateNodeInfo() - assert.NoError(t, err) - assert.NotNil(t, nodeInfo) - assert.NotEmpty(t, nodeInfo.Pods) + t.Run("Checking fallback to static because dynamic list is empty", func(t *testing.T) { + asg.enableDynamicInstanceList = true + + nodeInfo, err := asg.TemplateNodeInfo() + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + assert.NotEmpty(t, nodeInfo.Pods) + }) + + // Properly testing dynamic SKU list through skewer is not possible, + // because there are no Resource API mocks included yet. + // Instead, the rest of the (consumer side) tests here + // override GetVMSSTypeDynamically and GetVMSSTypeStatically functions. t.Run("Checking dynamic workflow", func(t *testing.T) { + asg.enableDynamicInstanceList = true + GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) { vmssType := InstanceType{} vmssType.VCPU = 1 @@ -960,12 +1139,16 @@ func TestTemplateNodeInfo(t *testing.T) { return vmssType, nil } nodeInfo, err := asg.TemplateNodeInfo() + assert.Equal(t, *nodeInfo.Node().Status.Capacity.Cpu(), *resource.NewQuantity(1, resource.DecimalSI)) + assert.Equal(t, *nodeInfo.Node().Status.Capacity.Memory(), *resource.NewQuantity(3*1024*1024, resource.DecimalSI)) assert.NoError(t, err) assert.NotNil(t, nodeInfo) assert.NotEmpty(t, nodeInfo.Pods) }) t.Run("Checking static workflow if dynamic fails", func(t *testing.T) { + asg.enableDynamicInstanceList = true + GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) { return InstanceType{}, fmt.Errorf("dynamic error exists") } @@ -977,12 +1160,16 @@ func TestTemplateNodeInfo(t *testing.T) { return &vmssType, nil } nodeInfo, err := asg.TemplateNodeInfo() + assert.Equal(t, *nodeInfo.Node().Status.Capacity.Cpu(), *resource.NewQuantity(1, resource.DecimalSI)) + assert.Equal(t, *nodeInfo.Node().Status.Capacity.Memory(), *resource.NewQuantity(3*1024*1024, resource.DecimalSI)) assert.NoError(t, err) assert.NotNil(t, nodeInfo) assert.NotEmpty(t, nodeInfo.Pods) }) t.Run("Fails to find vmss instance information using static and dynamic workflow, instance not supported", func(t *testing.T) { + asg.enableDynamicInstanceList = true + GetVMSSTypeDynamically = func(template compute.VirtualMachineScaleSet, azCache *azureCache) (InstanceType, error) { return InstanceType{}, fmt.Errorf("dynamic error exists") } @@ -994,8 +1181,9 @@ func TestTemplateNodeInfo(t *testing.T) { assert.Equal(t, err, fmt.Errorf("static error exists")) }) - // Note: This test should be removed once enableDynamicInstanceList toggled is removed and the feature is completely enabled. - t.Run("Checking static workflow if enableDynamicInstanceList Toggle is false", func(t *testing.T) { + // Note: static-only workflow tests can be removed once support for dynamic is always on + + t.Run("Checking static-only workflow", func(t *testing.T) { asg.enableDynamicInstanceList = false GetVMSSTypeStatically = func(template compute.VirtualMachineScaleSet) (*InstanceType, error) { @@ -1005,9 +1193,69 @@ func TestTemplateNodeInfo(t *testing.T) { vmssType.MemoryMb = 3 return &vmssType, nil } + nodeInfo, err := asg.TemplateNodeInfo() + assert.Equal(t, *nodeInfo.Node().Status.Capacity.Cpu(), *resource.NewQuantity(1, resource.DecimalSI)) + assert.Equal(t, *nodeInfo.Node().Status.Capacity.Memory(), *resource.NewQuantity(3*1024*1024, resource.DecimalSI)) + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + assert.NotEmpty(t, nodeInfo.Pods) + }) + + t.Run("Checking static-only workflow with built-in SKU list", func(t *testing.T) { + asg.enableDynamicInstanceList = false + nodeInfo, err := asg.TemplateNodeInfo() assert.NoError(t, err) assert.NotNil(t, nodeInfo) assert.NotEmpty(t, nodeInfo.Pods) }) + +} +func TestCseErrors(t *testing.T) { + errorMessage := to.StringPtr("Error Message Test") + vmssVMs := compute.VirtualMachineScaleSetVM{ + Name: to.StringPtr("vmTest"), + ID: to.StringPtr(fakeVirtualMachineScaleSetVMID), + InstanceID: to.StringPtr("0"), + VirtualMachineScaleSetVMProperties: &compute.VirtualMachineScaleSetVMProperties{ + VMID: to.StringPtr("123E4567-E89B-12D3-A456-426655440000"), + ProvisioningState: to.StringPtr("Succeeded"), + InstanceView: &compute.VirtualMachineScaleSetVMInstanceView{ + Extensions: &[]compute.VirtualMachineExtensionInstanceView{ + { + Statuses: &[]compute.InstanceViewStatus{ + { + Level: "Error", + Message: errorMessage, + }, + }, + }, + }, + }, + }, + } + + manager := newTestAzureManager(t) + resourceLimiter := cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, + map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) + provider, _ := BuildAzureCloudProvider(manager, resourceLimiter) + manager.RegisterNodeGroup( + newTestScaleSet(manager, "test-asg")) + manager.explicitlyConfigured["test-asg"] = true + scaleSet, _ := provider.NodeGroups()[0].(*ScaleSet) + + t.Run("getCSEErrorMessages test with CSE error in VM extensions", func(t *testing.T) { + expectedCSEWErrorMessage := "Error Message Test" + (*vmssVMs.InstanceView.Extensions)[0].Name = to.StringPtr(vmssCSEExtensionName) + actualCSEErrorMessage, actualCSEFailureBool := scaleSet.cseErrors(vmssVMs.InstanceView.Extensions) + assert.True(t, actualCSEFailureBool) + assert.Equal(t, []string{expectedCSEWErrorMessage}, actualCSEErrorMessage) + }) + t.Run("getCSEErrorMessages test with no CSE error in VM extensions", func(t *testing.T) { + (*vmssVMs.InstanceView.Extensions)[0].Name = to.StringPtr("notCSEExtension") + actualCSEErrorMessage, actualCSEFailureBool := scaleSet.cseErrors(vmssVMs.InstanceView.Extensions) + assert.False(t, actualCSEFailureBool) + assert.Equal(t, []string(nil), actualCSEErrorMessage) + }) }