QPS config #127

Merged
2 changes: 2 additions & 0 deletions deploy/manager/manager.yaml
@@ -24,6 +24,8 @@ spec:
- "--enable-lease=true"
- "--log-level=2"
- "--v=0"
- "--client-max-qps=35"
- "--client-burst=50"
imagePullPolicy: Always
env:
- name: WATCH_NAMESPACE
2 changes: 2 additions & 0 deletions deploy/operator.yaml
@@ -49,6 +49,8 @@ spec:
- --enable-lease=true
- --log-level=2
- --v=0
- --client-max-qps=35
- --client-burst=50
command:
- config-policy-controller
env:
262 changes: 181 additions & 81 deletions main.go
@@ -67,61 +67,54 @@ func init() {
utilruntime.Must(extensionsv1beta1.AddToScheme(scheme))
}

type ctrlOpts struct {
clusterName string
hubConfigPath string
targetKubeConfig string
metricsAddr string
probeAddr string
clientQPS float32
clientBurst uint
frequency uint
decryptionConcurrency uint8
evaluationConcurrency uint8
enableLease bool
enableLeaderElection bool
legacyLeaderElection bool
enableMetrics bool
}

func main() {
klog.InitFlags(nil)

subcommand := ""
if len(os.Args) >= 2 {
subcommand = os.Args[1]
}

switch subcommand {
case "controller":
break // normal mode - just continue execution
case "trigger-uninstall":
handleTriggerUninstall()

return
default:
fmt.Fprintln(os.Stderr, "expected 'controller' or 'trigger-uninstall' subcommands")
os.Exit(1)
}

zflags := zaputil.FlagConfig{
LevelName: "log-level",
EncoderName: "log-encoder",
}

zflags.Bind(flag.CommandLine)

controllerFlagSet := pflag.NewFlagSet("controller", pflag.ExitOnError)

var clusterName, hubConfigPath, targetKubeConfig, metricsAddr, probeAddr string
var frequency uint
var decryptionConcurrency, evaluationConcurrency uint8
var enableLease, enableLeaderElection, legacyLeaderElection, enableMetrics bool

controllerFlagSet.UintVar(&frequency, "update-frequency", 10,
"The status update frequency (in seconds) of a mutation policy")
controllerFlagSet.BoolVar(&enableLease, "enable-lease", false,
"If enabled, the controller will start the lease controller to report its status")
controllerFlagSet.StringVar(&clusterName, "cluster-name", "acm-managed-cluster", "Name of the cluster")
controllerFlagSet.StringVar(&hubConfigPath, "hub-kubeconfig-path", "/var/run/klusterlet/kubeconfig",
"Path to the hub kubeconfig")
controllerFlagSet.StringVar(
&targetKubeConfig,
"target-kubeconfig-path",
"",
"A path to an alternative kubeconfig for policy evaluation and enforcement.",
)
controllerFlagSet.StringVar(
&metricsAddr, "metrics-bind-address", "localhost:8383", "The address the metrics endpoint binds to.",
)
controllerFlagSet.StringVar(
&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.",
)
controllerFlagSet.BoolVar(&enableLeaderElection, "leader-elect", true,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
controllerFlagSet.BoolVar(&legacyLeaderElection, "legacy-leader-elect", false,
"Use a legacy leader election method for controller manager instead of the lease API.")
controllerFlagSet.Uint8Var(
&decryptionConcurrency,
"decryption-concurrency",
5,
"The max number of concurrent policy template decryptions",
)
controllerFlagSet.Uint8Var(
&evaluationConcurrency,
"evaluation-concurrency",
// Set a low default to not add too much load to the Kubernetes API server in resource constrained deployments.
2,
"The max number of concurrent configuration policy evaluations",
)
controllerFlagSet.BoolVar(&enableMetrics, "enable-metrics", true, "Disable custom metrics collection")
zflags.Bind(flag.CommandLine)
controllerFlagSet.AddGoFlagSet(flag.CommandLine)

opts := parseOpts(controllerFlagSet, os.Args[2:])

ctrlZap, err := zflags.BuildForCtrl()
if err != nil {
@@ -145,25 +138,7 @@ func main() {
klog.SetLogger(zapr.NewLogger(klogZap).WithName("klog"))
}

subcommand := ""
if len(os.Args) >= 2 {
subcommand = os.Args[1]
}

switch subcommand {
case "controller":
controllerFlagSet.AddGoFlagSet(flag.CommandLine)
_ = controllerFlagSet.Parse(os.Args[2:])
case "trigger-uninstall":
handleTriggerUninstall()

return
default:
fmt.Fprintln(os.Stderr, "expected 'controller' or 'trigger-uninstall' subcommands")
os.Exit(1)
}

if evaluationConcurrency < 1 {
if opts.evaluationConcurrency < 1 {
panic("The --evaluation-concurrency option cannot be less than 1")
}

@@ -176,6 +151,9 @@ func main() {
os.Exit(1)
}

cfg.Burst = int(opts.clientBurst)
cfg.QPS = opts.clientQPS
Review comment (Contributor): Where is cfg.QPS used?

Reply (mprahl, Member, Apr 24, 2023):
cfg gets assigned to targetK8sConfig which then gets used to create Kubernetes API client instances for ConfigurationPolicyReconciler. It's also used by the controller-runtime manager to create a Kubernetes API client.
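
For illustration only, a minimal standalone sketch (not from this PR) of how QPS and Burst set on a rest.Config propagate to every client built from it; the 35/50 values simply mirror the deployment defaults added above.

```go
package main

import (
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
	// Load the rest.Config the same way the controller does.
	cfg, err := config.GetConfig()
	if err != nil {
		panic(err)
	}

	// Client-side throttling: QPS is the sustained request rate and Burst is the
	// short-term ceiling. The values here mirror the deployment defaults (illustrative).
	cfg.QPS = 35
	cfg.Burst = 50

	// Every client constructed from this config inherits the rate-limit settings,
	// including the typed and dynamic clients used by the reconciler.
	_ = kubernetes.NewForConfigOrDie(cfg)
	_ = dynamic.NewForConfigOrDie(cfg)
}
```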


// Set a field selector so that a watch on CRDs will be limited to just the configuration policy CRD.
cacheSelectors := cache.SelectorsByObject{
&extensionsv1.CustomResourceDefinition{}: {
@@ -229,11 +207,11 @@ func main() {

// Set default manager options
options := manager.Options{
MetricsBindAddress: metricsAddr,
MetricsBindAddress: opts.metricsAddr,
Scheme: scheme,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
HealthProbeBindAddress: opts.probeAddr,
LeaderElection: opts.enableLeaderElection,
LeaderElectionID: "config-policy-controller.open-cluster-management.io",
NewCache: cache.BuilderWithOptions(cache.Options{SelectorsByObject: cacheSelectors}),
// Disable the cache for Secrets to avoid a watch getting created when the `policy-encryption-key`
@@ -267,7 +245,7 @@ func main() {
),
}

if legacyLeaderElection {
if opts.legacyLeaderElection {
// If legacyLeaderElection is enabled, then that means the lease API is not available.
// In this case, use the legacy leader election method of a ConfigMap.
log.Info("Using the legacy leader election of configmaps")
@@ -290,40 +268,41 @@ func main() {
var targetK8sDynamicClient dynamic.Interface
var targetK8sConfig *rest.Config

if targetKubeConfig == "" {
if opts.targetKubeConfig == "" {
targetK8sConfig = cfg
targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)
} else {
var err error

targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", targetKubeConfig)
targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", opts.targetKubeConfig)
if err != nil {
log.Error(err, "Failed to load the target kubeconfig", "path", targetKubeConfig)
log.Error(err, "Failed to load the target kubeconfig", "path", opts.targetKubeConfig)
os.Exit(1)
}

targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)

log.Info(
"Overrode the target Kubernetes cluster for policy evaluation and enforcement", "path", targetKubeConfig,
"Overrode the target Kubernetes cluster for policy evaluation and enforcement",
"path", opts.targetKubeConfig,
)
}

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

reconciler := controllers.ConfigurationPolicyReconciler{
Client: mgr.GetClient(),
DecryptionConcurrency: decryptionConcurrency,
EvaluationConcurrency: evaluationConcurrency,
DecryptionConcurrency: opts.decryptionConcurrency,
EvaluationConcurrency: opts.evaluationConcurrency,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(controllers.ControllerName),
InstanceName: instanceName,
TargetK8sClient: targetK8sClient,
TargetK8sDynamicClient: targetK8sDynamicClient,
TargetK8sConfig: targetK8sConfig,
EnableMetrics: enableMetrics,
EnableMetrics: opts.enableMetrics,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller", "controller", "ConfigurationPolicy")
@@ -345,17 +324,17 @@ func main() {
managerCtx, managerCancel := context.WithCancel(context.Background())

// PeriodicallyExecConfigPolicies is the go-routine that periodically checks the policies
log.V(1).Info("Perodically processing Configuration Policies", "frequency", frequency)
log.V(1).Info("Perodically processing Configuration Policies", "frequency", opts.frequency)

go func() {
reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, frequency, mgr.Elected())
reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, opts.frequency, mgr.Elected())
managerCancel()
}()

// This lease is not related to leader election. This is to report the status of the controller
// to the addon framework. This can be seen in the "status" section of the ManagedClusterAddOn
// resource objects.
if enableLease {
if opts.enableLease {
operatorNs, err := common.GetOperatorNamespace()
if err != nil {
if errors.Is(err, common.ErrNoNamespace) || errors.Is(err, common.ErrRunLocal) {
@@ -373,11 +352,11 @@ func main() {
kubernetes.NewForConfigOrDie(cfg), "config-policy-controller", operatorNs,
)

hubCfg, err := clientcmd.BuildConfigFromFlags("", hubConfigPath)
hubCfg, err := clientcmd.BuildConfigFromFlags("", opts.hubConfigPath)
if err != nil {
log.Error(err, "Could not load hub config, lease updater not set with config")
} else {
leaseUpdater = leaseUpdater.WithHubLeaseConfig(hubCfg, clusterName)
leaseUpdater = leaseUpdater.WithHubLeaseConfig(hubCfg, opts.clusterName)
}

go leaseUpdater.Start(context.TODO())
@@ -437,7 +416,7 @@ func handleTriggerUninstall() {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
log.Error(err, "Failed to get config")
klog.Errorf("Failed to get config: %s", err)
os.Exit(1)
}

@@ -447,3 +426,124 @@ func handleTriggerUninstall() {
os.Exit(1)
}
}

func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
opts := &ctrlOpts{}

flags.UintVar(
&opts.frequency,
"update-frequency",
10,
"The status update frequency (in seconds) of a mutation policy",
)

flags.BoolVar(
&opts.enableLease,
"enable-lease",
false,
"If enabled, the controller will start the lease controller to report its status",
)

flags.StringVar(
&opts.clusterName,
"cluster-name",
"acm-managed-cluster",
"Name of the cluster",
)

flags.StringVar(
&opts.hubConfigPath,
"hub-kubeconfig-path",
"/var/run/klusterlet/kubeconfig",
"Path to the hub kubeconfig",
)

flags.StringVar(
&opts.targetKubeConfig,
"target-kubeconfig-path",
"",
"A path to an alternative kubeconfig for policy evaluation and enforcement.",
)

flags.StringVar(
&opts.metricsAddr,
"metrics-bind-address",
"localhost:8383",
"The address the metrics endpoint binds to.",
)

flags.StringVar(
&opts.probeAddr,
"health-probe-bind-address",
":8081",
"The address the probe endpoint binds to.",
)

flags.BoolVar(
&opts.enableLeaderElection,
"leader-elect",
true,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.",
)

flags.BoolVar(
&opts.legacyLeaderElection,
"legacy-leader-elect",
false,
"Use a legacy leader election method for controller manager instead of the lease API.",
)

flags.Uint8Var(
&opts.decryptionConcurrency,
"decryption-concurrency",
5,
"The max number of concurrent policy template decryptions",
)

flags.Uint8Var(
&opts.evaluationConcurrency,
"evaluation-concurrency",
// Set a low default to not add too much load to the Kubernetes API server in resource constrained deployments.
2,
"The max number of concurrent configuration policy evaluations",
)

flags.BoolVar(
&opts.enableMetrics,
"enable-metrics",
true,
"Disable custom metrics collection",
)

flags.Float32Var(
&opts.clientQPS,
"client-max-qps",
30, // 15 * concurrency is recommended
Review comment (Member): I was thinking that if the QPS is not explicitly set, then we can do 15 * evaluationConcurrency rather than rely on the user to know they should also adjust QPS.

Reply (Member Author): I was planning on doing that kind of configuration in the addon-controller, but I can do it here.

"The max queries per second that will be made against the kubernetes API server. "+
"Will scale with concurrency, if not explicitly set.",
)

flags.UintVar(
&opts.clientBurst,
"client-burst",
45, // the controller-runtime defaults are 20:30 (qps:burst) - this matches that ratio
"The maximum burst before client requests will be throttled. "+
"Will scale with concurrency, if not explicitly set.",
)

_ = flags.Parse(args)

// Scale QPS and Burst with concurrency, when they aren't explicitly set.
if flags.Changed("evaluation-concurrency") {
if !flags.Changed("client-max-qps") {
opts.clientQPS = float32(opts.evaluationConcurrency) * 15
}

if !flags.Changed("client-burst") {
opts.clientBurst = uint(opts.evaluationConcurrency)*22 + 1
}
}

return opts
}
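
As a quick sanity check of the scaling logic at the end of parseOpts: with --evaluation-concurrency=4 and neither client flag set, the QPS scales to 4 * 15 = 60 and the burst to 4*22 + 1 = 89, while leaving all three flags unset keeps the hard-coded defaults of 30 QPS and a burst of 45. A hypothetical in-package test (not part of this PR; the test name is illustrative) that exercises this behaviour could look like:

```go
package main

import (
	"testing"

	"github.com/spf13/pflag"
)

// Hypothetical test illustrating the concurrency-based scaling in parseOpts.
func TestClientQPSScalesWithConcurrency(t *testing.T) {
	flags := pflag.NewFlagSet("controller", pflag.ExitOnError)
	opts := parseOpts(flags, []string{"--evaluation-concurrency=4"})

	// Neither --client-max-qps nor --client-burst was set, so both scale with concurrency.
	if opts.clientQPS != 60 { // 4 * 15
		t.Fatalf("expected clientQPS=60, got %v", opts.clientQPS)
	}

	if opts.clientBurst != 89 { // 4*22 + 1
		t.Fatalf("expected clientBurst=89, got %v", opts.clientBurst)
	}
}
```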