Skip to content

Commit

Permalink
Merge pull request #1103 from mmerkes/release-1.32
Browse files Browse the repository at this point in the history
1.32: Requires node topology labels to be set for known supported instance …
  • Loading branch information
k8s-ci-robot authored Feb 14, 2025
2 parents 147e836 + adfa1f2 commit 1979783
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 30 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ site/
e2e.test
.idea/
**/*.swp
.DS_Store
2 changes: 1 addition & 1 deletion pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func newAWSCloud2(cfg config.CloudConfig, awsServices Services, provider config.
}
awsCloud.instanceCache.cloud = awsCloud
awsCloud.zoneCache.cloud = awsCloud
awsCloud.instanceTopologyManager = resourcemanagers.NewInstanceTopologyManager(ec2v2)
awsCloud.instanceTopologyManager = resourcemanagers.NewInstanceTopologyManager(ec2v2, &cfg)

tagged := cfg.Global.KubernetesClusterTag != "" || cfg.Global.KubernetesClusterID != ""
if cfg.Global.VPC != "" && (cfg.Global.SubnetID != "" || cfg.Global.RoleARN != "") && tagged {
Expand Down
11 changes: 10 additions & 1 deletion pkg/providers/v1/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package config

import (
"fmt"
"github.com/aws/aws-sdk-go/aws/request"
"strings"

"github.com/aws/aws-sdk-go/aws/request"

"github.com/aws/aws-sdk-go/aws/endpoints"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -62,6 +63,14 @@ type CloudConfig struct {

// NodeIPFamilies determines which IP addresses are added to node objects and their ordering.
NodeIPFamilies []string

// Override to regex validating whether or not instance types require instance topology
// to get a definitive response. This will impact whether or not the node controller will
// block on getting instance topology information for nodes.
// See pkg/resourcemanagers/topology.go for more details.
//
// WARNING: Updating the default behavior and corresponding unit tests would be a much safer option.
SupportedTopologyInstanceTypePattern string `json:"supportedTopologyInstanceTypePattern,omitempty" yaml:"supportedTopologyInstanceTypePattern,omitempty"`
}
// [ServiceOverride "1"]
// Service = s3
Expand Down
16 changes: 12 additions & 4 deletions pkg/providers/v1/instances_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,25 @@ func (c *Cloud) getAdditionalLabels(ctx context.Context, zoneName string, instan
// If topology labels are already set, skip.
if _, ok := existingLabels[LabelNetworkNodePrefix+"1"]; !ok {
nodeTopology, err := c.instanceTopologyManager.GetNodeTopology(ctx, instanceType, region, instanceID)
// We've seen some edge cases where this functionality is problematic, so swallowing errors and logging
// to avoid short-circuiting syncing nodes. If it's an intermittent issue, the labels will be added
// on subsequent attempts.

if err != nil {
klog.Warningf("Failed to get node topology. Moving on without setting labels: %q", err)
if c.instanceTopologyManager.DoesInstanceTypeRequireResponse(instanceType) {
klog.Errorf("Failed to get node topology for instance type %s and one is expected %v.", instanceType, err)
return nil, err
}

// We don't expect that there will be a response for these instance types anyway,
// so we're going to move on without setting the labels.
klog.Warningf("Failed to get node topology for instance type %s and ID %s. Moving on without setting labels. Ignoring %v",
instanceType, instanceID, err)
} else if nodeTopology != nil {
for index, networkNode := range nodeTopology.NetworkNodes {
layer := index + 1
label := LabelNetworkNodePrefix + strconv.Itoa(layer)
additionalLabels[label] = networkNode
}
} else {
klog.Infof("No instance topolopy for instance type %s available.", instanceType)
}
}

Expand Down
25 changes: 24 additions & 1 deletion pkg/providers/v1/instances_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,14 @@ func TestInstanceMetadata(t *testing.T) {
assert.Equal(t, map[string]string{}, result.AdditionalLabels)
})

t.Run("Should swallow errors if getting node topology fails", func(t *testing.T) {
t.Run("Should swallow errors if getting node topology fails if instance type not expected to be supported", func(t *testing.T) {
instance := makeInstance("i-00000000000000000", "192.168.0.1", "1.2.3.4", "instance-same.ec2.internal", "instance-same.ec2.external", nil, true)
c, _ := mockInstancesResp(&instance, []*ec2.Instance{&instance})
var mockedTopologyManager resourcemanagers.MockedInstanceTopologyManager
c.instanceTopologyManager = &mockedTopologyManager
mockedTopologyManager.On("GetNodeTopology", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil,
services.NewMockAPIError("InvalidParameterValue", "Nope."))
mockedTopologyManager.On("DoesInstanceTypeRequireResponse", mock.Anything).Return(false)
node := &v1.Node{
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("aws:///us-west-2c/1abc-2def/%s", *instance.InstanceId),
Expand All @@ -261,6 +262,28 @@ func TestInstanceMetadata(t *testing.T) {
LabelZoneID: "az1",
}, result.AdditionalLabels)
})

t.Run("Should not swallow errors if getting node topology fails if instance type is expected to be supported", func(t *testing.T) {
instance := makeInstance("i-00000000000000000", "192.168.0.1", "1.2.3.4", "instance-same.ec2.internal", "instance-same.ec2.external", nil, true)
c, _ := mockInstancesResp(&instance, []*ec2.Instance{&instance})
var mockedTopologyManager resourcemanagers.MockedInstanceTopologyManager
c.instanceTopologyManager = &mockedTopologyManager
mockedTopologyManager.On("GetNodeTopology", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil,
services.NewMockAPIError("InvalidParameterValue", "Nope."))
mockedTopologyManager.On("DoesInstanceTypeRequireResponse", mock.Anything).Return(true)
node := &v1.Node{
Spec: v1.NodeSpec{
ProviderID: fmt.Sprintf("aws:///us-west-2c/1abc-2def/%s", *instance.InstanceId),
},
}

_, err := c.InstanceMetadata(context.TODO(), node)
if err == nil {
t.Error("Should error getting InstanceMetadata but succeeded.")
}

mockedTopologyManager.AssertNumberOfCalls(t, "GetNodeTopology", 1)
})
}

func getCloudWithMockedDescribeInstances(instanceExists bool, instanceState string) *Cloud {
Expand Down
65 changes: 54 additions & 11 deletions pkg/resourcemanagers/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,33 @@ import (
"context"
"errors"
"fmt"
"regexp"
"time"

"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/smithy-go"
"k8s.io/client-go/tools/cache"
"k8s.io/cloud-provider-aws/pkg/providers/v1/config"
"k8s.io/cloud-provider-aws/pkg/services"
"k8s.io/klog/v2"
)

const instanceTopologyManagerCacheTimeout = 24 * time.Hour

/*
We need to ensure that instance types that we expect a response will not successfully complete syncing unless
we get a response, so we can track known instance types that we expect to get a response for.
Supported instance types for DescribeInstanceTopology as of 2/6/25 from API documentation:
https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstanceTopology.html
hpc6a.48xlarge | hpc6id.32xlarge | hpc7a.12xlarge | hpc7a.24xlarge | hpc7a.48xlarge | hpc7a.96xlarge | hpc7g.4xlarge | hpc7g.8xlarge | hpc7g.16xlarge
p3dn.24xlarge | p4d.24xlarge | p4de.24xlarge | p5.48xlarge | p5e.48xlarge | p5en.48xlarge
trn1.2xlarge | trn1.32xlarge | trn1n.32xlarge | trn2.48xlarge | trn2u.48xlarge
*/
var defaultSupportedTopologyInstanceTypePattern = regexp.MustCompile(`^(hpc|trn|p|inf)[0-9]+[a-z]*(\.[0-9a-z]*)$`)

// stringKeyFunc is a string as cache key function
func topStringKeyFunc(obj interface{}) (string, error) {
// Type should already be a string, so just return as is.
Expand All @@ -46,26 +61,36 @@ func topStringKeyFunc(obj interface{}) (string, error) {
// InstanceTopologyManager enables mocking the InstanceTopologyManager.
type InstanceTopologyManager interface {
GetNodeTopology(ctx context.Context, instanceType string, region string, instanceID string) (*types.InstanceTopology, error)
DoesInstanceTypeRequireResponse(instanceType string) bool
}

// instanceTopologyManager manages getting instance topology for nodes.
type instanceTopologyManager struct {
ec2 services.Ec2SdkV2
unsupportedKeyStore cache.Store
ec2 services.Ec2SdkV2
unsupportedKeyStore cache.Store
supportedTopologyInstanceTypePattern *regexp.Regexp
}

// NewInstanceTopologyManager generates a new InstanceTopologyManager.
func NewInstanceTopologyManager(ec2 services.Ec2SdkV2) InstanceTopologyManager {
func NewInstanceTopologyManager(ec2 services.Ec2SdkV2, cfg *config.CloudConfig) InstanceTopologyManager {
var supportedTopologyInstanceTypePattern *regexp.Regexp
if cfg.Global.SupportedTopologyInstanceTypePattern != "" {
supportedTopologyInstanceTypePattern = regexp.MustCompile(cfg.Global.SupportedTopologyInstanceTypePattern)
} else {
supportedTopologyInstanceTypePattern = defaultSupportedTopologyInstanceTypePattern
}

return &instanceTopologyManager{
ec2: ec2,
ec2: ec2,
supportedTopologyInstanceTypePattern: supportedTopologyInstanceTypePattern,
// These should change very infrequently, if ever, so checking once a day sounds fair.
unsupportedKeyStore: cache.NewTTLStore(topStringKeyFunc, instanceTopologyManagerCacheTimeout),
}
}

// GetNodeTopology gets the instance topology for a node.
func (t *instanceTopologyManager) GetNodeTopology(ctx context.Context, instanceType string, region string, instanceID string) (*types.InstanceTopology, error) {
if t.mightSupportTopology(instanceType, region) {
if t.mightSupportTopology(instanceID, instanceType, region) {
request := &ec2.DescribeInstanceTopologyInput{InstanceIds: []string{instanceID}}
topologies, err := t.ec2.DescribeInstanceTopology(ctx, request)
if err != nil {
Expand All @@ -85,19 +110,26 @@ func (t *instanceTopologyManager) GetNodeTopology(ctx context.Context, instanceT
t.addUnsupported(region)
return nil, nil
case "RequestLimitExceeded":
// Gracefully handle request throttling
klog.Warningf("Exceeded ec2:DescribeInstanceTopology request limits. Try again later: %q", err)
return nil, nil
return nil, err
}
}

// Unhandled error
klog.Errorf("Error describing instance topology: %q", err)
return nil, err
} else if len(topologies) == 0 {
// If no topology is returned, track the instance type as unsupported
klog.Infof("Instance type %s unsupported for getting instance topology", instanceType)
t.addUnsupported(instanceType)
// If no topology is returned, track the instance type as unsupported if we don't require a response.
if t.DoesInstanceTypeRequireResponse(instanceType) {
// While the instance type could be unsupported, it's also possible that the instance is deleting or shut down
// and has no active instance topology. In this case, we don't want to track it as unsupported.
klog.Warningf("Instance %s of type %s has no instance topology listed but may be a supported type.", instanceID, instanceType)
// Track that the instance ID is does not include a response. This will prevent us from calling again unnecessarily.
t.addUnsupported(instanceID)
} else {
klog.Infof("Instance type %s unsupported for getting instance topology", instanceType)
t.addUnsupported(instanceType)
}
return nil, nil
}

Expand All @@ -106,14 +138,19 @@ func (t *instanceTopologyManager) GetNodeTopology(ctx context.Context, instanceT
return nil, nil
}

// DoesInstanceTypeRequireResponse verifies whether or not we expect an instance to have an instance topology response.
func (t *instanceTopologyManager) DoesInstanceTypeRequireResponse(instanceType string) bool {
return t.supportedTopologyInstanceTypePattern.MatchString(instanceType)
}

func (t *instanceTopologyManager) addUnsupported(key string) {
err := t.unsupportedKeyStore.Add(key)
if err != nil {
klog.Errorf("Failed to cache unsupported key %s: %q", key, err)
}
}

func (t *instanceTopologyManager) mightSupportTopology(instanceType string, region string) bool {
func (t *instanceTopologyManager) mightSupportTopology(instanceID string, instanceType string, region string) bool {
// In the case of fargate and possibly other variants, the instance type will be empty.
if len(instanceType) == 0 {
return false
Expand All @@ -125,6 +162,12 @@ func (t *instanceTopologyManager) mightSupportTopology(instanceType string, regi
klog.Errorf("Failed to get cached unsupported region: %q:", err)
}

if _, exists, err := t.unsupportedKeyStore.GetByKey(instanceID); exists {
return false
} else if err != nil {
klog.Errorf("Failed to get cached unsupported instance ID: %q:", err)
}

if _, exists, err := t.unsupportedKeyStore.GetByKey(instanceType); exists {
return false
} else if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/resourcemanagers/topology_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,9 @@ func (m *MockedInstanceTopologyManager) GetNodeTopology(ctx context.Context, ins
}
return args.Get(0).(*types.InstanceTopology), nil
}

// DoesInstanceTypeRequireResponse mocks InstanceTopologyManager.DoesInstanceTypeRequireResponse.
func (m *MockedInstanceTopologyManager) DoesInstanceTypeRequireResponse(instanceType string) bool {
args := m.Called(instanceType)
return args.Get(0).(bool)
}
Loading

0 comments on commit 1979783

Please sign in to comment.