🤖 Sync from open-cluster-management-io/config-policy-controller: #127 #482

Closed · wants to merge 2 commits
2 changes: 2 additions & 0 deletions deploy/manager/manager.yaml
@@ -24,6 +24,8 @@ spec:
- "--enable-lease=true"
- "--log-level=2"
- "--v=0"
- "--client-max-qps=35"
- "--client-burst=50"
imagePullPolicy: Always
env:
- name: WATCH_NAMESPACE
2 changes: 2 additions & 0 deletions deploy/operator.yaml
@@ -49,6 +49,8 @@ spec:
- --enable-lease=true
- --log-level=2
- --v=0
- --client-max-qps=35
- --client-burst=50
command:
- config-policy-controller
env:
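For reference, client-go enforces these two new arguments with a client-side token-bucket rate limiter once main.go copies them onto the rest.Config (see the cfg.QPS and cfg.Burst assignments in the diff below). A minimal sketch of the equivalent limiter, assuming client-go's default flowcontrol implementation; the values mirror the manifests above:

    package main

    import "k8s.io/client-go/util/flowcontrol"

    func main() {
        // --client-max-qps=35 allows a sustained 35 requests per second;
        // --client-burst=50 permits short spikes of up to 50 requests before throttling.
        limiter := flowcontrol.NewTokenBucketRateLimiter(35, 50)
        limiter.Accept() // blocks until a token is available
    }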
262 changes: 181 additions & 81 deletions main.go
@@ -67,61 +67,54 @@ func init() {
utilruntime.Must(extensionsv1beta1.AddToScheme(scheme))
}

type ctrlOpts struct {
clusterName string
hubConfigPath string
targetKubeConfig string
metricsAddr string
probeAddr string
clientQPS float32
clientBurst uint
frequency uint
decryptionConcurrency uint8
evaluationConcurrency uint8
enableLease bool
enableLeaderElection bool
legacyLeaderElection bool
enableMetrics bool
}

func main() {
klog.InitFlags(nil)

subcommand := ""
if len(os.Args) >= 2 {
subcommand = os.Args[1]
}

switch subcommand {
case "controller":
break // normal mode - just continue execution
case "trigger-uninstall":
handleTriggerUninstall()

return
default:
fmt.Fprintln(os.Stderr, "expected 'controller' or 'trigger-uninstall' subcommands")
os.Exit(1)
}
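// Illustrative invocations of the two subcommands (the flags shown are
// examples, not requirements):
//   config-policy-controller controller --enable-lease=true --client-max-qps=35
//   config-policy-controller trigger-uninstall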

zflags := zaputil.FlagConfig{
LevelName: "log-level",
EncoderName: "log-encoder",
}

zflags.Bind(flag.CommandLine)

controllerFlagSet := pflag.NewFlagSet("controller", pflag.ExitOnError)

var clusterName, hubConfigPath, targetKubeConfig, metricsAddr, probeAddr string
var frequency uint
var decryptionConcurrency, evaluationConcurrency uint8
var enableLease, enableLeaderElection, legacyLeaderElection, enableMetrics bool

controllerFlagSet.UintVar(&frequency, "update-frequency", 10,
"The status update frequency (in seconds) of a mutation policy")
controllerFlagSet.BoolVar(&enableLease, "enable-lease", false,
"If enabled, the controller will start the lease controller to report its status")
controllerFlagSet.StringVar(&clusterName, "cluster-name", "acm-managed-cluster", "Name of the cluster")
controllerFlagSet.StringVar(&hubConfigPath, "hub-kubeconfig-path", "/var/run/klusterlet/kubeconfig",
"Path to the hub kubeconfig")
controllerFlagSet.StringVar(
&targetKubeConfig,
"target-kubeconfig-path",
"",
"A path to an alternative kubeconfig for policy evaluation and enforcement.",
)
controllerFlagSet.StringVar(
&metricsAddr, "metrics-bind-address", "localhost:8383", "The address the metrics endpoint binds to.",
)
controllerFlagSet.StringVar(
&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.",
)
controllerFlagSet.BoolVar(&enableLeaderElection, "leader-elect", true,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.")
controllerFlagSet.BoolVar(&legacyLeaderElection, "legacy-leader-elect", false,
"Use a legacy leader election method for controller manager instead of the lease API.")
controllerFlagSet.Uint8Var(
&decryptionConcurrency,
"decryption-concurrency",
5,
"The max number of concurrent policy template decryptions",
)
controllerFlagSet.Uint8Var(
&evaluationConcurrency,
"evaluation-concurrency",
// Set a low default to not add too much load to the Kubernetes API server in resource-constrained deployments.
2,
"The max number of concurrent configuration policy evaluations",
)
controllerFlagSet.BoolVar(&enableMetrics, "enable-metrics", true, "Enable custom metrics collection")
zflags.Bind(flag.CommandLine)
controllerFlagSet.AddGoFlagSet(flag.CommandLine)

opts := parseOpts(controllerFlagSet, os.Args[2:])

ctrlZap, err := zflags.BuildForCtrl()
if err != nil {
@@ -145,25 +138,7 @@ func main() {
klog.SetLogger(zapr.NewLogger(klogZap).WithName("klog"))
}

subcommand := ""
if len(os.Args) >= 2 {
subcommand = os.Args[1]
}

switch subcommand {
case "controller":
controllerFlagSet.AddGoFlagSet(flag.CommandLine)
_ = controllerFlagSet.Parse(os.Args[2:])
case "trigger-uninstall":
handleTriggerUninstall()

return
default:
fmt.Fprintln(os.Stderr, "expected 'controller' or 'trigger-uninstall' subcommands")
os.Exit(1)
}

if evaluationConcurrency < 1 {
if opts.evaluationConcurrency < 1 {
panic("The --evaluation-concurrency option cannot be less than 1")
}

@@ -176,6 +151,9 @@ func main() {
os.Exit(1)
}

cfg.Burst = int(opts.clientBurst)
cfg.QPS = opts.clientQPS
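// Note: the QPS and burst set here apply to every client built from this
// rest.Config, including the manager's client and, when no
// --target-kubeconfig-path is given, the target clients below. If QPS were
// left at zero, client-go would fall back to its own defaults (currently
// 5 QPS with a burst of 10).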

// Set a field selector so that a watch on CRDs will be limited to just the configuration policy CRD.
cacheSelectors := cache.SelectorsByObject{
&extensionsv1.CustomResourceDefinition{}: {
@@ -229,11 +207,11 @@ func main() {

// Set default manager options
options := manager.Options{
MetricsBindAddress: metricsAddr,
MetricsBindAddress: opts.metricsAddr,
Scheme: scheme,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
HealthProbeBindAddress: opts.probeAddr,
LeaderElection: opts.enableLeaderElection,
LeaderElectionID: "config-policy-controller.open-cluster-management.io",
NewCache: cache.BuilderWithOptions(cache.Options{SelectorsByObject: cacheSelectors}),
// Disable the cache for Secrets to avoid a watch getting created when the `policy-encryption-key`
@@ -267,7 +245,7 @@ func main() {
),
}

if legacyLeaderElection {
if opts.legacyLeaderElection {
// If legacyLeaderElection is enabled, then that means the lease API is not available.
// In this case, use the legacy leader election method of a ConfigMap.
log.Info("Using the legacy leader election of configmaps")
@@ -290,40 +268,41 @@ func main() {
var targetK8sDynamicClient dynamic.Interface
var targetK8sConfig *rest.Config

if targetKubeConfig == "" {
if opts.targetKubeConfig == "" {
targetK8sConfig = cfg
targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)
} else {
var err error

targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", targetKubeConfig)
targetK8sConfig, err = clientcmd.BuildConfigFromFlags("", opts.targetKubeConfig)
if err != nil {
log.Error(err, "Failed to load the target kubeconfig", "path", targetKubeConfig)
log.Error(err, "Failed to load the target kubeconfig", "path", opts.targetKubeConfig)
os.Exit(1)
}

targetK8sClient = kubernetes.NewForConfigOrDie(targetK8sConfig)
targetK8sDynamicClient = dynamic.NewForConfigOrDie(targetK8sConfig)

log.Info(
"Overrode the target Kubernetes cluster for policy evaluation and enforcement", "path", targetKubeConfig,
"Overrode the target Kubernetes cluster for policy evaluation and enforcement",
"path", opts.targetKubeConfig,
)
}
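// Hypothetical example: evaluating and enforcing policies against a cluster
// other than the one the controller runs on (the path is illustrative):
//   config-policy-controller controller --target-kubeconfig-path=/etc/target/kubeconfig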

instanceName, _ := os.Hostname() // on an error, instanceName will be empty, which is ok

reconciler := controllers.ConfigurationPolicyReconciler{
Client: mgr.GetClient(),
DecryptionConcurrency: decryptionConcurrency,
EvaluationConcurrency: evaluationConcurrency,
DecryptionConcurrency: opts.decryptionConcurrency,
EvaluationConcurrency: opts.evaluationConcurrency,
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(controllers.ControllerName),
InstanceName: instanceName,
TargetK8sClient: targetK8sClient,
TargetK8sDynamicClient: targetK8sDynamicClient,
TargetK8sConfig: targetK8sConfig,
EnableMetrics: enableMetrics,
EnableMetrics: opts.enableMetrics,
}
if err = reconciler.SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller", "controller", "ConfigurationPolicy")
@@ -345,17 +324,17 @@ func main() {
managerCtx, managerCancel := context.WithCancel(context.Background())

// PeriodicallyExecConfigPolicies is the goroutine that periodically checks the policies
log.V(1).Info("Periodically processing Configuration Policies", "frequency", frequency)
log.V(1).Info("Periodically processing Configuration Policies", "frequency", opts.frequency)

go func() {
reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, frequency, mgr.Elected())
reconciler.PeriodicallyExecConfigPolicies(terminatingCtx, opts.frequency, mgr.Elected())
managerCancel()
}()

// This lease is not related to leader election. This is to report the status of the controller
// to the addon framework. This can be seen in the "status" section of the ManagedClusterAddOn
// resource objects.
if enableLease {
if opts.enableLease {
operatorNs, err := common.GetOperatorNamespace()
if err != nil {
if errors.Is(err, common.ErrNoNamespace) || errors.Is(err, common.ErrRunLocal) {
@@ -373,11 +352,11 @@ func main() {
kubernetes.NewForConfigOrDie(cfg), "config-policy-controller", operatorNs,
)

hubCfg, err := clientcmd.BuildConfigFromFlags("", hubConfigPath)
hubCfg, err := clientcmd.BuildConfigFromFlags("", opts.hubConfigPath)
if err != nil {
log.Error(err, "Could not load hub config, lease updater not set with config")
} else {
leaseUpdater = leaseUpdater.WithHubLeaseConfig(hubCfg, clusterName)
leaseUpdater = leaseUpdater.WithHubLeaseConfig(hubCfg, opts.clusterName)
}

go leaseUpdater.Start(context.TODO())
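// The Lease created above is named "config-policy-controller" and lives in
// the operator's namespace; roughly speaking, the addon framework compares
// its renew time against the lease duration to report this controller's
// health in the ManagedClusterAddOn status.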
@@ -437,7 +416,7 @@ func handleTriggerUninstall() {
// Get a config to talk to the apiserver
cfg, err := config.GetConfig()
if err != nil {
log.Error(err, "Failed to get config")
klog.Errorf("Failed to get config: %s", err)
os.Exit(1)
}

@@ -447,3 +426,124 @@
os.Exit(1)
}
}

func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
opts := &ctrlOpts{}

flags.UintVar(
&opts.frequency,
"update-frequency",
10,
"The status update frequency (in seconds) of a mutation policy",
)

flags.BoolVar(
&opts.enableLease,
"enable-lease",
false,
"If enabled, the controller will start the lease controller to report its status",
)

flags.StringVar(
&opts.clusterName,
"cluster-name",
"acm-managed-cluster",
"Name of the cluster",
)

flags.StringVar(
&opts.hubConfigPath,
"hub-kubeconfig-path",
"/var/run/klusterlet/kubeconfig",
"Path to the hub kubeconfig",
)

flags.StringVar(
&opts.targetKubeConfig,
"target-kubeconfig-path",
"",
"A path to an alternative kubeconfig for policy evaluation and enforcement.",
)

flags.StringVar(
&opts.metricsAddr,
"metrics-bind-address",
"localhost:8383",
"The address the metrics endpoint binds to.",
)

flags.StringVar(
&opts.probeAddr,
"health-probe-bind-address",
":8081",
"The address the probe endpoint binds to.",
)

flags.BoolVar(
&opts.enableLeaderElection,
"leader-elect",
true,
"Enable leader election for controller manager. "+
"Enabling this will ensure there is only one active controller manager.",
)

flags.BoolVar(
&opts.legacyLeaderElection,
"legacy-leader-elect",
false,
"Use a legacy leader election method for controller manager instead of the lease API.",
)

flags.Uint8Var(
&opts.decryptionConcurrency,
"decryption-concurrency",
5,
"The max number of concurrent policy template decryptions",
)

flags.Uint8Var(
&opts.evaluationConcurrency,
"evaluation-concurrency",
// Set a low default to not add too much load to the Kubernetes API server in resource-constrained deployments.
2,
"The max number of concurrent configuration policy evaluations",
)

flags.BoolVar(
&opts.enableMetrics,
"enable-metrics",
true,
"Disable custom metrics collection",
)

flags.Float32Var(
&opts.clientQPS,
"client-max-qps",
30, // 15 * concurrency is recommended
"The max queries per second that will be made against the kubernetes API server. "+
"Will scale with concurrency, if not explicitly set.",
)

flags.UintVar(
&opts.clientBurst,
"client-burst",
45, // the controller-runtime defaults are 20:30 (qps:burst) - this matches that ratio
"The maximum burst before client requests will be throttled. "+
"Will scale with concurrency, if not explicitly set.",
)

_ = flags.Parse(args)

// Scale QPS and Burst with concurrency, when they aren't explicitly set.
if flags.Changed("evaluation-concurrency") {
if !flags.Changed("client-max-qps") {
opts.clientQPS = float32(opts.evaluationConcurrency) * 15
}

if !flags.Changed("client-burst") {
opts.clientBurst = uint(opts.evaluationConcurrency)*22 + 1
}
}
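// Worked example with the defaults above: --evaluation-concurrency=3 with
// neither client flag set yields QPS 3*15 = 45 and burst 3*22+1 = 67, roughly
// preserving the 30:45 (2:3) ratio of the explicit defaults.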

return opts
}