Skip to content

Commit

Permalink
fix the regression of IP mode support for fargate pods (kubernetes-si…
Browse files Browse the repository at this point in the history
  • Loading branch information
M00nF1sh authored and Timothy-Dougherty committed Nov 9, 2023
1 parent 21141e5 commit 503187e
Show file tree
Hide file tree
Showing 5 changed files with 722 additions and 27 deletions.
11 changes: 9 additions & 2 deletions pkg/k8s/node_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package k8s
import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"regexp"
"strings"
)

const (
toBeDeletedByCATaint = "ToBeDeletedByClusterAutoscaler"
)

var awsInstanceIDRegex = regexp.MustCompile("^i-[^/]*$")

// IsNodeReady returns whether node is ready.
func IsNodeReady(node *corev1.Node) bool {
nodeReadyCond := GetNodeCondition(node, corev1.NodeReady)
Expand Down Expand Up @@ -47,6 +50,10 @@ func ExtractNodeInstanceID(node *corev1.Node) (string, error) {
return "", errors.Errorf("providerID is not specified for node: %s", node.Name)
}

p := strings.Split(providerID, "/")
return p[len(p)-1], nil
providerIDParts := strings.Split(providerID, "/")
instanceID := providerIDParts[len(providerIDParts)-1]
if !awsInstanceIDRegex.MatchString(instanceID) {
return "", errors.Errorf("providerID %s is invalid for EC2 instances, node: %s", providerID, node.Name)
}
return instanceID, nil
}
16 changes: 15 additions & 1 deletion pkg/k8s/node_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func TestExtractNodeInstanceID(t *testing.T) {
wantErr: errors.New("providerID is not specified for node: my-node-name"),
},
{
name: "node with providerID",
name: "node by EC2 instance",
args: args{
node: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -225,6 +225,20 @@ func TestExtractNodeInstanceID(t *testing.T) {
},
want: "i-abcdefg0",
},
{
name: "node by EKS Fargate",
args: args{
node: &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fargate-ip-192-168-138-30.us-west-2.compute.internal",
},
Spec: corev1.NodeSpec{
ProviderID: "aws:///us-west-2b/368270442a-793d42d32c704bb793ca88a6a14ddd6e/fargate-ip-192-168-138-30.us-west-2.compute.internal",
},
},
},
wantErr: errors.New("providerID aws:///us-west-2b/368270442a-793d42d32c704bb793ca88a6a14ddd6e/fargate-ip-192-168-138-30.us-west-2.compute.internal is invalid for EC2 instances, node: fargate-ip-192-168-138-30.us-west-2.compute.internal"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
98 changes: 85 additions & 13 deletions pkg/networking/pod_eni_info_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/sets"
"net"
"sigs.k8s.io/aws-load-balancer-controller/pkg/algorithm"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -20,6 +21,10 @@ import (

const (
defaultPodENIInfoCacheTTL = 10 * time.Minute
// EC2:DescribeNetworkInterface supports up to 200 filters per call.
describeNetworkInterfacesFiltersLimit = 200

labelEKSComputeType = "eks.amazonaws.com/compute-type"
)

// PodENIInfoResolver is responsible for resolve the AWS VPC ENI that supports pod network.
Expand All @@ -29,15 +34,17 @@ type PodENIInfoResolver interface {
}

// NewDefaultPodENIInfoResolver constructs new defaultPodENIInfoResolver.
func NewDefaultPodENIInfoResolver(k8sClient client.Client, ec2Client services.EC2, nodeInfoProvider NodeInfoProvider, logger logr.Logger) *defaultPodENIInfoResolver {
func NewDefaultPodENIInfoResolver(k8sClient client.Client, ec2Client services.EC2, nodeInfoProvider NodeInfoProvider, vpcID string, logger logr.Logger) *defaultPodENIInfoResolver {
return &defaultPodENIInfoResolver{
k8sClient: k8sClient,
ec2Client: ec2Client,
nodeInfoProvider: nodeInfoProvider,
logger: logger,
podENIInfoCache: cache.NewExpiring(),
podENIInfoCacheMutex: sync.RWMutex{},
podENIInfoCacheTTL: defaultPodENIInfoCacheTTL,
k8sClient: k8sClient,
ec2Client: ec2Client,
nodeInfoProvider: nodeInfoProvider,
vpcID: vpcID,
logger: logger,
podENIInfoCache: cache.NewExpiring(),
podENIInfoCacheMutex: sync.RWMutex{},
podENIInfoCacheTTL: defaultPodENIInfoCacheTTL,
describeNetworkInterfacesIPChunkSize: describeNetworkInterfacesFiltersLimit - 1, // we used 1 filter for VPC.
}
}

Expand All @@ -51,6 +58,8 @@ type defaultPodENIInfoResolver struct {
ec2Client services.EC2
// nodeInfoProvider
nodeInfoProvider NodeInfoProvider
// vpcID
vpcID string
// logger
logger logr.Logger

Expand All @@ -62,6 +71,9 @@ type defaultPodENIInfoResolver struct {
// TTL for each cache entries.
// Note: we assume pod's ENI information(e.g. securityGroups) haven't changed per podENICacheTTL.
podENIInfoCacheTTL time.Duration

// chunkSize when describe network interface with IPAddress filter.
describeNetworkInterfacesIPChunkSize int
}

func (r *defaultPodENIInfoResolver) Resolve(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
Expand Down Expand Up @@ -129,7 +141,8 @@ func (r *defaultPodENIInfoResolver) saveENIInfosToCache(pods []k8s.PodInfo, eniI
func (r *defaultPodENIInfoResolver) resolveViaCascadedLookup(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
resolveFuncs := []func(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error){
r.resolveViaPodENIAnnotation,
r.resolveViaVPCIPAddress,
r.resolveViaNodeENIs,
r.resolveViaVPCENIs,
// TODO, add support for kubenet CNI plugin(kops) by resolve via routeTable.
}

Expand All @@ -151,7 +164,8 @@ func (r *defaultPodENIInfoResolver) resolveViaCascadedLookup(ctx context.Context
return eniInfoByPodKey, nil
}

// resolveViaPodENIAnnotation tries to resolve a pod ENI via the branch ENI annotation.
// resolveViaPodENIAnnotation tries to resolve pod ENI by lookup pod's ENIInfo annotation.
// with aws-vpc-cni CNI plugin's SecurityGroups for pods feature, podIP is supported by branchENI, whose information is exposed as pod annotation.
func (r *defaultPodENIInfoResolver) resolveViaPodENIAnnotation(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
podKeysByENIID := make(map[string][]types.NamespacedName)
for _, pod := range pods {
Expand Down Expand Up @@ -191,8 +205,9 @@ func (r *defaultPodENIInfoResolver) resolveViaPodENIAnnotation(ctx context.Conte
return eniInfoByPodKey, nil
}

// resolveViaVPCIPAddress tries to resolve Pod ENI through the Pod IPAddress within VPC.
func (r *defaultPodENIInfoResolver) resolveViaVPCIPAddress(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
// resolveViaNodeENIs tries to resolve Pod ENI by matching podIP against ENIs on EC2 node's ENIs.
// with aws-vpc-cni CNI plugin, podIP can be supported by either IPv4Addresses or IPv4Prefixes on ENI.
func (r *defaultPodENIInfoResolver) resolveViaNodeENIs(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
nodeKeysSet := make(map[types.NamespacedName]sets.Empty)
for _, pod := range pods {
nodeKey := types.NamespacedName{Name: pod.NodeName}
Expand All @@ -204,13 +219,20 @@ func (r *defaultPodENIInfoResolver) resolveViaVPCIPAddress(ctx context.Context,
if err := r.k8sClient.Get(ctx, nodeKey, node); err != nil {
return nil, err
}
// Fargate based nodes are not EC2 instances
if node.Labels[labelEKSComputeType] == "fargate" {
continue
}
nodes = append(nodes, node)
}
if len(nodes) == 0 {
return nil, nil
}

nodeInstanceByNodeKey, err := r.nodeInfoProvider.FetchNodeInstances(ctx, nodes)
if err != nil {
return nil, err
}

eniInfoByPodKey := make(map[types.NamespacedName]ENIInfo, len(pods))
for _, pod := range pods {
nodeKey := types.NamespacedName{Name: pod.NodeName}
Expand All @@ -226,6 +248,56 @@ func (r *defaultPodENIInfoResolver) resolveViaVPCIPAddress(ctx context.Context,
return eniInfoByPodKey, nil
}

// resolveViaVPCENIs tries to resolve pod ENI by matching podIP against ENIs in vpc.
// with EKS fargate pods, podIP is supported by an ENI in vpc.
func (r *defaultPodENIInfoResolver) resolveViaVPCENIs(ctx context.Context, pods []k8s.PodInfo) (map[types.NamespacedName]ENIInfo, error) {
podKeysByIP := make(map[string][]types.NamespacedName, len(pods))
for _, pod := range pods {
podKeysByIP[pod.PodIP] = append(podKeysByIP[pod.PodIP], pod.Key)
}
if len(podKeysByIP) == 0 {
return nil, nil
}

podIPs := sets.StringKeySet(podKeysByIP).List()
podIPChunks := algorithm.ChunkStrings(podIPs, r.describeNetworkInterfacesIPChunkSize)
eniByID := make(map[string]*ec2sdk.NetworkInterface)
for _, podIPChunk := range podIPChunks {
req := &ec2sdk.DescribeNetworkInterfacesInput{
Filters: []*ec2sdk.Filter{
{
Name: awssdk.String("vpc-id"),
Values: awssdk.StringSlice([]string{r.vpcID}),
},
{
Name: awssdk.String("addresses.private-ip-address"),
Values: awssdk.StringSlice(podIPChunk),
},
},
}
enis, err := r.ec2Client.DescribeNetworkInterfacesAsList(ctx, req)
if err != nil {
return nil, err
}
for _, eni := range enis {
eniID := awssdk.StringValue(eni.NetworkInterfaceId)
eniByID[eniID] = eni
}
}

eniInfoByPodKey := make(map[types.NamespacedName]ENIInfo)
for _, eni := range eniByID {
eniInfo := buildENIInfoViaENI(eni)
for _, addr := range eni.PrivateIpAddresses {
eniIP := awssdk.StringValue(addr.PrivateIpAddress)
for _, podKey := range podKeysByIP[eniIP] {
eniInfoByPodKey[podKey] = eniInfo
}
}
}
return eniInfoByPodKey, nil
}

// isPodSupportedByNodeENI checks whether pod is supported by specific nodeENI.
func (r *defaultPodENIInfoResolver) isPodSupportedByNodeENI(pod k8s.PodInfo, nodeENI *ec2sdk.InstanceNetworkInterface) bool {
for _, ipv4Address := range nodeENI.PrivateIpAddresses {
Expand Down
Loading

0 comments on commit 503187e

Please sign in to comment.