diff --git a/controller/controller_test.go b/controller/controller_test.go
index 3556af61aa..f03afeeddd 100644
--- a/controller/controller_test.go
+++ b/controller/controller_test.go
@@ -918,7 +918,6 @@ func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) {
 	}
 	cc := leaderelection.ComponentConfig{
 		Component:     "component",
-		LeaderElect:   true,
 		ResourceLock:  "leases",
 		LeaseDuration: 15 * time.Second,
 		RenewDeadline: 10 * time.Second,
diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go
index 6c4acf71ec..676e47de82 100644
--- a/injection/sharedmain/main.go
+++ b/injection/sharedmain/main.go
@@ -33,14 +33,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
-	"k8s.io/client-go/kubernetes/scheme"
-	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
-	"k8s.io/client-go/tools/leaderelection"
-	"k8s.io/client-go/tools/leaderelection/resourcelock"
-	"k8s.io/client-go/tools/record"
 
 	"go.uber.org/zap"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -48,7 +42,7 @@ import (
 	"knative.dev/pkg/configmap"
 	"knative.dev/pkg/controller"
 	"knative.dev/pkg/injection"
-	kle "knative.dev/pkg/leaderelection"
+	"knative.dev/pkg/leaderelection"
 	"knative.dev/pkg/logging"
 	"knative.dev/pkg/metrics"
 	"knative.dev/pkg/profiling"
@@ -108,110 +102,65 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
 }
 
 // GetLeaderElectionConfig gets the leader election config.
-func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) {
-	leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(kle.ConfigMapName(), metav1.GetOptions{})
+func GetLeaderElectionConfig(ctx context.Context) (*leaderelection.Config, error) {
+	leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(leaderelection.ConfigMapName(), metav1.GetOptions{})
 	if apierrors.IsNotFound(err) {
-		return kle.NewConfigFromConfigMap(nil)
+		return leaderelection.NewConfigFromConfigMap(nil)
 	} else if err != nil {
 		return nil, err
 	}
-	return kle.NewConfigFromConfigMap(leaderElectionConfigMap)
+	return leaderelection.NewConfigFromConfigMap(leaderElectionConfigMap)
 }
 
-// Main runs the generic main flow for non-webhook controllers with a new
-// context. Use WebhookMainWith* if you need to serve webhooks.
+// Main runs the generic main flow with a new context.
+// If any of the constructed controllers are AdmissionControllers or Conversion webhooks,
+// then a webhook is started to serve them.
 func Main(component string, ctors ...injection.ControllerConstructor) {
 	// Set up signals so we handle the first shutdown signal gracefully.
 	MainWithContext(signals.NewContext(), component, ctors...)
 }
 
-// MainWithContext runs the generic main flow for non-webhook controllers. Use
-// WebhookMainWithContext if you need to serve webhooks.
-func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
-	MainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
-}
-
-// MainWithConfig runs the generic main flow for non-webhook controllers. Use
-// WebhookMainWithConfig if you need to serve webhooks.
-func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
-	log.Printf("Registering %d clients", len(injection.Default.GetClients()))
-	log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
-	log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
-	log.Printf("Registering %d controllers", len(ctors))
-
-	MemStatsOrDie(ctx)
-
-	// Adjust our client's rate limits based on the number of controllers we are running.
-	cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
-	cfg.Burst = len(ctors) * rest.DefaultBurst
-	ctx = injection.WithConfig(ctx, cfg)
-
-	ctx, informers := injection.Default.SetupInformers(ctx, cfg)
+// Legacy aliases for back-compat.
+var (
+	WebhookMainWithContext = MainWithContext
+	WebhookMainWithConfig  = MainWithConfig
+)
 
-	logger, atomicLevel := SetupLoggerOrDie(ctx, component)
-	defer flush(logger)
-	ctx = logging.WithLogger(ctx, logger)
-	profilingHandler := profiling.NewHandler(logger, false)
-	profilingServer := profiling.NewServer(profilingHandler)
-	eg, egCtx := errgroup.WithContext(ctx)
-	eg.Go(profilingServer.ListenAndServe)
-	go func() {
-		// This will block until either a signal arrives or one of the grouped functions
-		// returns an error.
-		<-egCtx.Done()
-
-		profilingServer.Shutdown(context.Background())
-		if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
-			logger.Errorw("Error while running server", zap.Error(err))
-		}
-	}()
-	CheckK8sClientMinimumVersionOrDie(ctx, logger)
+// MainWithContext runs the generic main flow for controllers and
+// webhooks with the given context.
+func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
 
-	run := func(ctx context.Context) {
-		cmw := SetupConfigMapWatchOrDie(ctx, logger)
-		controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
-		WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
-		WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)
+	// TODO(mattmoor): Remove this once HA is stable.
+	disableHighAvailability := flag.Bool("disable-ha", false,
+		"Whether to disable high-availability functionality for this component. This flag will be deprecated "+
+			"and removed when we have promoted this feature to stable, so do not pass it without filing an "+
+			"issue upstream!")
 
-		logger.Info("Starting configuration manager...")
-		if err := cmw.Start(ctx.Done()); err != nil {
-			logger.Fatalw("Failed to start configuration manager", zap.Error(err))
-		}
-		logger.Info("Starting informers...")
-		if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
-			logger.Fatalw("Failed to start informers", zap.Error(err))
-		}
-		logger.Info("Starting controllers...")
-		go controller.StartAll(ctx, controllers...)
+	// HACK: This parses flags, so the above should be set once this runs.
+	cfg := ParseAndGetConfigOrDie()
 
-		<-ctx.Done()
+	if *disableHighAvailability {
+		ctx = WithHADisabled(ctx)
 	}
 
-	// Set up leader election config
-	leaderElectionConfig, err := GetLeaderElectionConfig(ctx)
-	if err != nil {
-		logger.Fatalw("Error loading leader election configuration", zap.Error(err))
-	}
-	leConfig := leaderElectionConfig.GetComponentConfig(component)
+	MainWithConfig(ctx, component, cfg, ctors...)
+}
 
-	if !leConfig.LeaderElect {
-		logger.Infof("%v will not run in leader-elected mode", component)
-		run(ctx)
-	} else {
-		RunLeaderElected(ctx, logger, run, leConfig)
-	}
+type haDisabledKey struct{}
+
+// WithHADisabled signals to MainWithConfig that it should not set up the leader elector for this component.
+func WithHADisabled(ctx context.Context) context.Context {
+	return context.WithValue(ctx, haDisabledKey{}, struct{}{})
 }
 
-// WebhookMainWithContext runs the generic main flow for controllers and
-// webhooks. Use MainWithContext if you do not need to serve webhooks.
-func WebhookMainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
-	WebhookMainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
+// IsHADisabled checks the context for whether the leader elector has been disabled.
+func IsHADisabled(ctx context.Context) bool {
+	return ctx.Value(haDisabledKey{}) != nil
 }
 
-// WebhookMainWithConfig runs the generic main flow for controllers and webhooks
-// with the given config. Use MainWithConfig if you do not need to serve
-// webhooks.
-func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
+// MainWithConfig runs the generic main flow for controllers and webhooks
+// with the given config.
+func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
 	log.Printf("Registering %d clients", len(injection.Default.GetClients()))
 	log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
 	log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
 	log.Printf("Registering %d controllers", len(ctors))
@@ -238,10 +187,11 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
 	if err != nil {
 		logger.Fatalf("Error loading leader election configuration: %v", err)
 	}
-	leConfig := leaderElectionConfig.GetComponentConfig(component)
-	if leConfig.LeaderElect {
+
+	if !IsHADisabled(ctx) {
 		// Signal that we are executing in a context with leader election.
-		ctx = kle.WithDynamicLeaderElectorBuilder(ctx, kubeclient.Get(ctx), leConfig)
+		ctx = leaderelection.WithDynamicLeaderElectorBuilder(ctx, kubeclient.Get(ctx),
+			leaderElectionConfig.GetComponentConfig(component))
 	}
 
 	controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
@@ -251,6 +201,14 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
 	eg, egCtx := errgroup.WithContext(ctx)
 	eg.Go(profilingServer.ListenAndServe)
 
+	// Many of the webhooks rely on configuration, e.g. configurable defaults, feature flags.
+	// So make sure that we have synchronized our configuration state before launching the
+	// webhooks, so that things are properly initialized.
+	logger.Info("Starting configuration manager...")
+	if err := cmw.Start(ctx.Done()); err != nil {
+		logger.Fatalw("Failed to start configuration manager", zap.Error(err))
+	}
+
 	// If we have one or more admission controllers, then start the webhook
 	// and pass them in.
 	var wh *webhook.Webhook
@@ -267,10 +225,6 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
 		})
 	}
 
-	logger.Info("Starting configuration manager...")
-	if err := cmw.Start(ctx.Done()); err != nil {
-		logger.Fatalw("Failed to start configuration manager", zap.Error(err))
-	}
 	logger.Info("Starting informers...")
 	if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
 		logger.Fatalw("Failed to start informers", zap.Error(err))
@@ -414,7 +368,7 @@ func ControllersAndWebhooksFromCtors(ctx context.Context,
 	// Check whether the context has been infused with a leader elector builder.
 	// If it has, then every reconciler we plan to start MUST implement LeaderAware.
-	leEnabled := kle.HasLeaderElection(ctx)
+	leEnabled := leaderelection.HasLeaderElection(ctx)
 
 	controllers := make([]*controller.Impl, 0, len(ctors))
 	webhooks := make([]interface{}, 0)
@@ -437,66 +391,3 @@ func ControllersAndWebhooksFromCtors(ctx context.Context,
 
 	return controllers, webhooks
 }
-
-// RunLeaderElected runs the given function in leader elected mode. The function
-// will be run only once the leader election lock is obtained.
-func RunLeaderElected(ctx context.Context, logger *zap.SugaredLogger, run func(context.Context), leConfig kle.ComponentConfig) {
-	recorder := controller.GetEventRecorder(ctx)
-	if recorder == nil {
-		// Create event broadcaster
-		logger.Debug("Creating event broadcaster")
-		eventBroadcaster := record.NewBroadcaster()
-		watches := []watch.Interface{
-			eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
-			eventBroadcaster.StartRecordingToSink(
-				&typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}),
-		}
-		recorder = eventBroadcaster.NewRecorder(
-			scheme.Scheme, corev1.EventSource{Component: leConfig.Component})
-		go func() {
-			<-ctx.Done()
-			for _, w := range watches {
-				w.Stop()
-			}
-		}()
-	}
-
-	// Create a unique identifier so that two controllers on the same host don't
-	// race.
-	id, err := kle.UniqueID()
-	if err != nil {
-		logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err))
-	}
-	logger.Infof("%v will run in leader-elected mode with id %v", leConfig.Component, id)
-
-	// rl is the resource used to hold the leader election lock.
-	rl, err := resourcelock.New(leConfig.ResourceLock,
-		system.Namespace(), // use namespace we are running in
-		leConfig.Component, // component is used as the resource name
-		kubeclient.Get(ctx).CoreV1(),
-		kubeclient.Get(ctx).CoordinationV1(),
-		resourcelock.ResourceLockConfig{
-			Identity:      id,
-			EventRecorder: recorder,
-		})
-	if err != nil {
-		logger.Fatalw("Error creating lock", zap.Error(err))
-	}
-
-	// Execute the `run` function when we have the lock.
-	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock:          rl,
-		LeaseDuration: leConfig.LeaseDuration,
-		RenewDeadline: leConfig.RenewDeadline,
-		RetryPeriod:   leConfig.RetryPeriod,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: run,
-			OnStoppedLeading: func() {
-				logger.Fatal("Leader election lost")
-			},
-		},
-		ReleaseOnCancel: true,
-		// TODO: use health check watchdog, knative/pkg#1048
-		Name: leConfig.Component,
-	})
-}
diff --git a/leaderelection/config.go b/leaderelection/config.go
index 880dd8c2b9..b694c4fd20 100644
--- a/leaderelection/config.go
+++ b/leaderelection/config.go
@@ -52,11 +52,6 @@ func NewConfigFromMap(data map[string]string) (*Config, error) {
 		cm.AsDuration("retryPeriod", &config.RetryPeriod),
 
 		cm.AsUint32("buckets", &config.Buckets),
-
-		// enabledComponents are not validated here, because they are dependent on
-		// the component. Components should provide additional validation for this
-		// field.
-		cm.AsStringSet("enabledComponents", &config.EnabledComponents),
 	); err != nil {
 		return nil, err
 	}
@@ -84,45 +79,42 @@ func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) {
 // contained within a single namespace. Typically these will correspond to a
 // single source repository, viz: serving or eventing.
 type Config struct {
-	ResourceLock      string
-	Buckets           uint32
-	LeaseDuration     time.Duration
-	RenewDeadline     time.Duration
-	RetryPeriod       time.Duration
+	ResourceLock  string
+	Buckets       uint32
+	LeaseDuration time.Duration
+	RenewDeadline time.Duration
+	RetryPeriod   time.Duration
+
+	// This field is deprecated and will be removed once downstream
+	// repositories have removed their validation of it.
+	// TODO(https://github.com/knative/pkg/issues/1478): Remove this field.
 	EnabledComponents sets.String
 }
 
 func (c *Config) GetComponentConfig(name string) ComponentConfig {
-	if c.EnabledComponents.Has(name) {
-		return ComponentConfig{
-			Component:     name,
-			LeaderElect:   true,
-			Buckets:       c.Buckets,
-			ResourceLock:  c.ResourceLock,
-			LeaseDuration: c.LeaseDuration,
-			RenewDeadline: c.RenewDeadline,
-			RetryPeriod:   c.RetryPeriod,
-		}
+	return ComponentConfig{
+		Component:     name,
+		Buckets:       c.Buckets,
+		ResourceLock:  c.ResourceLock,
+		LeaseDuration: c.LeaseDuration,
+		RenewDeadline: c.RenewDeadline,
+		RetryPeriod:   c.RetryPeriod,
 	}
-
-	return defaultComponentConfig(name)
 }
 
 func defaultConfig() *Config {
 	return &Config{
-		ResourceLock:      "leases",
-		Buckets:           1,
-		LeaseDuration:     15 * time.Second,
-		RenewDeadline:     10 * time.Second,
-		RetryPeriod:       2 * time.Second,
-		EnabledComponents: sets.NewString(),
+		ResourceLock:  "leases",
+		Buckets:       1,
+		LeaseDuration: 15 * time.Second,
+		RenewDeadline: 10 * time.Second,
+		RetryPeriod:   2 * time.Second,
 	}
 }
 
 // ComponentConfig represents the leader election config for a single component.
 type ComponentConfig struct {
 	Component     string
-	LeaderElect   bool
 	Buckets       uint32
 	ResourceLock  string
 	LeaseDuration time.Duration
@@ -165,13 +157,6 @@ func newStatefulSetConfig() (*statefulSetConfig, error) {
 	return ssc, nil
 }
 
-func defaultComponentConfig(name string) ComponentConfig {
-	return ComponentConfig{
-		Component:   name,
-		LeaderElect: false,
-	}
-}
-
 // ConfigMapName returns the name of the configmap to read for leader election
 // settings.
 func ConfigMapName() string {
diff --git a/leaderelection/config_test.go b/leaderelection/config_test.go
index c9066ca4cd..44a1516a58 100644
--- a/leaderelection/config_test.go
+++ b/leaderelection/config_test.go
@@ -25,18 +25,16 @@ import (
 	"github.com/google/go-cmp/cmp"
 
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/util/sets"
 
 	"knative.dev/pkg/kmeta"
 )
 
 func okConfig() *Config {
 	return &Config{
-		ResourceLock:      "leases",
-		Buckets:           1,
-		LeaseDuration:     15 * time.Second,
-		RenewDeadline:     10 * time.Second,
-		RetryPeriod:       2 * time.Second,
-		EnabledComponents: sets.NewString(),
+		ResourceLock:  "leases",
+		Buckets:       1,
+		LeaseDuration: 15 * time.Second,
+		RenewDeadline: 10 * time.Second,
+		RetryPeriod:   2 * time.Second,
 	}
 }
 
@@ -47,10 +45,9 @@ func okData() map[string]string {
 		// values in this data come from the defaults suggested in the
 		// code:
 		// https://github.com/kubernetes/client-go/blob/kubernetes-1.16.0/tools/leaderelection/leaderelection.go
-		"leaseDuration":     "15s",
-		"renewDeadline":     "10s",
-		"retryPeriod":       "2s",
-		"enabledComponents": "controller",
+		"leaseDuration": "15s",
+		"renewDeadline": "10s",
+		"retryPeriod":   "2s",
 	}
 }
 
@@ -61,21 +58,9 @@ func TestNewConfigMapFromData(t *testing.T) {
 		expected *Config
 		err      error
 	}{{
-		name: "disabled but OK config",
-		data: func() map[string]string {
-			data := okData()
-			delete(data, "enabledComponents")
-			return data
-		}(),
+		name:     "OK config - controller enabled",
+		data:     okData(),
 		expected: okConfig(),
-	}, {
-		name: "OK config - controller enabled",
-		data: okData(),
-		expected: func() *Config {
-			config := okConfig()
-			config.EnabledComponents.Insert("controller")
-			return config
-		}(),
 	}, {
 		name: "OK config - controller enabled with multiple buckets",
 		data: kmeta.UnionMaps(okData(), map[string]string{
@@ -83,7 +68,6 @@ func TestNewConfigMapFromData(t *testing.T) {
 		}),
 		expected: func() *Config {
 			config := okConfig()
-			config.EnabledComponents.Insert("controller")
 			config.Buckets = 5
 			return config
 		}(),
@@ -158,33 +142,18 @@ func TestGetComponentConfig(t *testing.T) {
 	}{{
 		name: "component enabled",
 		config: Config{
-			ResourceLock:      "leases",
-			LeaseDuration:     15 * time.Second,
-			RenewDeadline:     10 * time.Second,
-			RetryPeriod:       2 * time.Second,
-			EnabledComponents: sets.NewString(expectedName),
+			ResourceLock:  "leases",
+			LeaseDuration: 15 * time.Second,
+			RenewDeadline: 10 * time.Second,
+			RetryPeriod:   2 * time.Second,
 		},
 		expected: ComponentConfig{
 			Component:     expectedName,
-			LeaderElect:   true,
 			ResourceLock:  "leases",
 			LeaseDuration: 15 * time.Second,
 			RenewDeadline: 10 * time.Second,
 			RetryPeriod:   2 * time.Second,
 		},
-	}, {
-		name: "component disabled",
-		config: Config{
-			ResourceLock:      "leases",
-			LeaseDuration:     15 * time.Second,
-			RenewDeadline:     10 * time.Second,
-			RetryPeriod:       2 * time.Second,
-			EnabledComponents: sets.NewString("not-the-component"),
-		},
-		expected: ComponentConfig{
-			Component:   expectedName,
-			LeaderElect: false,
-		},
 	}}
 
 	for _, tc := range cases {
diff --git a/leaderelection/context_test.go b/leaderelection/context_test.go
index 601ccac546..0a66ae3f29 100644
--- a/leaderelection/context_test.go
+++ b/leaderelection/context_test.go
@@ -37,7 +37,6 @@ import (
 func TestWithBuilder(t *testing.T) {
 	cc := ComponentConfig{
 		Component:     "the-component",
-		LeaderElect:   true,
 		Buckets:       1,
 		ResourceLock:  "leases",
 		LeaseDuration: 15 * time.Second,
@@ -149,9 +148,8 @@ func TestWithBuilder(t *testing.T) {
 
 func TestWithStatefulSetBuilder(t *testing.T) {
 	cc := ComponentConfig{
-		Component:   "the-component",
-		LeaderElect: true,
-		Buckets:     1,
+		Component: "the-component",
+		Buckets:   1,
 	}
 	const podDNS = "ws://as-42.autoscaler.knative-testing.svc.cluster.local:8080"
 	ctx := context.Background()
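
Usage note (not part of the diff): after this change, opting a component out of leader election is driven by the new --disable-ha flag or the WithHADisabled context decorator, rather than the old enabledComponents ConfigMap field. A minimal sketch of a downstream main.go is below; the component name "my-component" and the placeholder newController constructor are illustrative only, while the sharedmain/signals calls follow the API shown in this diff.

package main

import (
	"context"

	"knative.dev/pkg/configmap"
	"knative.dev/pkg/controller"
	"knative.dev/pkg/injection/sharedmain"
	"knative.dev/pkg/signals"
)

// newController is a placeholder injection.ControllerConstructor; a real
// component would build its reconciler here and return a wired *controller.Impl.
func newController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
	// ... construct the reconciler, informers, and controller.Impl ...
	return nil
}

func main() {
	ctx := signals.NewContext()

	// Programmatic equivalent of passing --disable-ha on the command line:
	// MainWithConfig will then skip setting up the dynamic leader elector builder.
	ctx = sharedmain.WithHADisabled(ctx)

	// Also serves webhooks, if any constructor returns one.
	sharedmain.MainWithContext(ctx, "my-component", newController)
}

With HA left enabled (the default), every reconciler started through this path must implement LeaderAware, as enforced by ControllersAndWebhooksFromCtors.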