Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chunk target operatation for aws targetGroup #256

Merged
merged 1 commit into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 110 additions & 73 deletions pkg/providers/v1/aws_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ var (
defaultNlbHealthCheckThreshold = int64(3)
defaultHealthCheckPort = "traffic-port"
defaultHealthCheckPath = "/"

// Defaults for ELB Target operations
defaultRegisterTargetsChunkSize = 100
defaultDeregisterTargetsChunkSize = 100
)

func isNLB(annotations map[string]string) bool {
Expand Down Expand Up @@ -561,6 +565,7 @@ func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
// ensureTargetGroup creates a target group with a set of instances.
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName types.NamespacedName, mapping nlbPortMapping, instances []string, vpcID string, tags map[string]string) (*elbv2.TargetGroup, error) {
dirty := false
expectedTargets := c.computeTargetGroupExpectedTargets(instances, mapping.TrafficPort)
if targetGroup == nil {
targetType := "instance"
name := c.buildTargetGroupName(serviceName, mapping.FrontendPort, mapping.TrafficPort, mapping.TrafficProtocol, targetType, mapping)
Expand Down Expand Up @@ -607,86 +612,23 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty
}
}

registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for _, instanceID := range instances {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(string(instanceID)),
Port: aws.Int64(mapping.TrafficPort),
})
}

_, err = c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("error registering targets for load balancer: %q", err)
tg := result.TargetGroups[0]
tgARN := aws.StringValue(tg.TargetGroupArn)
if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, nil); err != nil {
return nil, err
}

return result.TargetGroups[0], nil
return tg, nil
}

// handle instances in service
{
healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
tgARN := aws.StringValue(targetGroup.TargetGroupArn)
actualTargets, err := c.obtainTargetGroupActualTargets(tgARN)
if err != nil {
return nil, fmt.Errorf("error describing target group health: %q", err)
}
actualIDs := []string{}
for _, healthDescription := range healthResponse.TargetHealthDescriptions {
if aws.StringValue(healthDescription.TargetHealth.State) == elbv2.TargetHealthStateEnumHealthy {
actualIDs = append(actualIDs, *healthDescription.Target.Id)
} else if healthDescription.TargetHealth.Reason != nil {
switch aws.StringValue(healthDescription.TargetHealth.Reason) {
case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
// We don't need to count this instance in service if it is
// on its way out
default:
actualIDs = append(actualIDs, *healthDescription.Target.Id)
}
}
}

actual := sets.NewString(actualIDs...)
expected := sets.NewString(instances...)

additions := expected.Difference(actual)
removals := actual.Difference(expected)

if len(additions) > 0 {
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range additions {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("error registering new targets in target group: %q", err)
}
dirty = true
return nil, err
}

if len(removals) > 0 {
deregisterInput := &elbv2.DeregisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range removals {
deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.DeregisterTargets(deregisterInput)
if err != nil {
return nil, fmt.Errorf("error trying to deregister targets in target group: %q", err)
}
dirty = true
if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, actualTargets); err != nil {
return nil, err
}
}

Expand Down Expand Up @@ -736,6 +678,101 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty
return targetGroup, nil
}

func (c *Cloud) ensureTargetGroupTargets(tgARN string, expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) error {
targetsToRegister, targetsToDeregister := c.diffTargetGroupTargets(expectedTargets, actualTargets)
if len(targetsToRegister) > 0 {
targetsToRegisterChunks := c.chunkTargetDescriptions(targetsToRegister, defaultRegisterTargetsChunkSize)
for _, targetsChunk := range targetsToRegisterChunks {
req := &elbv2.RegisterTargetsInput{
TargetGroupArn: aws.String(tgARN),
Targets: targetsChunk,
}
if _, err := c.elbv2.RegisterTargets(req); err != nil {
return fmt.Errorf("error trying to register targets in target group: %q", err)
}
}
}
if len(targetsToDeregister) > 0 {
targetsToDeregisterChunks := c.chunkTargetDescriptions(targetsToDeregister, defaultDeregisterTargetsChunkSize)
for _, targetsChunk := range targetsToDeregisterChunks {
req := &elbv2.DeregisterTargetsInput{
TargetGroupArn: aws.String(tgARN),
Targets: targetsChunk,
}
if _, err := c.elbv2.DeregisterTargets(req); err != nil {
return fmt.Errorf("error trying to deregister targets in target group: %q", err)
}
}
}
return nil
}

func (c *Cloud) computeTargetGroupExpectedTargets(instanceIDs []string, port int64) []*elbv2.TargetDescription {
expectedTargets := make([]*elbv2.TargetDescription, 0, len(instanceIDs))
for _, instanceID := range instanceIDs {
expectedTargets = append(expectedTargets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(port),
})
}
return expectedTargets
}

func (c *Cloud) obtainTargetGroupActualTargets(tgARN string) ([]*elbv2.TargetDescription, error) {
req := &elbv2.DescribeTargetHealthInput{
TargetGroupArn: aws.String(tgARN),
}
resp, err := c.elbv2.DescribeTargetHealth(req)
if err != nil {
return nil, fmt.Errorf("error describing target group health: %q", err)
}
actualTargets := make([]*elbv2.TargetDescription, 0, len(resp.TargetHealthDescriptions))
for _, targetDesc := range resp.TargetHealthDescriptions {
if targetDesc.TargetHealth.Reason != nil && aws.StringValue(targetDesc.TargetHealth.Reason) == elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress {
continue
}
actualTargets = append(actualTargets, targetDesc.Target)
}
return actualTargets, nil
}

// diffTargetGroupTargets computes the targets to register and targets to deregister based on existingTargets and desired instances.
func (c *Cloud) diffTargetGroupTargets(expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) (targetsToRegister []*elbv2.TargetDescription, targetsToDeregister []*elbv2.TargetDescription) {
expectedTargetsByUID := make(map[string]*elbv2.TargetDescription, len(expectedTargets))
for _, target := range expectedTargets {
targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
expectedTargetsByUID[targetUID] = target
}
actualTargetsByUID := make(map[string]*elbv2.TargetDescription, len(actualTargets))
for _, target := range actualTargets {
targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
actualTargetsByUID[targetUID] = target
}

expectedTargetsUIDs := sets.StringKeySet(expectedTargetsByUID)
actualTargetsUIDs := sets.StringKeySet(actualTargetsByUID)
for _, targetUID := range expectedTargetsUIDs.Difference(actualTargetsUIDs).List() {
targetsToRegister = append(targetsToRegister, expectedTargetsByUID[targetUID])
}
for _, targetUID := range actualTargetsUIDs.Difference(expectedTargetsUIDs).List() {
targetsToDeregister = append(targetsToDeregister, actualTargetsByUID[targetUID])
}
return targetsToRegister, targetsToDeregister
}

// chunkTargetDescriptions will split slice of TargetDescription into chunks
func (c *Cloud) chunkTargetDescriptions(targets []*elbv2.TargetDescription, chunkSize int) [][]*elbv2.TargetDescription {
var chunks [][]*elbv2.TargetDescription
for i := 0; i < len(targets); i += chunkSize {
end := i + chunkSize
if end > len(targets) {
end = len(targets)
}
chunks = append(chunks, targets[i:end])
}
return chunks
}

// updateInstanceSecurityGroupsForNLB will adjust securityGroup's settings to allow inbound traffic into instances from clientCIDRs and portMappings.
// TIP: if either instances or clientCIDRs or portMappings are nil, then the securityGroup rules for lbName are cleared.
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {
Expand Down
Loading