diff --git a/cluster-autoscaler/cloudprovider/aws/README.md b/cluster-autoscaler/cloudprovider/aws/README.md index 7f7f777e7496..3b0872ae3bb6 100644 --- a/cluster-autoscaler/cloudprovider/aws/README.md +++ b/cluster-autoscaler/cloudprovider/aws/README.md @@ -48,10 +48,41 @@ discover](#auto-discovery-setup) EC2 Auto Scaling Groups **(recommended)**, add Launch Configuration) and/or `ec2:DescribeLaunchTemplateVersions` (if you created your ASG using a Launch Template) to the `Action` list. -If you prefer, you can restrict the target resources for the autoscaling actions -by specifying Auto Scaling Group ARNs in the `Resource` list of the policy. More -information can be found -[here](https://docs.aws.amazon.com/autoscaling/latest/userguide/control-access-using-iam.html#policy-auto-scaling-resources). +*NOTE:* The below policies/arguments to the Cluster Autoscaler need to be modified as appropriate +for the names of your ASGs, as well as account ID and AWS region before being used. + +The following policy provides the minimum privileges necessary for Cluster Autoscaler to run. +When using this policy, you cannot use autodiscovery of ASGs. In addition, it restricts the +IAM permissions to the node groups the Cluster Autoscaler is configured to scale. + +This in turn means that you must pass the following arguments to the Cluster Autoscaler +binary, replacing min and max node counts and the ASG: + +```bash +--aws-use-static-instance-list=false +--nodes=1:100:exampleASG1 +--nodes=1:100:exampleASG2 +``` + +```json +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "autoscaling:DescribeAutoScalingGroups", + "autoscaling:DescribeAutoScalingInstances", + "autoscaling:DescribeLaunchConfigurations", + "autoscaling:DescribeScalingActivities", + "autoscaling:SetDesiredCapacity", + "autoscaling:TerminateInstanceInAutoScalingGroup" + ], + "Resource": ["arn:aws:autoscaling:${YOUR_CLUSTER_AWS_REGION}:${YOUR_AWS_ACCOUNT_ID}:autoScalingGroup:*:autoScalingGroupName/${YOUR_ASG_NAME}"] + } + ] +} +``` ### Using OIDC Federated Authentication diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 5f66e808dbdc..cf719e23532f 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -67,7 +67,7 @@ type asg struct { minSize int maxSize int curSize int - lastUpdateTime *time.Time + lastUpdateTime time.Time AvailabilityZones []string LaunchConfigurationName string @@ -254,7 +254,7 @@ func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error { } // Proactively set the ASG size so autoscaler makes better decisions - asg.lastUpdateTime = &start + asg.lastUpdateTime = start asg.curSize = size return nil @@ -479,7 +479,6 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) { input := &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: group.AutoScalingGroupName, - MaxRecords: aws.Int64(1), // We only care about the most recent event } start := time.Now() @@ -489,12 +488,14 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) return true, err // If we can't describe the scaling activities we assume the node group is available } - if len(response.Activities) > 0 { - activity := response.Activities[0] + for _, activity := range response.Activities { asgRef := 
AwsRef{Name: *group.AutoScalingGroupName} if a, ok := m.registeredAsgs[asgRef]; ok { lut := a.lastUpdateTime - if lut != nil && activity.StartTime.After(*lut) && *activity.StatusCode == "Failed" { + if activity.StartTime.Before(lut) { + break + } else if *activity.StatusCode == "Failed" { + klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity) return false, nil } } else { diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go index d28519f29c0c..c5078b03b5b7 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups_test.go @@ -59,7 +59,7 @@ func TestCreatePlaceholders(t *testing.T) { name string desiredCapacity *int64 activities []*autoscaling.Activity - groupLastUpdateTime *time.Time + groupLastUpdateTime time.Time describeErr error asgToCheck *string }{ @@ -85,7 +85,7 @@ func TestCreatePlaceholders(t *testing.T) { StartTime: aws.Time(time.Unix(10, 0)), }, }, - groupLastUpdateTime: aws.Time(time.Unix(9, 0)), + groupLastUpdateTime: time.Unix(9, 0), }, { name: "AWS scaling failed event before CA scale_up", @@ -96,7 +96,7 @@ func TestCreatePlaceholders(t *testing.T) { StartTime: aws.Time(time.Unix(9, 0)), }, }, - groupLastUpdateTime: aws.Time(time.Unix(10, 0)), + groupLastUpdateTime: time.Unix(10, 0), }, { name: "asg not registered", @@ -107,7 +107,7 @@ func TestCreatePlaceholders(t *testing.T) { StartTime: aws.Time(time.Unix(10, 0)), }, }, - groupLastUpdateTime: aws.Time(time.Unix(9, 0)), + groupLastUpdateTime: time.Unix(9, 0), asgToCheck: aws.String("unregisteredAsgName"), }, } @@ -128,7 +128,6 @@ func TestCreatePlaceholders(t *testing.T) { if shouldCallDescribeScalingActivities { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: asgName, - MaxRecords: aws.Int64(1), }).Return( &autoscaling.DescribeScalingActivitiesOutput{Activities: tc.activities}, tc.describeErr, @@ -158,7 +157,7 @@ func TestCreatePlaceholders(t *testing.T) { } asgCache.createPlaceholdersForDesiredNonStartedInstances(groups) assert.Equal(t, int64(len(groups[0].Instances)), *tc.desiredCapacity) - if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(*tc.groupLastUpdateTime) && asgName == registeredAsgName { + if tc.activities != nil && *tc.activities[0].StatusCode == "Failed" && tc.activities[0].StartTime.After(tc.groupLastUpdateTime) && asgName == registeredAsgName { assert.Equal(t, *groups[0].Instances[0].HealthStatus, placeholderUnfulfillableStatus) } else if len(groups[0].Instances) > 0 { assert.Equal(t, *groups[0].Instances[0].HealthStatus, "") diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 2753daabd2dc..e1d6a92a1ef9 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -466,7 +466,6 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go index 02c298201b32..295bbf0e92d2 100644 --- 
a/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager_test.go @@ -394,7 +394,6 @@ func TestFetchExplicitAsgs(t *testing.T) { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: aws.String("coolasg"), - MaxRecords: aws.Int64(1), }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) @@ -559,7 +558,6 @@ func TestFetchAutoAsgs(t *testing.T) { a.On("DescribeScalingActivities", &autoscaling.DescribeScalingActivitiesInput{ AutoScalingGroupName: aws.String("coolasg"), - MaxRecords: aws.Int64(1), }, ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 0d60b6f2f356..9c0e1cb9d6bb 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -169,4 +169,25 @@ type AutoscalingOptions struct { DaemonSetEvictionForOccupiedNodes bool // User agent to use for HTTP calls. UserAgent string + // InitialNodeGroupBackoffDuration is the duration of first backoff after a new node failed to start + InitialNodeGroupBackoffDuration time.Duration + // MaxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start. + MaxNodeGroupBackoffDuration time.Duration + // NodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset. + NodeGroupBackoffResetTimeout time.Duration + // MaxScaleDownParallelism is the maximum number of nodes (both empty and needing drain) that can be deleted in parallel. + MaxScaleDownParallelism int + // MaxDrainParallelism is the maximum number of nodes needing drain, that can be drained and deleted in parallel. + MaxDrainParallelism int + // GceExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account. + GceExpanderEphemeralStorageSupport bool + // RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window. + RecordDuplicatedEvents bool + // MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up. + // Note that this is strictly a performance optimization aimed at limiting binpacking time, not a tool to rate-limit + // scale-up. There is nothing stopping CA from adding MaxNodesPerScaleUp every loop. + MaxNodesPerScaleUp int + // MaxNodeGroupBinpackingDuration is a maximum time that can be spent binpacking a single NodeGroup. If the threshold + // is exceeded binpacking will be cut short and a partial scale-up will be performed. 
+ MaxNodeGroupBinpackingDuration time.Duration } diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 62216018769d..33650fec2892 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -110,7 +110,7 @@ func initializeDefaultOptions(opts *AutoscalerOptions) error { opts.ExpanderStrategy = expanderStrategy } if opts.EstimatorBuilder == nil { - estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName) + estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration)) if err != nil { return err } diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index 57a4e48d5200..c070ff385231 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -165,7 +165,7 @@ func NewScaleTestAutoscalingContext( } // Ignoring error here is safe - if a test doesn't specify valid estimatorName, // it either doesn't need one, or should fail when it turns out to be nil. - estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName) + estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(0, 0)) predicateChecker, err := simulator.NewTestPredicateChecker() if err != nil { return context.AutoscalingContext{}, err diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index 3e1f11017fcc..7f0113ff2555 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -312,7 +312,7 @@ func computeExpansionOption(context *context.AutoscalingContext, podEquivalenceG if len(option.Pods) > 0 { estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot) - option.NodeCount = estimator.Estimate(option.Pods, nodeInfo) + option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup) } return option, nil diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 70ead85d1654..f48c67abcff8 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -351,7 +351,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError return nil } - if a.deleteCreatedNodesWithErrors() { + danglingNodes, err := a.deleteCreatedNodesWithErrors() + if err != nil { + klog.Warningf("Failed to remove nodes that were created with errors, skipping iteration: %v", err) + return nil + } + if danglingNodes { klog.V(0).Infof("Some nodes that failed to create were removed, skipping iteration") return nil } @@ -643,7 +648,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod return removedAny, nil } -func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool { +func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() (bool, error) { // We always schedule deleting of incoming errornous nodes // TODO[lukaszos] Consider adding logic to not retry delete every loop iteration nodes := a.clusterStateRegistry.GetCreatedNodesWithErrors() @@ -661,6 +666,9 @@ func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool { klog.Warningf("Cannot determine nodeGroup for node %v; %v", id, err) continue } + if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { + return false, fmt.Errorf("node %s 
has no known nodegroup", node.GetName()) + } nodesToBeDeletedByNodeGroupId[nodeGroup.Id()] = append(nodesToBeDeletedByNodeGroupId[nodeGroup.Id()], node) } @@ -685,7 +693,7 @@ func (a *StaticAutoscaler) deleteCreatedNodesWithErrors() bool { a.clusterStateRegistry.InvalidateNodeInstancesCacheEntry(nodeGroup) } - return deletedAny + return deletedAny, nil } func (a *StaticAutoscaler) nodeGroupsById() map[string]cloudprovider.NodeGroup { diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index e51c433fd5b3..78cfbbce8693 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -923,7 +923,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock) } -func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) { +func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) { // setup provider := &mockprovider.CloudProvider{} @@ -1058,7 +1058,9 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) { clusterState.UpdateNodes([]*apiv1.Node{}, nil, now) // delete nodes with create errors - assert.True(t, autoscaler.deleteCreatedNodesWithErrors()) + removedNodes, err := autoscaler.deleteCreatedNodesWithErrors() + assert.True(t, removedNodes) + assert.NoError(t, err) // check delete was called on correct nodes nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy( @@ -1082,7 +1084,9 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) { clusterState.UpdateNodes([]*apiv1.Node{}, nil, now) // delete nodes with create errors - assert.True(t, autoscaler.deleteCreatedNodesWithErrors()) + removedNodes, err = autoscaler.deleteCreatedNodesWithErrors() + assert.True(t, removedNodes) + assert.NoError(t, err) // nodes should be deleted again nodeGroupA.AssertCalled(t, "DeleteNodes", mock.MatchedBy( @@ -1145,10 +1149,48 @@ func TestStaticAutoscalerInstaceCreationErrors(t *testing.T) { clusterState.UpdateNodes([]*apiv1.Node{}, nil, now) // delete nodes with create errors - assert.False(t, autoscaler.deleteCreatedNodesWithErrors()) + removedNodes, err = autoscaler.deleteCreatedNodesWithErrors() + assert.False(t, removedNodes) + assert.NoError(t, err) // we expect no more Delete Nodes nodeGroupA.AssertNumberOfCalls(t, "DeleteNodes", 2) + + // failed node not included by NodeGroupForNode + nodeGroupC := &mockprovider.NodeGroup{} + nodeGroupC.On("Exist").Return(true) + nodeGroupC.On("Autoprovisioned").Return(false) + nodeGroupC.On("TargetSize").Return(1, nil) + nodeGroupC.On("Id").Return("C") + nodeGroupC.On("DeleteNodes", mock.Anything).Return(nil) + nodeGroupC.On("Nodes").Return([]cloudprovider.Instance{ + { + Id: "C1", + Status: &cloudprovider.InstanceStatus{ + State: cloudprovider.InstanceCreating, + ErrorInfo: &cloudprovider.InstanceErrorInfo{ + ErrorClass: cloudprovider.OutOfResourcesErrorClass, + ErrorCode: "QUOTA", + }, + }, + }, + }, nil) + provider = &mockprovider.CloudProvider{} + provider.On("NodeGroups").Return([]cloudprovider.NodeGroup{nodeGroupC}) + provider.On("NodeGroupForNode", mock.Anything).Return(nil, nil) + + clusterState = clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, newBackoff()) + clusterState.RefreshCloudProviderNodeInstancesCache() + autoscaler.clusterStateRegistry = clusterState + + // update cluster state + clusterState.UpdateNodes([]*apiv1.Node{}, nil, time.Now()) + + // return early on 
failed nodes without matching nodegroups + removedNodes, err = autoscaler.deleteCreatedNodesWithErrors() + assert.False(t, removedNodes) + assert.Error(t, err) + nodeGroupC.AssertNumberOfCalls(t, "DeleteNodes", 0) } func TestStaticAutoscalerProcessorCallbacks(t *testing.T) { diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 87482f4921f1..5e6134a56d53 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -22,6 +22,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" klog "k8s.io/klog/v2" @@ -38,15 +39,18 @@ type podInfo struct { type BinpackingNodeEstimator struct { predicateChecker simulator.PredicateChecker clusterSnapshot simulator.ClusterSnapshot + limiter EstimationLimiter } // NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator. func NewBinpackingNodeEstimator( predicateChecker simulator.PredicateChecker, - clusterSnapshot simulator.ClusterSnapshot) *BinpackingNodeEstimator { + clusterSnapshot simulator.ClusterSnapshot, + limiter EstimationLimiter) *BinpackingNodeEstimator { return &BinpackingNodeEstimator{ predicateChecker: predicateChecker, clusterSnapshot: clusterSnapshot, + limiter: limiter, } } @@ -57,69 +61,102 @@ func NewBinpackingNodeEstimator( // still be maintained. // It is assumed that all pods from the given list can fit to nodeTemplate. // Returns the number of nodes needed to accommodate all pods from the list. -func (estimator *BinpackingNodeEstimator) Estimate( +func (e *BinpackingNodeEstimator) Estimate( pods []*apiv1.Pod, - nodeTemplate *schedulerframework.NodeInfo) int { + nodeTemplate *schedulerframework.NodeInfo, + nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) { + + e.limiter.StartEstimation(pods, nodeGroup) + defer e.limiter.EndEstimation() + podInfos := calculatePodScore(pods, nodeTemplate) sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score }) newNodeNames := make(map[string]bool) + newNodesWithPods := make(map[string]bool) - if err := estimator.clusterSnapshot.Fork(); err != nil { + if err := e.clusterSnapshot.Fork(); err != nil { klog.Errorf("Error while calling ClusterSnapshot.Fork; %v", err) - return 0 + return 0, nil } defer func() { - if err := estimator.clusterSnapshot.Revert(); err != nil { + if err := e.clusterSnapshot.Revert(); err != nil { klog.Fatalf("Error while calling ClusterSnapshot.Revert; %v", err) } }() newNodeNameIndex := 0 + scheduledPods := []*apiv1.Pod{} + lastNodeName := "" for _, podInfo := range podInfos { found := false - nodeName, err := estimator.predicateChecker.FitsAnyNodeMatching(estimator.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool { + nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool { return newNodeNames[nodeInfo.Node().Name] }) if err == nil { found = true - if err := estimator.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil { + if err := e.clusterSnapshot.AddPod(podInfo.pod, nodeName); err != nil { klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, nodeName, err) - return 0 + return 0, nil } + scheduledPods = append(scheduledPods, podInfo.pod) + 
newNodesWithPods[nodeName] = true } if !found { + // Stop binpacking if we reach the limit of nodes we can add. + // We return the result of the binpacking that we already performed. + if !e.limiter.PermissionToAddNode() { + break + } + + // If the last node we've added is empty and the pod couldn't schedule on it, it wouldn't be able to schedule + // on a new node either. There is no point adding more nodes to snapshot in such case, especially because of + // performance cost each extra node adds to future FitsAnyNodeMatching calls. + if lastNodeName != "" && !newNodesWithPods[lastNodeName] { + continue + } + // Add new node - newNodeName, err := estimator.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) + newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex) if err != nil { klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err) - return 0 + return 0, nil } newNodeNameIndex++ - // And schedule pod to it - if err := estimator.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil { + newNodeNames[newNodeName] = true + lastNodeName = newNodeName + + // And try to schedule pod to it. + // Note that this may still fail (ex. if topology spreading with zonal topologyKey is used); + // in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid + // adding and removing node to snapshot for each such pod. + if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil { + continue + } + if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil { klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err) - return 0 + return 0, nil } - newNodeNames[newNodeName] = true + newNodesWithPods[newNodeName] = true + scheduledPods = append(scheduledPods, podInfo.pod) } } - return len(newNodeNames) + return len(newNodesWithPods), scheduledPods } -func (estimator *BinpackingNodeEstimator) addNewNodeToSnapshot( +func (e *BinpackingNodeEstimator) addNewNodeToSnapshot( template *schedulerframework.NodeInfo, nameIndex int) (string, error) { - newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("estimator-%d", nameIndex)) + newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", nameIndex)) var pods []*apiv1.Pod for _, podInfo := range newNodeInfo.Pods { pods = append(pods, podInfo.Pod) } - if err := estimator.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { + if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { return "", err } return newNodeInfo.Node().Name, nil diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index 6aa4ee1b84ec..797d3d28e742 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -22,6 +22,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/autoscaler/cluster-autoscaler/utils/units" @@ -30,89 +31,157 @@ import ( "github.com/stretchr/testify/assert" ) -func makePod(cpuPerPod, memoryPerPod int64) *apiv1.Pod { - return &apiv1.Pod{ +func makePods(cpuPerPod int64, memoryPerPod int64, hostport int32, maxSkew int32, topologySpreadingKey string, podCount int) []*apiv1.Pod { + pod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "estimatee", + Namespace: "universe", + Labels: map[string]string{ + "app": "estimatee", + }, + }, Spec: apiv1.PodSpec{ Containers: []apiv1.Container{ { Resources: apiv1.ResourceRequirements{ Requests: apiv1.ResourceList{ apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(memoryPerPod*units.MiB, resource.DecimalSI), }, }, }, }, }, } -} - -func TestBinpackingEstimate(t *testing.T) { - estimator := newBinPackingEstimator(t) - - cpuPerPod := int64(350) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 10; i++ { + if hostport > 0 { + pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ + { + HostPort: hostport, + }, + } + } + if maxSkew > 0 { + pod.Spec.TopologySpreadConstraints = []apiv1.TopologySpreadConstraint{ + { + MaxSkew: maxSkew, + TopologyKey: topologySpreadingKey, + WhenUnsatisfiable: "DoNotSchedule", + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "estimatee", + }, + }, + }, + } + } + pods := []*apiv1.Pod{} + for i := 0; i < podCount; i++ { pods = append(pods, pod) } + return pods +} + +func makeNode(cpu int64, mem int64, name string, zone string) *apiv1.Node { node := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{ + "kubernetes.io/hostname": name, + "topology.kubernetes.io/zone": zone, + }, + }, Status: apiv1.NodeStatus{ Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(cpuPerPod*3-50, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(2*memoryPerPod, resource.DecimalSI), + apiv1.ResourceCPU: *resource.NewMilliQuantity(cpu, resource.DecimalSI), + apiv1.ResourceMemory: *resource.NewQuantity(mem*units.MiB, resource.DecimalSI), apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), }, }, } node.Status.Allocatable = node.Status.Capacity SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulerframework.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo) - assert.Equal(t, 5, estimate) + return node } -func TestBinpackingEstimateWithPorts(t *testing.T) { - estimator := newBinPackingEstimator(t) - - cpuPerPod := int64(200) - memoryPerPod := int64(1000 * units.MiB) - pod := makePod(cpuPerPod, memoryPerPod) - pod.Spec.Containers[0].Ports = []apiv1.ContainerPort{ +func TestBinpackingEstimate(t *testing.T) { + testCases := []struct { + name string + millicores int64 + memory int64 + maxNodes int + pods []*apiv1.Pod + topologySpreadingKey string + expectNodeCount int + expectPodCount int + }{ { - HostPort: 5555, + name: "simple resource-based binpacking", + millicores: 350*3 - 50, + memory: 2 * 1000, + pods: makePods(350, 1000, 0, 0, "", 10), + expectNodeCount: 5, + expectPodCount: 10, }, - } - pods := make([]*apiv1.Pod, 0) - for i := 0; i < 8; i++ { - pods = append(pods, pod) - } - node := &apiv1.Node{ - Status: apiv1.NodeStatus{ - 
Capacity: apiv1.ResourceList{ - apiv1.ResourceCPU: *resource.NewMilliQuantity(5*cpuPerPod, resource.DecimalSI), - apiv1.ResourceMemory: *resource.NewQuantity(5*memoryPerPod, resource.DecimalSI), - apiv1.ResourcePods: *resource.NewQuantity(10, resource.DecimalSI), - }, + { + name: "pods-per-node bound binpacking", + millicores: 10000, + memory: 20000, + pods: makePods(10, 100, 0, 0, "", 20), + expectNodeCount: 2, + expectPodCount: 20, + }, + { + name: "hostport conflict forces pod-per-node", + millicores: 1000, + memory: 5000, + pods: makePods(200, 1000, 5555, 0, "", 8), + expectNodeCount: 8, + expectPodCount: 8, + }, + { + name: "limiter cuts binpacking", + millicores: 1000, + memory: 5000, + pods: makePods(500, 1000, 0, 0, "", 20), + maxNodes: 5, + expectNodeCount: 5, + expectPodCount: 10, + }, + { + name: "hostname topology spreading with maxSkew=2 forces 2 pods/node", + millicores: 1000, + memory: 5000, + pods: makePods(20, 100, 0, 2, "kubernetes.io/hostname", 8), + expectNodeCount: 4, + expectPodCount: 8, + }, + { + name: "zonal topology spreading with maxSkew=2 only allows 2 pods to schedule", + millicores: 1000, + memory: 5000, + pods: makePods(20, 100, 0, 2, "topology.kubernetes.io/zone", 8), + expectNodeCount: 1, + expectPodCount: 2, }, } - node.Status.Allocatable = node.Status.Capacity - SetNodeReadyState(node, true, time.Time{}) - - nodeInfo := schedulerframework.NewNodeInfo() - nodeInfo.SetNode(node) - estimate := estimator.Estimate(pods, nodeInfo) - assert.Equal(t, 8, estimate) -} - -func newBinPackingEstimator(t *testing.T) *BinpackingNodeEstimator { - predicateChecker, err := simulator.NewTestPredicateChecker() - clusterSnapshot := simulator.NewBasicClusterSnapshot() - assert.NoError(t, err) - estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot) - return estimator + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + clusterSnapshot := simulator.NewBasicClusterSnapshot() + // Add one node in different zone to trigger topology spread constraints + clusterSnapshot.AddNode(makeNode(100, 100, "oldnode", "zone-jupiter")) + + predicateChecker, err := simulator.NewTestPredicateChecker() + assert.NoError(t, err) + limiter := NewThresholdBasedEstimationLimiter(tc.maxNodes, time.Duration(0)) + estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter) + + node := makeNode(tc.millicores, tc.memory, "template", "zone-mars") + nodeInfo := schedulerframework.NewNodeInfo() + nodeInfo.SetNode(node) + + estimatedNodes, estimatedPods := estimator.Estimate(tc.pods, nodeInfo, nil) + assert.Equal(t, tc.expectNodeCount, estimatedNodes) + assert.Equal(t, tc.expectPodCount, len(estimatedPods)) + }) + } } diff --git a/cluster-autoscaler/estimator/estimator.go b/cluster-autoscaler/estimator/estimator.go index 8a86ec9c66b5..7d4f819dcf19 100644 --- a/cluster-autoscaler/estimator/estimator.go +++ b/cluster-autoscaler/estimator/estimator.go @@ -20,6 +20,7 @@ import ( "fmt" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/simulator" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -33,22 +34,40 @@ const ( var AvailableEstimators = []string{BinpackingEstimatorName} // Estimator calculates the number of nodes of given type needed to schedule pods. +// It returns the number of new nodes needed as well as the list of pods it managed +// to schedule on those nodes. 
type Estimator interface { - Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo) int + Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod) } // EstimatorBuilder creates a new estimator object. type EstimatorBuilder func(simulator.PredicateChecker, simulator.ClusterSnapshot) Estimator // NewEstimatorBuilder creates a new estimator object from flag. -func NewEstimatorBuilder(name string) (EstimatorBuilder, error) { +func NewEstimatorBuilder(name string, limiter EstimationLimiter) (EstimatorBuilder, error) { switch name { case BinpackingEstimatorName: return func( predicateChecker simulator.PredicateChecker, clusterSnapshot simulator.ClusterSnapshot) Estimator { - return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot) + return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter) }, nil } return nil, fmt.Errorf("unknown estimator: %s", name) } + +// EstimationLimiter controls how many nodes can be added by Estimator. +// A limiter can be used to prevent costly estimation if an actual ability to +// scale-up is limited by external factors. +type EstimationLimiter interface { + // StartEstimation is called at the start of estimation. + StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) + // EndEstimation is called at the end of estimation. + EndEstimation() + // PermissionToAddNode is called by an estimator when it wants to add additional + // nodes to simulation. If permission is not granted the Estimator is expected + // not to add any more nodes in this simulation. + // There is no requirement for the Estimator to stop calculations, it's + // just not expected to add any more nodes. + PermissionToAddNode() bool +} diff --git a/cluster-autoscaler/estimator/threshold_based_limiter.go b/cluster-autoscaler/estimator/threshold_based_limiter.go new file mode 100644 index 000000000000..4381721d0c53 --- /dev/null +++ b/cluster-autoscaler/estimator/threshold_based_limiter.go @@ -0,0 +1,64 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package estimator + +import ( + "time" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + klog "k8s.io/klog/v2" +) + +type thresholdBasedEstimationLimiter struct { + maxDuration time.Duration + maxNodes int + nodes int + start time.Time +} + +func (tbel *thresholdBasedEstimationLimiter) StartEstimation([]*apiv1.Pod, cloudprovider.NodeGroup) { + tbel.start = time.Now() + tbel.nodes = 0 +} + +func (*thresholdBasedEstimationLimiter) EndEstimation() {} + +func (tbel *thresholdBasedEstimationLimiter) PermissionToAddNode() bool { + if tbel.maxNodes > 0 && tbel.nodes >= tbel.maxNodes { + klog.V(4).Infof("Capping binpacking after exceeding threshold of %d nodes", tbel.maxNodes) + return false + } + timeDefined := tbel.maxDuration > 0 && tbel.start != time.Time{} + if timeDefined && time.Now().After(tbel.start.Add(tbel.maxDuration)) { + klog.V(4).Infof("Capping binpacking after exceeding max duration of %v", tbel.maxDuration) + return false + } + tbel.nodes++ + return true +} + +// NewThresholdBasedEstimationLimiter returns an EstimationLimiter that will prevent estimation +// after either a node count- or time-based threshold is reached. This is meant to prevent cases +// where binpacking of hundreds or thousands of nodes takes an extremely long time, rendering CA +// incredibly slow or even completely crashing it. +func NewThresholdBasedEstimationLimiter(maxNodes int, maxDuration time.Duration) EstimationLimiter { + return &thresholdBasedEstimationLimiter{ + maxNodes: maxNodes, + maxDuration: maxDuration, + } +} diff --git a/cluster-autoscaler/estimator/threshold_based_limiter_test.go b/cluster-autoscaler/estimator/threshold_based_limiter_test.go new file mode 100644 index 000000000000..e80b586f3ebb --- /dev/null +++ b/cluster-autoscaler/estimator/threshold_based_limiter_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package estimator + +import ( + "testing" + "time" + + apiv1 "k8s.io/api/core/v1" + + "github.com/stretchr/testify/assert" +) + +type limiterOperation func(*testing.T, EstimationLimiter) + +func expectDeny(t *testing.T, l EstimationLimiter) { + assert.Equal(t, false, l.PermissionToAddNode()) +} + +func expectAllow(t *testing.T, l EstimationLimiter) { + assert.Equal(t, true, l.PermissionToAddNode()) +} + +func resetLimiter(t *testing.T, l EstimationLimiter) { + l.EndEstimation() + l.StartEstimation([]*apiv1.Pod{}, nil) +} + +func TestThresholdBasedLimiter(t *testing.T) { + testCases := []struct { + name string + maxNodes int + maxDuration time.Duration + startDelta time.Duration + operations []limiterOperation + expectNodeCount int + }{ + { + name: "no limiting happens", + maxNodes: 20, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectAllow, + }, + expectNodeCount: 3, + }, + { + name: "time based trigger fires", + maxNodes: 20, + maxDuration: 5 * time.Second, + startDelta: -10 * time.Second, + operations: []limiterOperation{ + expectDeny, + expectDeny, + }, + expectNodeCount: 0, + }, + { + name: "sequence of additions works until the threshold is hit", + maxNodes: 3, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectAllow, + expectDeny, + }, + expectNodeCount: 3, + }, + { + name: "node counter is reset", + maxNodes: 2, + operations: []limiterOperation{ + expectAllow, + expectAllow, + expectDeny, + resetLimiter, + expectAllow, + }, + expectNodeCount: 1, + }, + { + name: "timer is reset", + maxNodes: 20, + maxDuration: 5 * time.Second, + startDelta: -10 * time.Second, + operations: []limiterOperation{ + expectDeny, + resetLimiter, + expectAllow, + expectAllow, + }, + expectNodeCount: 2, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + limiter := &thresholdBasedEstimationLimiter{ + maxNodes: tc.maxNodes, + maxDuration: tc.maxDuration, + } + limiter.StartEstimation([]*apiv1.Pod{}, nil) + + if tc.startDelta != time.Duration(0) { + limiter.start = limiter.start.Add(tc.startDelta) + } + + for _, op := range tc.operations { + op(t, limiter) + } + assert.Equal(t, tc.expectNodeCount, limiter.nodes) + limiter.EndEstimation() + }) + } +} diff --git a/cluster-autoscaler/expander/factory/expander_factory.go b/cluster-autoscaler/expander/factory/expander_factory.go index a79f7cfdefc3..a0e0b7fe0d5a 100644 --- a/cluster-autoscaler/expander/factory/expander_factory.go +++ b/cluster-autoscaler/expander/factory/expander_factory.go @@ -28,8 +28,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander/waste" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/klog/v2" - kube_client "k8s.io/client-go/kubernetes" ) @@ -71,7 +69,6 @@ func ExpanderStrategyFromStrings(expanderFlags []string, cloudProvider cloudprov lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace) filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder)) case expander.GRPCExpanderName: - klog.V(1).Info("GRPC expander chosen") filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL)) default: return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag) diff --git a/cluster-autoscaler/expander/grpcplugin/README.md b/cluster-autoscaler/expander/grpcplugin/README.md new file mode 100644 index 000000000000..4fd24408fd21 --- /dev/null +++ 
b/cluster-autoscaler/expander/grpcplugin/README.md @@ -0,0 +1,41 @@ +# gRPC Expander for Cluster Autoscaler + +## Introduction +This expander functions as a gRPC client, and will pass expansion options to an external gRPC server. +The external server will use this information to make a decision on which Node Group to expand, and return an option to expand. + +## Motivation + +This expander gives users very fine-grained control over which option they'd like to expand. +The gRPC server must be implemented by the user, but the logic can be developed out of band with Cluster Autoscaler. +There are a wide variety of use cases here. Some examples are as follows: +* A tiered weighted random strategy can be implemented, instead of a static priority ladder offered by the priority expander. +* A strategy to encapsulate business logic specific to a user but not all users of Cluster Autoscaler. +* A strategy to take into account the dynamic, fluctuating prices of the spot instance market. + +## Configuration options +As using this expander requires communication with another service, users must specify a few options as CLI arguments. + +```yaml +--grpcExpanderUrl +``` +URL of the gRPC Expander server, for CA to communicate with. +```yaml +--grpcExpanderCert +``` +Location of the volume-mounted certificate of the gRPC server, if it is configured to communicate over TLS. + +## gRPC Expander Server Setup +The gRPC server can be set up in many ways, but a simple example is described below. +An example of a barebones gRPC Expander Server can be found in the `example` directory in the `fake_grpc_server.go` file. This is meant to be copied elsewhere and deployed as a separate +service. Note that the `protos/expander.pb.go` generated protobuf code will also need to be copied and used to serialize/deserialize the Options passed from CA. +Communication between Cluster Autoscaler and the gRPC Server will occur over native kube-proxy. To use this, note the Service and Namespace the gRPC server is deployed in. + +Deploy the gRPC Expander Server as a separate app, listening on a specific port number. +Start Cluster Autoscaler with the `--grpcExpanderUrl=SERVICE_NAME.NAMESPACE_NAME.svc.cluster.local:PORT_NUMBER` flag, as well as `--grpcExpanderCert` pointed at the location of the volume-mounted certificate of the gRPC server. + +## Details + +The gRPC client currently transforms nodeInfo objects passed into the expander into v1.Node objects to reduce the size of the gRPC calls. As such, the gRPC server will not have access to daemonsets and static pods running on each node. + + diff --git a/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go b/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go new file mode 100644 index 000000000000..05fd5f47f06b --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/example/fake_grpc_server.go @@ -0,0 +1,104 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package example + +import ( + "context" + "fmt" + "log" + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos" +) + +// This code is meant to be used as starter code, deployed as a separate app, not in Cluster Autoscaler. +// This serves as the gRPC Expander Server counterpart to the client which lives in this repo. +// main.go of said application should simply pass in paths to the (optional) cert, (optional) private key, and port, and call Serve to start listening. +// Copy the protos/expander.pb.go to your other application's repo, so it has access to the protobuf definitions. + +// Serve should be called by the main() function in main.go of the Expander Server repo to start serving +func Serve(certPath string, keyPath string, port uint) { + + var grpcServer *grpc.Server + + // If credentials are passed in, use them + if certPath != "" && keyPath != "" { + log.Printf("Using certFile: %v and keyFile: %v", certPath, keyPath) + tlsCredentials, err := credentials.NewServerTLSFromFile(certPath, keyPath) + if err != nil { + log.Fatal("cannot load TLS credentials: ", err) + } + grpcServer = grpc.NewServer(grpc.Creds(tlsCredentials)) + } else { + grpcServer = grpc.NewServer() + } + + netListener := getNetListener(port) + + expanderServerImpl := NewExpanderServerImpl() + + protos.RegisterExpanderServer(grpcServer, expanderServerImpl) + + // start the server + log.Println("Starting server on port ", port) + if err := grpcServer.Serve(netListener); err != nil { + log.Fatalf("failed to serve: %s", err) + } +} + +func getNetListener(port uint) net.Listener { + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + panic(fmt.Sprintf("failed to listen: %v", err)) + } + + return lis +} + +// ExpanderServerImpl is an implementation of Expander Server from proto definition +type ExpanderServerImpl struct{} + +// NewExpanderServerImpl is this Expander's implementation of the server +func NewExpanderServerImpl() *ExpanderServerImpl { + return &ExpanderServerImpl{} +} + +// BestOptions method filters out the best options of all options passed from the gRPC Client in CA, according to the defined strategy. +func (ServerImpl *ExpanderServerImpl) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) { + opts := req.GetOptions() + log.Printf("Received BestOption Request with %v options", len(opts)) + + // This strategy simply chooses the Option with the longest NodeGroupID name, but can be replaced with any arbitrary logic + longest := 0 + var choice *protos.Option + for _, opt := range opts { + log.Println(opt.NodeGroupId) + if len(opt.NodeGroupId) > longest { + longest = len(opt.NodeGroupId) + choice = opt + } + } + + log.Print("returned bestOptions with option: ", choice.NodeGroupId) + + // Return just one option for now + return &protos.BestOptionsResponse{ + Options: []*protos.Option{choice}, + }, nil +} diff --git a/cluster-autoscaler/expander/grpcplugin/example/main.go b/cluster-autoscaler/expander/grpcplugin/example/main.go new file mode 100644 index 000000000000..5401416baa01 --- /dev/null +++ b/cluster-autoscaler/expander/grpcplugin/example/main.go @@ -0,0 +1,30 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package example + +import "flag" + +func main() { + + certPath := flag.String("cert-path", "", "Path to cert file for gRPC Expander Server") + keyPath := flag.String("key-path", "", "Path to private key for gRPC Expander Server") + port := flag.Uint("port", 7000, "Port number for server to listen on") + + flag.Parse() + + Serve(*certPath, *keyPath, *port) +} diff --git a/cluster-autoscaler/expander/grpcplugin/grpc_client.go b/cluster-autoscaler/expander/grpcplugin/grpc_client.go index 261e5b80403a..7800bd270136 100644 --- a/cluster-autoscaler/expander/grpcplugin/grpc_client.go +++ b/cluster-autoscaler/expander/grpcplugin/grpc_client.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc/credentials" ) +const gRPCTimeout = 5 * time.Second + type grpcclientstrategy struct { grpcClient protos.ExpanderClient } @@ -47,21 +49,20 @@ func NewFilter(expanderCert string, expanderUrl string) expander.Filter { func createGRPCClient(expanderCert string, expanderUrl string) protos.ExpanderClient { var dialOpt grpc.DialOption - // if no Cert file specified, use insecure if expanderCert == "" { - dialOpt = grpc.WithInsecure() - } else { - creds, err := credentials.NewClientTLSFromFile(expanderCert, "") - if err != nil { - log.Fatalf("Failed to create TLS credentials %v", err) - return nil - } - dialOpt = grpc.WithTransportCredentials(creds) + log.Fatalf("GRPC Expander Cert not specified, insecure connections not allowed") + return nil + } + creds, err := credentials.NewClientTLSFromFile(expanderCert, "") + if err != nil { + log.Fatalf("Failed to create TLS credentials %v", err) + return nil } - klog.V(2).Info("Dialing ", expanderUrl, " dialopt: ", dialOpt) + dialOpt = grpc.WithTransportCredentials(creds) + klog.V(2).Infof("Dialing: %s with dialopt: %v", expanderUrl, dialOpt) conn, err := grpc.Dial(expanderUrl, dialOpt) if err != nil { - log.Fatalf("fail to dial server: %v", err) + log.Fatalf("Fail to dial server: %v", err) return nil } return protos.NewExpanderClient(conn) @@ -69,67 +70,69 @@ func createGRPCClient(expanderCert string, expanderUrl string) protos.ExpanderCl func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option { if g.grpcClient == nil { - log.Fatalf("Incorrect gRPC client config, filtering no options") + klog.Errorf("Incorrect gRPC client config, filtering no options") return expansionOptions } // Transform inputs to gRPC inputs - nodeGroupIDOptionMap := make(map[string]expander.Option) - grpcOptionsSlice := []*protos.Option{} - populateOptionsForGRPC(expansionOptions, nodeGroupIDOptionMap, &grpcOptionsSlice) - grpcNodeInfoMap := make(map[string]*v1.Node) - populateNodeInfoForGRPC(nodeInfo, grpcNodeInfoMap) + grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(expansionOptions) + grpcNodeMap := populateNodeInfoForGRPC(nodeInfo) // call gRPC server to get BestOption - klog.V(2).Info("GPRC call of best options to server with ", len(nodeGroupIDOptionMap), " options") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + klog.V(2).Infof("GPRC call of best options to 
server with %v options", len(nodeGroupIDOptionMap)) + ctx, cancel := context.WithTimeout(context.Background(), gRPCTimeout) defer cancel() - bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeInfoMap: grpcNodeInfoMap}) + bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap}) if err != nil { - klog.V(2).Info("GRPC call timed out, no options filtered") + klog.V(4).Info("GRPC call timed out, no options filtered") return expansionOptions } if bestOptionsResponse == nil || bestOptionsResponse.Options == nil { - klog.V(2).Info("GRPC returned nil bestOptions, no options filtered") + klog.V(4).Info("GRPC returned nil bestOptions, no options filtered") return expansionOptions } // Transform back options slice options := transformAndSanitizeOptionsFromGRPC(bestOptionsResponse.Options, nodeGroupIDOptionMap) if options == nil { - klog.V(2).Info("Unable to sanitize GPRC returned bestOptions, no options filtered") + klog.V(4).Info("Unable to sanitize GPRC returned bestOptions, no options filtered") return expansionOptions } return options } // populateOptionsForGRPC creates a map of nodegroup ID and options, as well as a slice of Options objects for the gRPC call -func populateOptionsForGRPC(expansionOptions []expander.Option, nodeGroupIDOptionMap map[string]expander.Option, grpcOptionsSlice *[]*protos.Option) { +func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Option, map[string]expander.Option) { + grpcOptionsSlice := []*protos.Option{} + nodeGroupIDOptionMap := make(map[string]expander.Option) for _, option := range expansionOptions { nodeGroupIDOptionMap[option.NodeGroup.Id()] = option - *grpcOptionsSlice = append(*grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods)) + grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods)) } + return grpcOptionsSlice, nodeGroupIDOptionMap } -// populateNodeInfoForGRPC modifies the nodeInfo object, and replaces it with the v1.Node to pass through grpc -func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo, grpcNodeInfoMap map[string]*v1.Node) { +// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc +func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo) map[string]*v1.Node { + grpcNodeInfoMap := make(map[string]*v1.Node) for nodeId, nodeInfo := range nodeInfos { grpcNodeInfoMap[nodeId] = nodeInfo.Node() } + return grpcNodeInfoMap } func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Option, nodeGroupIDOptionMap map[string]expander.Option) []expander.Option { var options []expander.Option for _, option := range bestOptionsResponseOptions { if option == nil { - klog.Errorf("gRPC server returned nil Option") - return nil + klog.Errorf("GRPC server returned nil Option") + continue } if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok { options = append(options, nodeGroupIDOptionMap[option.NodeGroupId]) } else { - klog.Errorf("gRPC server returned invalid nodeGroup ID: ", option.NodeGroupId) - return nil + klog.Errorf("GRPC server returned invalid nodeGroup ID: ", option.NodeGroupId) + continue } } return options diff --git a/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go 
b/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go index 45a86a248230..65b94a17d54b 100644 --- a/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go +++ b/cluster-autoscaler/expander/grpcplugin/grpc_client_test.go @@ -87,20 +87,41 @@ var ( ) func TestPopulateOptionsForGrpc(t *testing.T) { - nodeGroupIDOptionMap := make(map[string]expander.Option) - grpcOptionsSlice := []*protos.Option{} - populateOptionsForGRPC(options, nodeGroupIDOptionMap, &grpcOptionsSlice) - - expectedOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge} - assert.Equal(t, expectedOptionsSlice, grpcOptionsSlice) - - expectedNodeGroupIDOptionMap := map[string]expander.Option{ - eoT2Micro.NodeGroup.Id(): eoT2Micro, - eoT2Large.NodeGroup.Id(): eoT2Large, - eoT3Large.NodeGroup.Id(): eoT3Large, - eoM44XLarge.NodeGroup.Id(): eoM44XLarge, + testCases := []struct { + desc string + opts []expander.Option + expectedOpts []*protos.Option + expectedMap map[string]expander.Option + }{ + { + desc: "empty options", + opts: []expander.Option{}, + expectedOpts: []*protos.Option{}, + expectedMap: map[string]expander.Option{}, + }, + { + desc: "one option", + opts: []expander.Option{eoT2Micro}, + expectedOpts: []*protos.Option{&grpcEoT2Micro}, + expectedMap: map[string]expander.Option{eoT2Micro.NodeGroup.Id(): eoT2Micro}, + }, + { + desc: "many options", + opts: options, + expectedOpts: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + expectedMap: map[string]expander.Option{ + eoT2Micro.NodeGroup.Id(): eoT2Micro, + eoT2Large.NodeGroup.Id(): eoT2Large, + eoT3Large.NodeGroup.Id(): eoT3Large, + eoM44XLarge.NodeGroup.Id(): eoM44XLarge, + }, + }, + } + for _, tc := range testCases { + grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(tc.opts) + assert.Equal(t, tc.expectedOpts, grpcOptionsSlice) + assert.Equal(t, tc.expectedMap, nodeGroupIDOptionMap) } - assert.Equal(t, expectedNodeGroupIDOptionMap, nodeGroupIDOptionMap) } func makeFakeNodeInfos() map[string]*schedulerframework.NodeInfo { @@ -114,9 +135,8 @@ func makeFakeNodeInfos() map[string]*schedulerframework.NodeInfo { } func TestPopulateNodeInfoForGRPC(t *testing.T) { - grpcNodeInfoMap := make(map[string]*v1.Node) nodeInfos := makeFakeNodeInfos() - populateNodeInfoForGRPC(nodeInfos, grpcNodeInfoMap) + grpcNodeInfoMap := populateNodeInfoForGRPC(nodeInfos) expectedGrpcNodeInfoMap := make(map[string]*v1.Node) for i, opt := range options { @@ -140,7 +160,7 @@ func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { assert.Equal(t, expectedOptions, ret) } -func TestInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { +func TestAnInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge} nodeGroupIDOptionMap := map[string]expander.Option{ eoT2Micro.NodeGroup.Id(): eoT2Micro, @@ -149,7 +169,7 @@ func TestInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) { } ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap) - assert.Equal(t, []expander.Option(nil), ret) + assert.Equal(t, []expander.Option{eoT2Micro, eoT3Large}, ret) } func TestBestOptionsValid(t *testing.T) { @@ -164,8 +184,8 @@ func TestBestOptionsValid(t *testing.T) { grpcNodeInfoMap[opt.NodeGroup.Id()] = nodes[i] } expectedBestOptionsReq := &protos.BestOptionsRequest{ - Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, - NodeInfoMap: 
grpcNodeInfoMap, + Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + NodeMap: grpcNodeInfoMap, } mockClient.EXPECT().BestOptions( @@ -242,13 +262,12 @@ func TestBestOptionsErrors(t *testing.T) { }, } for _, tc := range testCases { - grpcNodeInfoMap := make(map[string]*v1.Node) - populateNodeInfoForGRPC(tc.nodeInfo, grpcNodeInfoMap) + grpcNodeInfoMap := populateNodeInfoForGRPC(tc.nodeInfo) mockClient.EXPECT().BestOptions( gomock.Any(), gomock.Eq( &protos.BestOptionsRequest{ - Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, - NodeInfoMap: grpcNodeInfoMap, + Options: []*protos.Option{&grpcEoT2Micro, &grpcEoT2Large, &grpcEoT3Large, &grpcEoM44XLarge}, + NodeMap: grpcNodeInfoMap, })).Return(&tc.mockResponse, tc.errResponse) resp := g.BestOptions(options, tc.nodeInfo) diff --git a/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go b/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go index ee90d1ee826f..3c071d2f37d7 100644 --- a/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go +++ b/cluster-autoscaler/expander/grpcplugin/protos/expander.pb.go @@ -43,7 +43,7 @@ type BestOptionsRequest struct { Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"` // key is node id from options - NodeInfoMap map[string]*v1.Node `protobuf:"bytes,2,rep,name=nodeInfoMap,proto3" json:"nodeInfoMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + NodeMap map[string]*v1.Node `protobuf:"bytes,2,rep,name=nodeMap,proto3" json:"nodeMap,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *BestOptionsRequest) Reset() { @@ -85,9 +85,9 @@ func (x *BestOptionsRequest) GetOptions() []*Option { return nil } -func (x *BestOptionsRequest) GetNodeInfoMap() map[string]*v1.Node { +func (x *BestOptionsRequest) GetNodeMap() map[string]*v1.Node { if x != nil { - return x.NodeInfoMap + return x.NodeMap } return nil } @@ -220,18 +220,17 @@ var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_rawDesc = 0x65, 0x78, 0x70, 0x61, 0x6e, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x1a, 0x22, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x67, - 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xef, + 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xdf, 0x01, 0x0a, 0x12, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x12, 0x51, 0x0a, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, 0x6f, 0x4d, - 0x61, 0x70, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, - 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, 0x66, - 0x6f, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0b, 0x6e, 0x6f, 0x64, 0x65, 0x49, - 0x6e, 0x66, 0x6f, 0x4d, 0x61, 
0x70, 0x1a, 0x58, 0x0a, 0x10, 0x4e, 0x6f, 0x64, 0x65, 0x49, 0x6e, - 0x66, 0x6f, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x6f, 0x6e, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x70, 0x6c, 0x75, 0x67, 0x69, + 0x6e, 0x2e, 0x42, 0x65, 0x73, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x2e, 0x4e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x07, 0x6e, 0x6f, 0x64, 0x65, 0x4d, 0x61, 0x70, 0x1a, 0x54, 0x0a, 0x0c, 0x4e, 0x6f, + 0x64, 0x65, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2e, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, @@ -278,16 +277,16 @@ var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_goTypes = (*BestOptionsRequest)(nil), // 0: grpcplugin.BestOptionsRequest (*BestOptionsResponse)(nil), // 1: grpcplugin.BestOptionsResponse (*Option)(nil), // 2: grpcplugin.Option - nil, // 3: grpcplugin.BestOptionsRequest.NodeInfoMapEntry + nil, // 3: grpcplugin.BestOptionsRequest.NodeMapEntry (*v1.Pod)(nil), // 4: k8s.io.api.core.v1.Pod (*v1.Node)(nil), // 5: k8s.io.api.core.v1.Node } var file_cluster_autoscaler_expander_grpcplugin_protos_expander_proto_depIdxs = []int32{ 2, // 0: grpcplugin.BestOptionsRequest.options:type_name -> grpcplugin.Option - 3, // 1: grpcplugin.BestOptionsRequest.nodeInfoMap:type_name -> grpcplugin.BestOptionsRequest.NodeInfoMapEntry + 3, // 1: grpcplugin.BestOptionsRequest.nodeMap:type_name -> grpcplugin.BestOptionsRequest.NodeMapEntry 2, // 2: grpcplugin.BestOptionsResponse.options:type_name -> grpcplugin.Option 4, // 3: grpcplugin.Option.pod:type_name -> k8s.io.api.core.v1.Pod - 5, // 4: grpcplugin.BestOptionsRequest.NodeInfoMapEntry.value:type_name -> k8s.io.api.core.v1.Node + 5, // 4: grpcplugin.BestOptionsRequest.NodeMapEntry.value:type_name -> k8s.io.api.core.v1.Node 0, // 5: grpcplugin.Expander.BestOptions:input_type -> grpcplugin.BestOptionsRequest 1, // 6: grpcplugin.Expander.BestOptions:output_type -> grpcplugin.BestOptionsResponse 6, // [6:7] is the sub-list for method output_type diff --git a/cluster-autoscaler/expander/grpcplugin/protos/expander.proto b/cluster-autoscaler/expander/grpcplugin/protos/expander.proto index eb8dd63c1c02..5a08e8ff301b 100644 --- a/cluster-autoscaler/expander/grpcplugin/protos/expander.proto +++ b/cluster-autoscaler/expander/grpcplugin/protos/expander.proto @@ -2,7 +2,6 @@ syntax = "proto3"; package grpcplugin; import "k8s.io/api/core/v1/generated.proto"; -//import "google/protobuf/struct.proto"; option go_package = "cluster-autoscaler/expander/grpcplugin/protos"; @@ -17,7 +16,7 @@ service Expander { message BestOptionsRequest { repeated Option options = 1; // key is node id from options - map<string, k8s.io.api.core.v1.Node> nodeInfoMap = 2; + map<string, k8s.io.api.core.v1.Node> nodeMap = 2; } message BestOptionsResponse { repeated Option options = 1; diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 2a59620af34a..afa071078849 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -183,7 +183,19 @@ var ( daemonSetEvictionForOccupiedNodes = flag.Bool("daemonset-eviction-for-occupied-nodes", true, "DaemonSet pods will be gracefully 
terminated from non-empty nodes") userAgent = flag.String("user-agent", "cluster-autoscaler", "User agent used for HTTP calls.") - emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.") + emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.") + initialNodeGroupBackoffDuration = flag.Duration("initial-node-group-backoff-duration", 5*time.Minute, + "initialNodeGroupBackoffDuration is the duration of first backoff after a new node failed to start.") + maxNodeGroupBackoffDuration = flag.Duration("max-node-group-backoff-duration", 30*time.Minute, + "maxNodeGroupBackoffDuration is the maximum backoff duration for a NodeGroup after new nodes failed to start.") + nodeGroupBackoffResetTimeout = flag.Duration("node-group-backoff-reset-timeout", 3*time.Hour, + "nodeGroupBackoffResetTimeout is the time after last failed scale-up when the backoff duration is reset.") + maxScaleDownParallelismFlag = flag.Int("max-scale-down-parallelism", 10, "Maximum number of nodes (both empty and needing drain) that can be deleted in parallel.") + maxDrainParallelismFlag = flag.Int("max-drain-parallelism", 1, "Maximum number of nodes needing drain, that can be drained and deleted in parallel.") + gceExpanderEphemeralStorageSupport = flag.Bool("gce-expander-ephemeral-storage-support", false, "Whether scale-up takes ephemeral storage resources into account for GCE cloud provider") + recordDuplicatedEvents = flag.Bool("record-duplicated-events", false, "enable duplication of similar events within a 5 minute window.") + maxNodesPerScaleUp = flag.Int("max-nodes-per-scaleup", 1000, "Max nodes added in a single scale-up. This is intended strictly for optimizing CA algorithm latency and not a tool to rate-limit scale-up throughput.") + maxNodeGroupBinpackingDuration = flag.Duration("max-nodegroup-binpacking-duration", 10*time.Second, "Maximum time that will be spent in binpacking simulation for each NodeGroup.") ) func createAutoscalingOptions() config.AutoscalingOptions { @@ -263,6 +275,15 @@ func createAutoscalingOptions() config.AutoscalingOptions { DaemonSetEvictionForEmptyNodes: *daemonSetEvictionForEmptyNodes, DaemonSetEvictionForOccupiedNodes: *daemonSetEvictionForOccupiedNodes, UserAgent: *userAgent, + InitialNodeGroupBackoffDuration: *initialNodeGroupBackoffDuration, + MaxNodeGroupBackoffDuration: *maxNodeGroupBackoffDuration, + NodeGroupBackoffResetTimeout: *nodeGroupBackoffResetTimeout, + MaxScaleDownParallelism: *maxScaleDownParallelismFlag, + MaxDrainParallelism: *maxDrainParallelismFlag, + GceExpanderEphemeralStorageSupport: *gceExpanderEphemeralStorageSupport, + RecordDuplicatedEvents: *recordDuplicatedEvents, + MaxNodesPerScaleUp: *maxNodesPerScaleUp, + MaxNodeGroupBinpackingDuration: *maxNodeGroupBinpackingDuration, } } diff --git a/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go b/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go index d06d5162282a..2c6a62ceb268 100644 --- a/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go +++ b/cluster-autoscaler/vendor/github.com/golang/mock/mockgen/model/model.go @@ -71,7 +71,7 @@ func (intf *Interface) addImports(im map[string]bool) { } } -// AddMethod adds a new method, deduplicating by method name. +// AddMethod adds a new method, de-duplicating by method name. 
func (intf *Interface) AddMethod(m *Method) { for _, me := range intf.Methods { if me.Name == m.Name { @@ -260,11 +260,10 @@ func (mt *MapType) addImports(im map[string]bool) { // NamedType is an exported type in a package. type NamedType struct { Package string // may be empty - Type string // TODO: should this be typed Type? + Type string } func (nt *NamedType) String(pm map[string]string, pkgOverride string) string { - // TODO: is this right? if pkgOverride == nt.Package { return nt.Type }
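
The rename of `nodeInfoMap` to `nodeMap` in `BestOptionsRequest` is wire-compatible (the field number stays 2), but any out-of-tree expander service that regenerates its stubs from the updated `expander.proto` will see the new `NodeMap` identifier. The sketch below is a minimal, hypothetical example of such a service reading the renamed field; it assumes the generated bindings in the `protos` package expose `RegisterExpanderServer` and `UnimplementedExpanderServer`, that `Option` carries a `NodeGroupId` field matching the map key ("node id from options"), and the CPU-based selection heuristic is purely illustrative, not part of this change.

```go
// Hypothetical out-of-tree expander service; a sketch only, not part of this PR.
// Assumes the generated bindings provide RegisterExpanderServer and
// UnimplementedExpanderServer, and that Option exposes a NodeGroupId field.
package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"

	"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
)

type cpuExpander struct {
	protos.UnimplementedExpanderServer
}

// BestOptions picks the option whose template node, looked up through the
// renamed NodeMap field, advertises the most allocatable CPU.
func (e *cpuExpander) BestOptions(ctx context.Context, req *protos.BestOptionsRequest) (*protos.BestOptionsResponse, error) {
	var best *protos.Option
	bestCPU := int64(-1)
	for _, opt := range req.Options {
		node, ok := req.NodeMap[opt.NodeGroupId] // key is the node id taken from the option
		if !ok {
			continue
		}
		if cpu := node.Status.Allocatable.Cpu().MilliValue(); cpu > bestCPU {
			best, bestCPU = opt, cpu
		}
	}
	resp := &protos.BestOptionsResponse{}
	if best != nil {
		resp.Options = []*protos.Option{best}
	}
	return resp, nil
}

func main() {
	lis, err := net.Listen("tcp", ":7000")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	srv := grpc.NewServer()
	protos.RegisterExpanderServer(srv, &cpuExpander{})
	log.Fatal(srv.Serve(lis))
}
```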