Skip to content

Commit

Permalink
Introduce flags for fine-tuning maximum concurrent reconciles per res…
Browse files Browse the repository at this point in the history
…ource (#141)

This commit aims to improve the performance and scalability of the ACK
service controllers by introducing support for configuring the maximum
number of concurrent reconciles for individual resources. The primary
motivation behind this change is to address varying workload demands and
resource requirements in Kubernetes environements.

This patch introduces two new flags (soon exposed to in the helm chart
values):
- `--reconcile-default-max-concurrent-syncs`: allow users to specify the
  default maximum concurrency level for all the resources.
- `--reconcile-resource-max-concurrent-syncs`: enable users to define
  resource-specific maximum concurrency settings, overriding the default
  value.

A use case example would be the scenario of an admin wanting to manage
EKS resources using the ACK `eks-controller`. It is normal for each
cluster to have multiple nodegroups/PIAs/AccessEntries/FargateProfiles,
which can indicate the proportion of needed max concurrencies for each
resource.

For instance, you can configure the EKS Controller with a default
maximum of 2 concurrent reconciles for all the resources, and override
the maximum concurrency to 10 for `AccessEntries` and
`PodIdentityAssociations`

Signed-off-by: Amine Hilaly <hilalyamine@gmail.com>

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
  • Loading branch information
a-hilaly authored Mar 4, 2024
1 parent 861f7ed commit 20e1c0d
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 54 deletions.
154 changes: 102 additions & 52 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,26 @@ import (
)

const (
flagEnableLeaderElection = "enable-leader-election"
flagLeaderElectionNamespace = "leader-election-namespace"
flagMetricAddr = "metrics-addr"
flagHealthzAddr = "healthz-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
envVarAWSRegion = "AWS_REGION"
flagEnableLeaderElection = "enable-leader-election"
flagLeaderElectionNamespace = "leader-election-namespace"
flagMetricAddr = "metrics-addr"
flagHealthzAddr = "healthz-addr"
flagEnableDevLogging = "enable-development-logging"
flagAWSRegion = "aws-region"
flagAWSEndpointURL = "aws-endpoint-url"
flagAWSIdentityEndpointURL = "aws-identity-endpoint-url"
flagUnsafeAWSEndpointURLs = "allow-unsafe-aws-endpoint-urls"
flagLogLevel = "log-level"
flagResourceTags = "resource-tags"
flagWatchNamespace = "watch-namespace"
flagEnableWebhookServer = "enable-webhook-server"
flagWebhookServerAddr = "webhook-server-addr"
flagDeletionPolicy = "deletion-policy"
flagReconcileDefaultResyncSeconds = "reconcile-default-resync-seconds"
flagReconcileResourceResyncSeconds = "reconcile-resource-resync-seconds"
flagReconcileDefaultMaxConcurrency = "reconcile-default-max-concurrent-syncs"
flagReconcileResourceMaxConcurrency = "reconcile-resource-max-concurrent-syncs"
envVarAWSRegion = "AWS_REGION"
)

var (
Expand All @@ -74,24 +76,26 @@ var (

// Config contains configuration options for ACK service controllers
type Config struct {
MetricsAddr string
HealthzAddr string
EnableLeaderElection bool
LeaderElectionNamespace string
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
ReconcileDefaultResyncSeconds int
ReconcileResourceResyncSeconds []string
MetricsAddr string
HealthzAddr string
EnableLeaderElection bool
LeaderElectionNamespace string
EnableDevelopmentLogging bool
AccountID string
Region string
IdentityEndpointURL string
EndpointURL string
AllowUnsafeEndpointURL bool
LogLevel string
ResourceTags []string
WatchNamespace string
EnableWebhookServer bool
WebhookServerAddr string
DeletionPolicy ackv1alpha1.DeletionPolicy
ReconcileDefaultResyncSeconds int
ReconcileResourceResyncSeconds []string
ReconcileDefaultMaxConcurrency int
ReconcileResourceMaxConcurrency []string
}

// BindFlags defines CLI/runtime configuration options
Expand Down Expand Up @@ -202,6 +206,19 @@ func (cfg *Config) BindFlags() {
" configuration maps resource kinds to drift remediation periods in seconds. If provided, "+
" resource-specific resync periods take precedence over the default period.",
)
flag.IntVar(
&cfg.ReconcileDefaultMaxConcurrency, flagReconcileDefaultMaxConcurrency,
1,
"The default maximum number of concurrent reconciles for a resource reconciler. This value is used if no "+
"resource-specific override has been specified. Default is 1.",
)
flag.StringArrayVar(
&cfg.ReconcileResourceMaxConcurrency, flagReconcileResourceMaxConcurrency,
[]string{},
"A Key/Value list of strings representing the reconcile max concurrency configuration for each resource. This"+
" configuration maps resource kinds to maximum number of concurrent reconciles. If provided, "+
" resource-specific max concurrency takes precedence over the default max concurrency.",
)
}

// SetupLogger initializes the logger used in the service controller
Expand All @@ -222,7 +239,6 @@ func (cfg *Config) SetupLogger() {
// SetAWSAccountID uses sts GetCallerIdentity API to find AWS AccountId and set
// in Config
func (cfg *Config) SetAWSAccountID() error {

awsCfg := aws.Config{}
if cfg.IdentityEndpointURL != "" {
awsCfg.Endpoint = aws.String(cfg.IdentityEndpointURL)
Expand Down Expand Up @@ -297,6 +313,9 @@ func (cfg *Config) Validate(options ...Option) error {
if cfg.ReconcileDefaultResyncSeconds < 0 {
return fmt.Errorf("invalid value for flag '%s': resync seconds default must be greater than 0", flagReconcileDefaultResyncSeconds)
}
if cfg.ReconcileDefaultMaxConcurrency < 1 {
return fmt.Errorf("invalid value for flag '%s': max concurrency default must be greater than 0", flagReconcileDefaultMaxConcurrency)
}
return nil
}

Expand All @@ -309,28 +328,45 @@ func (cfg *Config) checkUnsafeEndpoint(endpoint *url.URL) error {
return nil
}

// validateReconcileConfigResources validates the --reconcile-resource-resync-seconds flag
// by checking the resource names and their corresponding duration.
// validateReconcileConfigResources validates the --reconcile-resource-resync-seconds and
// --reconcile-resource-max-concurrent-syncs flags. It ensures that the resource names provided
// in the flags are valid and managed by the controller.
func (cfg *Config) validateReconcileConfigResources(supportedGVKs []schema.GroupVersionKind) error {
validResourceNames := []string{}
for _, gvk := range supportedGVKs {
validResourceNames = append(validResourceNames, gvk.Kind)
}
for _, resourceResyncSecondsFlag := range cfg.ReconcileResourceResyncSeconds {
resourceName, _, err := parseReconcileFlagArgument(resourceResyncSecondsFlag)
if err != nil {
return fmt.Errorf("error parsing flag argument '%v': %v. Expected format: resource=seconds", resourceResyncSecondsFlag, err)
for _, resourceFlagArgument := range cfg.ReconcileResourceResyncSeconds {
if err := validateReconcileConfigResource(validResourceNames, resourceFlagArgument); err != nil {
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceResyncSeconds, err)
}
if !ackutil.InStrings(resourceName, validResourceNames) {
return fmt.Errorf(
"error parsing flag argument '%v': resource '%v' is not managed by this controller. Expected one of %v",
resourceResyncSecondsFlag, resourceName, strings.Join(validResourceNames, ", "),
)
}
for _, resourceFlagArgument := range cfg.ReconcileResourceMaxConcurrency {
if err := validateReconcileConfigResource(validResourceNames, resourceFlagArgument); err != nil {
return fmt.Errorf("invalid value for flag '%s': %v", flagReconcileResourceMaxConcurrency, err)
}
}
return nil
}

// validateReconcileConfigResource validates a single flag argument of any flag that is used to configure
// resource-specific reconcile settings. It ensures that the resource name is valid and managed by the
// controller, and that the value is a positive integer. If the flag argument is not in the expected format
// or has invalid elements, an error is returned.
func validateReconcileConfigResource(validResourceNames []string, resourceFlagArgument string) error {
resourceName, _, err := parseReconcileFlagArgument(resourceFlagArgument)
if err != nil {
return fmt.Errorf("error parsing flag argument '%v': %v. Expected format: string=number", resourceFlagArgument, err)
}
if !ackutil.InStrings(resourceName, validResourceNames) {
return fmt.Errorf(
"error parsing flag argument '%v': resource '%v' is not managed by this controller. Expected one of %v",
resourceFlagArgument, resourceName, strings.Join(validResourceNames, ", "),
)
}
return nil
}

// ParseReconcileResourceResyncSeconds parses the values of the --reconcile-resource-resync-seconds
// flag and returns a map that maps resource names to resync periods.
// The flag arguments are expected to have the format "resource=seconds", where "resource" is the
Expand All @@ -346,6 +382,20 @@ func (cfg *Config) ParseReconcileResourceResyncSeconds() (map[string]time.Durati
return resourceResyncPeriods, nil
}

// GetReconcileResourceMaxConcurrency returns the maximum number of concurrent reconciles for a
// given resource name. If the resource name is not found in the --reconcile-resource-max-concurrent-syncs
// flag, the function returns the default maximum concurrency value.
func (cfg *Config) GetReconcileResourceMaxConcurrency(resourceName string) int {
for _, resourceMaxConcurrencyFlag := range cfg.ReconcileResourceMaxConcurrency {
// Parse the resource name and max concurrency from the flag argument
name, maxConcurrency, _ := parseReconcileFlagArgument(resourceMaxConcurrencyFlag)
if strings.EqualFold(name, resourceName) {
return maxConcurrency
}
}
return cfg.ReconcileDefaultMaxConcurrency
}

// parseReconcileFlagArgument parses a flag argument of the form "key=value" into
// its individual elements. The key must be a non-empty string and the value must be
// a non-empty positive integer. If the flag argument is not in the expected format
Expand All @@ -365,14 +415,14 @@ func parseReconcileFlagArgument(flagArgument string) (string, int, error) {
return "", 0, fmt.Errorf("missing value in flag argument")
}

resyncSeconds, err := strconv.Atoi(elements[1])
value, err := strconv.Atoi(elements[1])
if err != nil {
return "", 0, fmt.Errorf("invalid value in flag argument: %v", err)
}
if resyncSeconds < 0 {
return "", 0, fmt.Errorf("invalid value in flag argument: expected non-negative integer, got %d", resyncSeconds)
if value <= 0 {
return "", 0, fmt.Errorf("invalid value in flag argument: value must be greater than 0")
}
return elements[0], resyncSeconds, nil
return elements[0], value, nil
}

// GetWatchNamespaces returns a slice of namespaces to watch for custom resource events.
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func TestParseReconcileFlagArgument(t *testing.T) {
{"=value", "", 0, true, "missing key in flag argument"},
{"key=value1=value2", "", 0, true, "invalid flag argument format: expected key=value"},
{"key=a", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"a\": invalid syntax"},
{"key=-1", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -1"},
{"key=-123456", "", 0, true, "invalid value in flag argument: expected non-negative integer, got -123456"},
{"key=-1", "", 0, true, "invalid value in flag argument: value must be greater than 0"},
{"key=-123456", "", 0, true, "invalid value in flag argument: value must be greater than 0"},
{"key=1.1", "", 0, true, "invalid value in flag argument: strconv.Atoi: parsing \"1.1\": invalid syntax"},
}
for _, test := range tests {
Expand Down
6 changes: 6 additions & 0 deletions pkg/runtime/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
ctrlrt "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlrtcontroller "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
Expand Down Expand Up @@ -99,12 +100,17 @@ func (r *resourceReconciler) BindControllerManager(mgr ctrlrt.Manager) error {
r.kc = mgr.GetClient()
r.apiReader = mgr.GetAPIReader()
rd := r.rmf.ResourceDescriptor()
maxConcurrentReconciles := r.cfg.GetReconcileResourceMaxConcurrency(rd.GroupVersionKind().Kind)
return ctrlrt.NewControllerManagedBy(
mgr,
).For(
rd.EmptyRuntimeObject(),
).WithEventFilter(
predicate.GenerationChangedPredicate{},
).WithOptions(
ctrlrtcontroller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
},
).Complete(r)
}

Expand Down

0 comments on commit 20e1c0d

Please sign in to comment.