Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit refresh rate of GCE MIG instances. #5665

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ func main() {
NodeGroupAutoDiscovery: *nodeGroupAutoDiscoveryFlag,
NodeGroups: *nodeGroupsFlag,
ClusterName: *clusterName,
ConcurrentGceRefreshes: 1,
UserAgent: "user-agent",
GCEOptions: config.GCEOptions{
ConcurrentRefreshes: 1,
},
UserAgent: "user-agent",
}
cloudProvider := cloudBuilder.NewCloudProvider(autoscalingOptions)
srv := wrapper.NewCloudProviderGrpcWrapper(cloudProvider)
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
defer config.Close()
}

manager, err := CreateGceManager(config, do, opts.Regional, opts.ConcurrentGceRefreshes, opts.UserAgent)
manager, err := CreateGceManager(config, do, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
if err != nil {
klog.Fatalf("Failed to create GCE Manager: %v", err)
}

pricingModel := NewGcePriceModel(NewGcePriceInfo(), opts.GceExpanderEphemeralStorageSupport)
pricingModel := NewGcePriceModel(NewGcePriceInfo(), opts.GCEOptions.ExpanderEphemeralStorageSupport)
provider, err := BuildGceCloudProvider(manager, rl, pricingModel)
if err != nil {
klog.Fatalf("Failed to create GCE cloud provider: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type gceManagerImpl struct {
}

// CreateGceManager constructs GceManager object.
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, regional bool, concurrentGceRefreshes int, userAgent string) (GceManager, error) {
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, regional bool, concurrentGceRefreshes int, userAgent string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
// Create Google Compute Engine token.
var err error
tokenSource := google.ComputeTokenSource("")
Expand Down Expand Up @@ -183,7 +183,7 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr
cache: cache,
GceService: gceService,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes),
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
location: location,
regional: regional,
projectId: projectId,
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
manager := &gceManagerImpl{
cache: cache,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1),
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second),
GceService: gceService,
projectId: projectId,
regional: regional,
Expand Down
52 changes: 39 additions & 13 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path"
"strings"
"sync"
"time"

gce "google.golang.org/api/compute/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
Expand Down Expand Up @@ -52,24 +53,40 @@ type MigInfoProvider interface {
GetMigMachineType(migRef GceRef) (MachineType, error)
}

// timeProvider abstracts access to the current time so that the
// refresh-rate-limiting logic can be driven by a fake clock in tests.
type timeProvider interface {
// Now returns the current time, as seen by this provider.
Now() time.Time
}

// cachingMigInfoProvider is a MigInfoProvider implementation that serves MIG
// data out of GceCache, falling back to the GCE API (via gceClient) on cache
// misses. Instance-list refreshes for a single MIG are rate limited by
// migInstancesMinRefreshWaitTime.
type cachingMigInfoProvider struct {
	migInfoMutex           sync.Mutex
	cache                  *GceCache
	migLister              MigLister
	gceClient              AutoscalingGceClient
	projectId              string
	concurrentGceRefreshes int
	migInstanceMutex       sync.Mutex
	// migInstancesMinRefreshWaitTime is the minimum time which needs to pass
	// between two consecutive instance-list refreshes of the same MIG.
	migInstancesMinRefreshWaitTime time.Duration
	// migInstancesLastRefreshedInfo records, per MIG (keyed by GceRef.String()),
	// when its instance list was last successfully fetched.
	migInstancesLastRefreshedInfo map[string]time.Time
	// timeProvider supplies the current time; injectable for tests.
	timeProvider timeProvider
}

// realTime is the production timeProvider implementation, backed by the
// system clock.
type realTime struct{}

// Now returns the current wall-clock time via time.Now.
func (r *realTime) Now() time.Time {
return time.Now()
}

// NewCachingMigInfoProvider creates an instance of caching MigInfoProvider
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int) MigInfoProvider {
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration) MigInfoProvider {
return &cachingMigInfoProvider{
cache: cache,
migLister: migLister,
gceClient: gceClient,
projectId: projectId,
concurrentGceRefreshes: concurrentGceRefreshes,
cache: cache,
migLister: migLister,
gceClient: gceClient,
projectId: projectId,
concurrentGceRefreshes: concurrentGceRefreshes,
migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime,
migInstancesLastRefreshedInfo: make(map[string]time.Time),
timeProvider: &realTime{},
}
}

Expand Down Expand Up @@ -158,12 +175,21 @@ func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef)
}

func (c *cachingMigInfoProvider) fillMigInstances(migRef GceRef) error {
if val, ok := c.migInstancesLastRefreshedInfo[migRef.String()]; ok {
// do not regenerate MIG instances cache if last refresh happened recently.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log info that we're serving stale data from cache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if c.timeProvider.Now().Sub(val) < c.migInstancesMinRefreshWaitTime {
klog.V(4).Infof("Not regenerating MIG instances cache for %s, as it was refreshed in last MinRefreshWaitTime (%s).", migRef.String(), c.migInstancesMinRefreshWaitTime)
return nil
}
}
klog.V(4).Infof("Regenerating MIG instances cache for %s", migRef.String())
instances, err := c.gceClient.FetchMigInstances(migRef)
if err != nil {
c.migLister.HandleMigIssue(migRef, err)
return err
}
// only save information for successful calls, given the errors above may be transient.
c.migInstancesLastRefreshedInfo[migRef.String()] = c.timeProvider.Now()
return c.cache.SetMigInstances(migRef, instances)
}

Expand Down
110 changes: 102 additions & 8 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"regexp"
"testing"
"time"

"github.com/stretchr/testify/assert"
gce "google.golang.org/api/compute/v1"
Expand Down Expand Up @@ -235,7 +236,7 @@ func TestMigInfoProviderGetMigForInstance(t *testing.T) {
fetchMigs: fetchMigsConst(nil),
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)

mig, err := provider.GetMigForInstance(instanceRef)

Expand Down Expand Up @@ -307,7 +308,7 @@ func TestGetMigInstances(t *testing.T) {
fetchMigInstances: tc.fetchMigInstances,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
instances, err := provider.GetMigInstances(mig.GceRef())
cachedInstances, cached := tc.cache.GetMigInstances(mig.GceRef())

Expand Down Expand Up @@ -471,7 +472,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
fetchMigInstances: tc.fetchMigInstances,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
err := provider.RegenerateMigInstancesCache()

assert.Equal(t, tc.expectedErr, err)
Expand Down Expand Up @@ -550,7 +551,7 @@ func TestGetMigTargetSize(t *testing.T) {
fetchMigTargetSize: tc.fetchMigTargetSize,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)

targetSize, err := provider.GetMigTargetSize(mig.GceRef())
cachedTargetSize, found := tc.cache.GetMigTargetSize(mig.GceRef())
Expand Down Expand Up @@ -632,7 +633,7 @@ func TestGetMigBasename(t *testing.T) {
fetchMigBasename: tc.fetchMigBasename,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)

basename, err := provider.GetMigBasename(mig.GceRef())
cachedBasename, found := tc.cache.GetMigBasename(mig.GceRef())
Expand Down Expand Up @@ -714,7 +715,7 @@ func TestGetMigInstanceTemplateName(t *testing.T) {
fetchMigTemplateName: tc.fetchMigTemplateName,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)

templateName, err := provider.GetMigInstanceTemplateName(mig.GceRef())
cachedTemplateName, found := tc.cache.GetMigInstanceTemplateName(mig.GceRef())
Expand Down Expand Up @@ -820,7 +821,7 @@ func TestGetMigInstanceTemplate(t *testing.T) {
fetchMigTemplate: tc.fetchMigTemplate,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)

template, err := provider.GetMigInstanceTemplate(mig.GceRef())
cachedTemplate, found := tc.cache.GetMigInstanceTemplate(mig.GceRef())
Expand Down Expand Up @@ -915,7 +916,7 @@ func TestGetMigMachineType(t *testing.T) {
fetchMachineType: tc.fetchMachineType,
}
migLister := NewMigLister(cache)
provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1)
provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second)
machine, err := provider.GetMigMachineType(mig.GceRef())
if tc.expectError {
assert.Error(t, err)
Expand All @@ -928,6 +929,91 @@ func TestGetMigMachineType(t *testing.T) {
}
}

// TestMultipleGetMigInstanceCallsLimited verifies that consecutive
// GetMigForInstance calls trigger at most one FetchMigInstances call to the
// GCE client within migInstancesMinRefreshWaitTime, and trigger a refetch
// once that window has elapsed. Time is controlled via the fakeTime provider
// rather than the system clock.
func TestMultipleGetMigInstanceCallsLimited(t *testing.T) {
mig := &gceMig{
gceRef: GceRef{
Project: "project",
Zone: "zone",
Name: "base-instance-name",
},
}
// Two distinct instances of the same MIG, so the second lookup cannot be
// answered from the first instance's cache entry alone.
instance := cloudprovider.Instance{
Id: "gce://project/zone/base-instance-name-abcd",
}
instanceRef, err := GceRefFromProviderId(instance.Id)
assert.Nil(t, err)
instance2 := cloudprovider.Instance{
Id: "gce://project/zone/base-instance-name-abcd2",
}
instanceRef2, err := GceRefFromProviderId(instance2.Id)
assert.Nil(t, err)
now := time.Now()
// Each case pins the fake clock at firstCallTime/secondCallTime and asserts
// how many times the client fetch is expected to run.
for name, tc := range map[string]struct {
refreshRateDuration time.Duration
firstCallTime time.Time
secondCallTime time.Time
expectedCallsToFetchMigInstances int
}{
"0s refresh rate duration, refetch expected": {
refreshRateDuration: 0 * time.Second,
firstCallTime: now,
secondCallTime: now,
expectedCallsToFetchMigInstances: 2,
},
"5s refresh rate duration, 0.01s between calls, no refetch expected": {
refreshRateDuration: 5 * time.Second,
firstCallTime: now,
secondCallTime: now.Add(10 * time.Millisecond),
expectedCallsToFetchMigInstances: 1,
},
"0.01s refresh rate duration, 0.01s between calls, refetch expected": {
refreshRateDuration: 10 * time.Millisecond,
firstCallTime: now,
secondCallTime: now.Add(11 * time.Millisecond),
expectedCallsToFetchMigInstances: 2,
},
} {
t.Run(name, func(t *testing.T) {
cache := emptyCache()
cache.migs = map[GceRef]Mig{
mig.gceRef: mig,
}
cache.migBaseNameCache = map[GceRef]string{mig.GceRef(): "base-instance-name"}
// callCounter records, per MIG, how many times the fetch stub ran.
callCounter := make(map[GceRef]int)
client := &mockAutoscalingGceClient{
fetchMigInstances: fetchMigInstancesWithCounter(nil, callCounter),
}
migLister := NewMigLister(cache)
ft := &fakeTime{}
// Provider is built directly (not via the constructor) so the fake
// time provider can be injected.
provider := &cachingMigInfoProvider{
cache: cache,
migLister: migLister,
gceClient: client,
projectId: projectId,
concurrentGceRefreshes: 1,
migInstancesMinRefreshWaitTime: tc.refreshRateDuration,
migInstancesLastRefreshedInfo: make(map[string]time.Time),
timeProvider: ft,
}
// First lookup at firstCallTime always triggers the initial fetch.
ft.now = tc.firstCallTime
_, err = provider.GetMigForInstance(instanceRef)
assert.NoError(t, err)
// Second lookup may be rate limited depending on the elapsed fake time.
ft.now = tc.secondCallTime
_, err = provider.GetMigForInstance(instanceRef2)
assert.NoError(t, err)
assert.Equal(t, tc.expectedCallsToFetchMigInstances, callCounter[mig.GceRef()])
})
}
}

// fakeTime is a timeProvider for tests that returns a manually controlled
// instant instead of the system clock.
type fakeTime struct {
// now is the instant returned by Now; set it directly to advance the clock.
now time.Time
}

// Now returns the preset fake instant.
func (f *fakeTime) Now() time.Time {
return f.now
}

func emptyCache() *GceCache {
return &GceCache{
migs: map[GceRef]Mig{mig.GceRef(): mig},
Expand All @@ -936,6 +1022,7 @@ func emptyCache() *GceCache {
migBaseNameCache: make(map[GceRef]string),
instanceTemplateNameCache: make(map[GceRef]string),
instanceTemplatesCache: make(map[GceRef]*gce.InstanceTemplate),
instancesFromUnknownMig: make(map[GceRef]bool),
}
}

Expand All @@ -959,6 +1046,13 @@ func fetchMigInstancesConst(instances []cloudprovider.Instance) func(GceRef) ([]
}
}

// fetchMigInstancesWithCounter returns a FetchMigInstances stub that always
// succeeds with the given instances and records, per MIG ref, how many times
// it was invoked in migCounter. Useful for asserting fetch rate limiting.
func fetchMigInstancesWithCounter(instances []cloudprovider.Instance, migCounter map[GceRef]int) func(GceRef) ([]cloudprovider.Instance, error) {
	return func(ref GceRef) ([]cloudprovider.Instance, error) {
		// Count the call before returning the canned response.
		migCounter[ref]++
		return instances, nil
	}
}

func fetchMigInstancesMapping(instancesMapping map[GceRef][]cloudprovider.Instance) func(GceRef) ([]cloudprovider.Instance, error) {
return func(migRef GceRef) ([]cloudprovider.Instance, error) {
return instancesMapping[migRef], nil
Expand Down
16 changes: 12 additions & 4 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ type NodeGroupAutoscalingOptions struct {
ScaleDownUnreadyTime time.Duration
}

// GCEOptions contains autoscaling options specific to the GCE cloud provider.
type GCEOptions struct {
// ConcurrentRefreshes is the maximum number of concurrently refreshed instance groups or instance templates.
ConcurrentRefreshes int
// MigInstancesMinRefreshWaitTime is the minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed again.
MigInstancesMinRefreshWaitTime time.Duration
// ExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account.
ExpanderEphemeralStorageSupport bool
}

const (
// DefaultMaxAllocatableDifferenceRatio describes how Node.Status.Allocatable can differ between groups in the same NodeGroupSet
DefaultMaxAllocatableDifferenceRatio = 0.05
Expand Down Expand Up @@ -194,8 +204,8 @@ type AutoscalingOptions struct {
BalancingLabels []string
// AWSUseStaticInstanceList tells if AWS cloud provider use static instance type list or dynamically fetch from remote APIs.
AWSUseStaticInstanceList bool
// ConcurrentGceRefreshes is the maximum number of concurrently refreshed instance groups or instance templates.
ConcurrentGceRefreshes int
// GCEOptions contain autoscaling options specific to GCE cloud provider.
GCEOptions GCEOptions
// Path to kube configuration if available
KubeConfigPath string
// Burst setting for kubernetes client
Expand Down Expand Up @@ -223,8 +233,6 @@ type AutoscalingOptions struct {
MaxScaleDownParallelism int
// MaxDrainParallelism is the maximum number of nodes needing drain, that can be drained and deleted in parallel.
MaxDrainParallelism int
// GceExpanderEphemeralStorageSupport is whether scale-up takes ephemeral storage resources into account.
GceExpanderEphemeralStorageSupport bool
// RecordDuplicatedEvents controls whether events should be duplicated within a 5 minute window.
RecordDuplicatedEvents bool
// MaxNodesPerScaleUp controls how many nodes can be added in a single scale-up.
Expand Down
Loading