Skip to content

Commit

Permalink
Fix route controller create/delete spam: use instanceIDToNodeName in …
Browse files Browse the repository at this point in the history
…case node name != private DNS
  • Loading branch information
wongma7 committed Mar 17, 2022
1 parent e12142f commit ae1c793
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
56 changes: 49 additions & 7 deletions pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,12 +818,28 @@ func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequ
return delayer
}

// InstanceIDIndexFunc indexes based on a Node's instance ID found in its spec.providerID
func InstanceIDIndexFunc(obj interface{}) ([]string, error) {
node, ok := obj.(*v1.Node)
if !ok {
return []string{""}, fmt.Errorf("%+v is not a Node", obj)
}
instanceID, err := KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
if err != nil {
return []string{""}, fmt.Errorf("error mapping node %q's provider ID %q to instance ID: err", node.Name, node.Spec.ProviderID, err)
}
return []string{string(instanceID)}, nil
}

// SetInformers implements InformerUser interface by setting up informer-fed caches for aws lib to
// leverage Kubernetes API for caching
func (c *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
klog.Infof("Setting up informers for Cloud")
c.nodeInformer = informerFactory.Core().V1().Nodes()
c.nodeInformerHasSynced = c.nodeInformer.Informer().HasSynced
c.nodeInformer.Informer().AddIndexers(cache.Indexers{
"instanceID": InstanceIDIndexFunc,
})
}

func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
Expand Down Expand Up @@ -1495,11 +1511,11 @@ func (c *Cloud) HasClusterID() bool {

// NodeAddresses is an implementation of Instances.NodeAddresses.
func (c *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
providerID, err := c.nodeNameToProviderID(name)
instanceID, err := c.nodeNameToInstanceID(name)
if err != nil {
return nil, fmt.Errorf("could not look up provider ID for node %q: %v", name, err)
return nil, fmt.Errorf("could not look up instance ID for node %q: %v", name, err)
}
return c.NodeAddressesByProviderID(ctx, string(providerID))
return c.NodeAddressesByProviderID(ctx, string(instanceID))
}

// extractIPv4NodeAddresses maps the instance information from EC2 to an array of NodeAddresses.
Expand Down Expand Up @@ -4994,10 +5010,10 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance,
func (c *Cloud) getInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance, error) {
var instance *ec2.Instance

// we leverage node cache to try to retrieve node's provider id first, as
// get instance by provider id is way more efficient than by filters in
// we leverage node cache to try to retrieve node's instance id first, as
// get instance by instance id is way more efficient than by filters in
// aws context
awsID, err := c.nodeNameToProviderID(nodeName)
awsID, err := c.nodeNameToInstanceID(nodeName)
if err != nil {
klog.V(3).Infof("Unable to convert node name %q to aws instanceID, fall back to findInstanceByNodeName: %v", nodeName, err)
instance, err = c.findInstanceByNodeName(nodeName)
Expand Down Expand Up @@ -5028,7 +5044,7 @@ func isFargateNode(nodeName string) bool {
return strings.HasPrefix(nodeName, fargateNodeNamePrefix)
}

func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error) {
func (c *Cloud) nodeNameToInstanceID(nodeName types.NodeName) (InstanceID, error) {
if strings.HasPrefix(string(nodeName), rbnNamePrefix) {
return InstanceID(nodeName), nil
}
Expand All @@ -5051,6 +5067,32 @@ func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error
return KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
}

func (c *Cloud) instanceIDToNodeName(instanceID InstanceID) (types.NodeName, error) {
if len(instanceID) == 0 {
return "", fmt.Errorf("no instanceID provided")
}

if c.nodeInformerHasSynced == nil || !c.nodeInformerHasSynced() {
return "", fmt.Errorf("node informer has not synced yet")
}

_, err := c.nodeInformer.Lister().Get(string(instanceID))
if err == nil {
return types.NodeName(instanceID), nil
}

nodes, err := c.nodeInformer.Informer().GetIndexer().IndexKeys("instanceID", string(instanceID))
if err != nil {
return "", fmt.Errorf("error getting node with instanceID %q: %v", string(instanceID), err)
} else if len(nodes) == 0 {
return "", fmt.Errorf("node with instanceID %q not found", string(instanceID))
} else if len(nodes) > 1 {
return "", fmt.Errorf("multiple nodes with instanceID %q found: %v", string(instanceID), nodes)
}

return types.NodeName(nodes[0]), nil
}

func checkMixedProtocol(ports []v1.ServicePort) error {
if len(ports) == 0 {
return nil
Expand Down
8 changes: 6 additions & 2 deletions pkg/providers/v1/aws_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,13 @@ func (c *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpro
// Capture instance routes
instanceID := aws.StringValue(r.InstanceId)
if instanceID != "" {
instance, found := instances[instanceID]
_, found := instances[instanceID]
if found {
route.TargetNode = mapInstanceToNodeName(instance)
node, err := c.instanceIDToNodeName(InstanceID(instanceID))
if err != nil {
return nil, err
}
route.TargetNode = node
routes = append(routes, route)
} else {
klog.Warningf("unable to find instance ID %s in the list of instances being routed to", instanceID)
Expand Down

0 comments on commit ae1c793

Please sign in to comment.