QPS config #127
Changes from all commits
@@ -67,61 +67,54 @@ func init() {
 	utilruntime.Must(extensionsv1beta1.AddToScheme(scheme))
 }
 
+type ctrlOpts struct {
+	clusterName           string
+	hubConfigPath         string
+	targetKubeConfig      string
+	metricsAddr           string
+	probeAddr             string
+	clientQPS             float32
+	clientBurst           uint
+	frequency             uint
+	decryptionConcurrency uint8
+	evaluationConcurrency uint8
+	enableLease           bool
+	enableLeaderElection  bool
+	legacyLeaderElection  bool
+	enableMetrics         bool
+}
+
 func main() {
 	klog.InitFlags(nil)
 
+	subcommand := ""
+	if len(os.Args) >= 2 {
+		subcommand = os.Args[1]
+	}
+
+	switch subcommand {
+	case "controller":
+		break // normal mode - just continue execution
+	case "trigger-uninstall":
+		handleTriggerUninstall()
+
+		return
+	default:
+		fmt.Fprintln(os.Stderr, "expected 'controller' or 'trigger-uninstall' subcommands")
+		os.Exit(1)
+	}
+
 	zflags := zaputil.FlagConfig{
 		LevelName:   "log-level",
 		EncoderName: "log-encoder",
 	}
 
-	zflags.Bind(flag.CommandLine)
-
 	controllerFlagSet := pflag.NewFlagSet("controller", pflag.ExitOnError)
 
-	var clusterName, hubConfigPath, targetKubeConfig, metricsAddr, probeAddr string
-	var frequency uint
-	var decryptionConcurrency, evaluationConcurrency uint8
-	var enableLease, enableLeaderElection, legacyLeaderElection, enableMetrics bool
-
-	controllerFlagSet.UintVar(&frequency, "update-frequency", 10,
-		"The status update frequency (in seconds) of a mutation policy")
-	controllerFlagSet.BoolVar(&enableLease, "enable-lease", false,
-		"If enabled, the controller will start the lease controller to report its status")
-	controllerFlagSet.StringVar(&clusterName, "cluster-name", "acm-managed-cluster", "Name of the cluster")
-	controllerFlagSet.StringVar(&hubConfigPath, "hub-kubeconfig-path", "/var/run/klusterlet/kubeconfig",
-		"Path to the hub kubeconfig")
-	controllerFlagSet.StringVar(
-		&targetKubeConfig,
-		"target-kubeconfig-path",
-		"",
-		"A path to an alternative kubeconfig for policy evaluation and enforcement.",
-	)
-	controllerFlagSet.StringVar(
-		&metricsAddr, "metrics-bind-address", "localhost:8383", "The address the metrics endpoint binds to.",
-	)
-	controllerFlagSet.StringVar(
-		&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.",
-	)
-	controllerFlagSet.BoolVar(&enableLeaderElection, "leader-elect", true,
-		"Enable leader election for controller manager. "+
-			"Enabling this will ensure there is only one active controller manager.")
-	controllerFlagSet.BoolVar(&legacyLeaderElection, "legacy-leader-elect", false,
-		"Use a legacy leader election method for controller manager instead of the lease API.")
-	controllerFlagSet.Uint8Var(
-		&decryptionConcurrency,
-		"decryption-concurrency",
-		5,
-		"The max number of concurrent policy template decryptions",
-	)
-	controllerFlagSet.Uint8Var(
-		&evaluationConcurrency,
-		"evaluation-concurrency",
-		// Set a low default to not add too much load to the Kubernetes API server in resource constrained deployments.
-		2,
-		"The max number of concurrent configuration policy evaluations",
-	)
-	controllerFlagSet.BoolVar(&enableMetrics, "enable-metrics", true, "Disable custom metrics collection")
+	zflags.Bind(flag.CommandLine)
+	controllerFlagSet.AddGoFlagSet(flag.CommandLine)
 
+	opts := parseOpts(controllerFlagSet, os.Args[2:])
 
 	ctrlZap, err := zflags.BuildForCtrl()
 	if err != nil {
@@ -145,25 +138,7 @@ func main() {
 		klog.SetLogger(zapr.NewLogger(klogZap).WithName("klog"))
 	}
 
-	subcommand := ""
-	if len(os.Args) >= 2 {
-		subcommand = os.Args[1]
-	}
-
-	switch subcommand {
-	case "controller":
-		controllerFlagSet.AddGoFlagSet(flag.CommandLine)
-		_ = controllerFlagSet.Parse(os.Args[2:])
-	case "trigger-uninstall":
-		handleTriggerUninstall()
-
-		return
-	default:
-		fmt.Fprintln(os.Stderr, "expected 'controller' or 'trigger-uninstall' subcommands")
-		os.Exit(1)
-	}
-
-	if evaluationConcurrency < 1 {
+	if opts.evaluationConcurrency < 1 {
 		panic("The --evaluation-concurrency option cannot be less than 1")
 	}
@@ -176,6 +151,9 @@ func main() {
 		os.Exit(1)
 	}
 
+	cfg.Burst = int(opts.clientBurst)
+	cfg.QPS = opts.clientQPS
+
 	// Set a field selector so that a watch on CRDs will be limited to just the configuration policy CRD.
 	cacheSelectors := cache.SelectorsByObject{
 		&extensionsv1.CustomResourceDefinition{}: {
@@ -229,11 +207,11 @@ func main() {
 
 	// Set default manager options
 	options := manager.Options{
-		MetricsBindAddress:     metricsAddr,
+		MetricsBindAddress:     opts.metricsAddr,
 		Scheme:                 scheme,
 		Port:                   9443,
-		HealthProbeBindAddress: probeAddr,
-		LeaderElection:         enableLeaderElection,
+		HealthProbeBindAddress: opts.probeAddr,
+		LeaderElection:         opts.enableLeaderElection,
 		LeaderElectionID:       "config-policy-controller.open-cluster-management.io",
 		NewCache:               cache.BuilderWithOptions(cache.Options{SelectorsByObject: cacheSelectors}),
 		// Disable the cache for Secrets to avoid a watch getting created when the `policy-encryption-key`
@@ -267,7 +245,7 @@ func main() {
 		),
 	}
 
-	if legacyLeaderElection {
+	if opts.legacyLeaderElection {
 		// If legacyLeaderElection is enabled, then that means the lease API is not available.
 		// In this case, use the legacy leader election method of a ConfigMap.
 		log.Info("Using the legacy leader election of configmaps")
@@ -290,40 +268,41 @@ func main() {
 	var targetK8sDynamicClient dynamic.Interface
 	var targetK8sConfig *rest.Config
 
-	if targetKubeConfig == "" {
+	if opts.targetKubeConfig == "" {
 		targetK8sConfig = cfg
 		targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
 		targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)
 	} else {
 		var err error
 
-		targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", targetKubeConfig)
+		targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", opts.targetKubeConfig)
 		if err != nil {
-			log.Error(err, "Failed to load the target kubeconfig", "path", targetKubeConfig)
+			log.Error(err, "Failed to load the target kubeconfig", "path", opts.targetKubeConfig)
 			os.Exit(1)
 		}
 
 		targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
 		targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)
 
 		log.Info(
-			"Overrode the target Kubernetes cluster for policy evaluation and enforcement", "path", targetKubeConfig,
+			"Overrode the target Kubernetes cluster for policy evaluation and enforcement",
+			"path", opts.targetKubeConfig,
 		)
 	}
 
 	instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok
 
 	reconciler := controllers.ConfigurationPolicyReconciler{
 		Client:                 mgr.GetClient(),
-		DecryptionConcurrency:  decryptionConcurrency,
-		EvaluationConcurrency:  evaluationConcurrency,
+		DecryptionConcurrency:  opts.decryptionConcurrency,
+		EvaluationConcurrency:  opts.evaluationConcurrency,
 		Scheme:                 mgr.GetScheme(),
 		Recorder:               mgr.GetEventRecorderFor(controllers.ControllerName),
 		InstanceName:           instanceName,
 		TargetK8sClient:        targetK8sClient,
 		TargetK8sDynamicClient: targetK8sDynamicClient,
 		TargetK8sConfig:        targetK8sConfig,
-		EnableMetrics:          enableMetrics,
+		EnableMetrics:          opts.enableMetrics,
 	}
 	if err = reconciler.SetupWithManager(mgr); err != nil {
 		log.Error(err, "Unable to create controller", "controller", "ConfigurationPolicy")
@@ -345,17 +324,17 @@ func main() {
 	managerCtx, managerCancel := context.WithCancel(context.Background())
 
 	// PeriodicallyExecConfigPolicies is the go-routine that periodically checks the policies
-	log.V(1).Info("Perodically processing Configuration Policies", "frequency", frequency)
+	log.V(1).Info("Perodically processing Configuration Policies", "frequency", opts.frequency)
 
 	go func() {
-		reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, frequency, mgr.Elected())
+		reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, opts.frequency, mgr.Elected())
 		managerCancel()
 	}()
 
 	// This lease is not related to leader election. This is to report the status of the controller
 	// to the addon framework. This can be seen in the "status" section of the ManagedClusterAddOn
 	// resource objects.
-	if enableLease {
+	if opts.enableLease {
 		operatorNs, err := common.GetOperatorNamespace()
 		if err != nil {
 			if errors.Is(err, common.ErrNoNamespace) || errors.Is(err, common.ErrRunLocal) {
@@ -373,11 +352,11 @@ func main() {
 		kubernetes.NewForConfigOrDie(cfg), "config-policy-controller", operatorNs,
 	)
 
-	hubCfg, err := clientcmd.BuildConfigFromFlags("", hubConfigPath)
+	hubCfg, err := clientcmd.BuildConfigFromFlags("", opts.hubConfigPath)
 	if err != nil {
 		log.Error(err, "Could not load hub config, lease updater not set with config")
 	} else {
-		leaseUpdater = leaseUpdater.WithHubLeaseConfig(hubCfg, clusterName)
+		leaseUpdater = leaseUpdater.WithHubLeaseConfig(hubCfg, opts.clusterName)
 	}
 
 	go leaseUpdater.Start(context.TODO())
@@ -437,7 +416,7 @@ func handleTriggerUninstall() {
 	// Get a config to talk to the apiserver
 	cfg, err := config.GetConfig()
 	if err != nil {
-		log.Error(err, "Failed to get config")
+		klog.Errorf("Failed to get config: %s", err)
 		os.Exit(1)
 	}
@@ -447,3 +426,124 @@ func handleTriggerUninstall() {
 		os.Exit(1)
 	}
 }
+
+func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
+	opts := &ctrlOpts{}
+
+	flags.UintVar(
+		&opts.frequency,
+		"update-frequency",
+		10,
+		"The status update frequency (in seconds) of a mutation policy",
+	)
+
+	flags.BoolVar(
+		&opts.enableLease,
+		"enable-lease",
+		false,
+		"If enabled, the controller will start the lease controller to report its status",
+	)
+
+	flags.StringVar(
+		&opts.clusterName,
+		"cluster-name",
+		"acm-managed-cluster",
+		"Name of the cluster",
+	)
+
+	flags.StringVar(
+		&opts.hubConfigPath,
+		"hub-kubeconfig-path",
+		"/var/run/klusterlet/kubeconfig",
+		"Path to the hub kubeconfig",
+	)
+
+	flags.StringVar(
+		&opts.targetKubeConfig,
+		"target-kubeconfig-path",
+		"",
+		"A path to an alternative kubeconfig for policy evaluation and enforcement.",
+	)
+
+	flags.StringVar(
+		&opts.metricsAddr,
+		"metrics-bind-address",
+		"localhost:8383",
+		"The address the metrics endpoint binds to.",
+	)
+
+	flags.StringVar(
+		&opts.probeAddr,
+		"health-probe-bind-address",
+		":8081",
+		"The address the probe endpoint binds to.",
+	)
+
+	flags.BoolVar(
+		&opts.enableLeaderElection,
+		"leader-elect",
+		true,
+		"Enable leader election for controller manager. "+
+			"Enabling this will ensure there is only one active controller manager.",
+	)
+
+	flags.BoolVar(
+		&opts.legacyLeaderElection,
+		"legacy-leader-elect",
+		false,
+		"Use a legacy leader election method for controller manager instead of the lease API.",
+	)
+
+	flags.Uint8Var(
+		&opts.decryptionConcurrency,
+		"decryption-concurrency",
+		5,
+		"The max number of concurrent policy template decryptions",
+	)
+
+	flags.Uint8Var(
+		&opts.evaluationConcurrency,
+		"evaluation-concurrency",
+		// Set a low default to not add too much load to the Kubernetes API server in resource constrained deployments.
+		2,
+		"The max number of concurrent configuration policy evaluations",
+	)
+
+	flags.BoolVar(
+		&opts.enableMetrics,
+		"enable-metrics",
+		true,
+		"Disable custom metrics collection",
+	)
+
+	flags.Float32Var(
+		&opts.clientQPS,
+		"client-max-qps",
+		30, // 15 * concurrency is recommended
+		"The max queries per second that will be made against the kubernetes API server. "+
+			"Will scale with concurrency, if not explicitly set.",
+	)
+
+	flags.UintVar(
+		&opts.clientBurst,
+		"client-burst",
+		45, // the controller-runtime defaults are 20:30 (qps:burst) - this matches that ratio
+		"The maximum burst before client requests will be throttled. "+
+			"Will scale with concurrency, if not explicitly set.",
+	)
+
+	_ = flags.Parse(args)
+
+	// Scale QPS and Burst with concurrency, when they aren't explicitly set.
+	if flags.Changed("evaluation-concurrency") {
+		if !flags.Changed("client-max-qps") {
+			opts.clientQPS = float32(opts.evaluationConcurrency) * 15
+		}
+
+		if !flags.Changed("client-burst") {
+			opts.clientBurst = uint(opts.evaluationConcurrency)*22 + 1
+		}
+	}
+
+	return opts
+}

Review thread on the `client-max-qps` default (`30, // 15 * concurrency is recommended`):

Comment: I was thinking that if the QPS is not explicitly set, then we can do 15 * evaluationConcurrency rather than rely on the user to know they should also adjust QPS.

Reply: I was planning on doing that kind of configuration in the addon-controller, but I can do it here.
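The scaling rule discussed above is easiest to check with concrete numbers. Below is a minimal sketch, not code from this PR: the `scaledClientLimits` helper is hypothetical, though the flag names, defaults, and fallback logic match the diff. The default QPS of 30 is 15 × the default evaluation concurrency of 2, and the default burst of 45 keeps the 2:3 QPS-to-burst ratio noted in the code comment (controller-runtime's 20:30); both values scale with `--evaluation-concurrency` only when they were not set explicitly.

```go
// Sketch of the parseOpts scaling rule from this PR (scaledClientLimits is
// a hypothetical helper for illustration; flag names match the diff).
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func scaledClientLimits(args []string) (float32, uint) {
	flags := pflag.NewFlagSet("example", pflag.ExitOnError)

	evaluationConcurrency := flags.Uint8("evaluation-concurrency", 2, "concurrent policy evaluations")
	clientQPS := flags.Float32("client-max-qps", 30, "max client QPS")       // 15 * default concurrency of 2
	clientBurst := flags.Uint("client-burst", 45, "max client burst")        // keeps the ~2:3 qps:burst ratio

	_ = flags.Parse(args)

	// Same rule as the PR: scale only the values the user did not set explicitly.
	if flags.Changed("evaluation-concurrency") {
		if !flags.Changed("client-max-qps") {
			*clientQPS = float32(*evaluationConcurrency) * 15
		}

		if !flags.Changed("client-burst") {
			// 22*c + 1 approximates 1.5x the scaled QPS (e.g. c=2 gives 45).
			*clientBurst = uint(*evaluationConcurrency)*22 + 1
		}
	}

	return *clientQPS, *clientBurst
}

func main() {
	qps, burst := scaledClientLimits([]string{"--evaluation-concurrency=4"})
	fmt.Println(qps, burst) // 60 89: QPS = 4*15, burst = 4*22+1

	qps, burst = scaledClientLimits([]string{"--evaluation-concurrency=4", "--client-max-qps=100"})
	fmt.Println(qps, burst) // 100 89: an explicit QPS wins; burst still scales
}
```

pflag's `Changed` reports whether a flag was set on the command line, which is what lets explicit values take precedence over the scaled defaults.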
Comment: Where is `cfg.QPS` used?

Reply: `cfg` gets assigned to `targetK8sConfig`, which is then used to create Kubernetes API client instances for `ConfigurationPolicyReconciler`. It's also used by the controller-runtime manager to create a Kubernetes API client.
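To expand on that answer, here is a minimal sketch (the kubeconfig path is hypothetical, standing in for `config.GetConfig()` in `main()`) of why setting QPS and burst once on `cfg` is enough: every client constructed from that `rest.Config` inherits its rate limits, including the typed and dynamic clients the reconciler uses and the client the manager builds from the same config.

```go
// Sketch: rate limits set on a rest.Config propagate to all clients built from it.
package main

import (
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Hypothetical kubeconfig path for illustration.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}

	// The PR sets these once, right after loading the config...
	cfg.QPS = 30   // --client-max-qps
	cfg.Burst = 45 // --client-burst

	// ...and every client derived from cfg shares those limits. When
	// --target-kubeconfig-path is unset, targetK8sConfig = cfg, so the
	// reconciler's typed and dynamic clients are throttled by these values,
	// as is the client the controller-runtime manager creates from cfg.
	typedClient := kubernetes.NewForConfigOrDie(cfg)
	dynamicClient := dynamic.NewForConfigOrDie(cfg)

	_, _ = typedClient, dynamicClient
}
```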