Enable leader election by default. #1476
Changes from all commits: 591ba44, c679dd5, 004bb4f, 7339b84, 672516d
@@ -33,22 +33,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
kle "knative.dev/pkg/leaderelection"
"knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
@@ -108,110 +102,65 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
}

// GetLeaderElectionConfig gets the leader election config.
func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) {
leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(kle.ConfigMapName(), metav1.GetOptions{})
func GetLeaderElectionConfig(ctx context.Context) (*leaderelection.Config, error) {
leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(leaderelection.ConfigMapName(), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
return kle.NewConfigFromConfigMap(nil)
return leaderelection.NewConfigFromConfigMap(nil)
} else if err != nil {
return nil, err
}
return kle.NewConfigFromConfigMap(leaderElectionConfigMap)
return leaderelection.NewConfigFromConfigMap(leaderElectionConfigMap)
}
// Main runs the generic main flow for non-webhook controllers with a new
// context. Use WebhookMainWith* if you need to serve webhooks.
// Main runs the generic main flow with a new context.
// If any of the constructed controllers are AdmissionControllers or Conversion webhooks,
// then a webhook is started to serve them.
func Main(component string, ctors ...injection.ControllerConstructor) {
// Set up signals so we handle the first shutdown signal gracefully.
MainWithContext(signals.NewContext(), component, ctors...)
}

// MainWithContext runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithContext if you need to serve webhooks.
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
MainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
}

// MainWithConfig runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithConfig if you need to serve webhooks.
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))

MemStatsOrDie(ctx)

// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.Burst = len(ctors) * rest.DefaultBurst
ctx = injection.WithConfig(ctx, cfg)

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
// Legacy aliases for back-compat.
var (
WebhookMainWithContext = MainWithContext
WebhookMainWithConfig = MainWithConfig
)
|
||
logger, atomicLevel := SetupLoggerOrDie(ctx, component) | ||
defer flush(logger) | ||
ctx = logging.WithLogger(ctx, logger) | ||
profilingHandler := profiling.NewHandler(logger, false) | ||
profilingServer := profiling.NewServer(profilingHandler) | ||
eg, egCtx := errgroup.WithContext(ctx) | ||
eg.Go(profilingServer.ListenAndServe) | ||
go func() { | ||
// This will block until either a signal arrives or one of the grouped functions | ||
// returns an error. | ||
<-egCtx.Done() | ||
|
||
profilingServer.Shutdown(context.Background()) | ||
if err := eg.Wait(); err != nil && err != http.ErrServerClosed { | ||
logger.Errorw("Error while running server", zap.Error(err)) | ||
} | ||
}() | ||
CheckK8sClientMinimumVersionOrDie(ctx, logger) | ||
// MainWithContext runs the generic main flow for controllers and | ||
// webhooks. Use MainWithContext if you do not need to serve webhooks. | ||
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) { | ||
|
||
run := func(ctx context.Context) { | ||
cmw := SetupConfigMapWatchOrDie(ctx, logger) | ||
controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...) | ||
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component) | ||
WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component) | ||
// TODO(mattmoor): Remove this once HA is stable. | ||
disableHighAvailability := flag.Bool("disable-ha", false, | ||
"Whether to disable high-availability functionality for this component. This flag will be deprecated "+ | ||
"and removed when we have promoted this feature to stable, so do not pass it without filing an "+ | ||
"issue upstream!") | ||
|
||
logger.Info("Starting configuration manager...") | ||
if err := cmw.Start(ctx.Done()); err != nil { | ||
logger.Fatalw("Failed to start configuration manager", zap.Error(err)) | ||
} | ||
logger.Info("Starting informers...") | ||
if err := controller.StartInformers(ctx.Done(), informers...); err != nil { | ||
logger.Fatalw("Failed to start informers", zap.Error(err)) | ||
} | ||
logger.Info("Starting controllers...") | ||
go controller.StartAll(ctx, controllers...) | ||
// HACK: This parses flags, so the above should be set once this runs. | ||
cfg := ParseAndGetConfigOrDie() | ||
|
||
<-ctx.Done() | ||
if *disableHighAvailability { | ||
ctx = WithHADisabled(ctx) | ||
} | ||
|
||
// Set up leader election config | ||
leaderElectionConfig, err := GetLeaderElectionConfig(ctx) | ||
if err != nil { | ||
logger.Fatalw("Error loading leader election configuration", zap.Error(err)) | ||
} | ||
leConfig := leaderElectionConfig.GetComponentConfig(component) | ||
MainWithConfig(ctx, component, cfg, ctors...) | ||
} | ||
if !leConfig.LeaderElect {
logger.Infof("%v will not run in leader-elected mode", component)
run(ctx)
} else {
RunLeaderElected(ctx, logger, run, leConfig)
}
type haDisabledKey struct{}

// WithHADisabled signals to MainWithConfig that it should not set up an appropriate leader elector for this component.
func WithHADisabled(ctx context.Context) context.Context {
return context.WithValue(ctx, haDisabledKey{}, struct{}{})
}

// WebhookMainWithContext runs the generic main flow for controllers and
// webhooks. Use MainWithContext if you do not need to serve webhooks.
func WebhookMainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
WebhookMainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
// IsHADisabled checks the context for the desire to disable the leader elector.
func IsHADisabled(ctx context.Context) bool {
return ctx.Value(haDisabledKey{}) != nil
}

// WebhookMainWithConfig runs the generic main flow for controllers and webhooks
// with the given config. Use MainWithConfig if you do not need to serve
// webhooks.
func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
// MainWithConfig runs the generic main flow for controllers and webhooks
// with the given config.
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
Comment: If leader election mode is enabled, before this change:
After this change, all controllers are running in bucket-level leader election mode. Is my understanding right?

Reply: yes
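To make the new default concrete, here is a minimal sketch of a downstream binary built on this package. The component name, module path, and reconciler constructor are hypothetical; `sharedmain.Main` and the `--disable-ha` flag are the pieces visible in this diff. Such a binary now runs leader-elected by default and opts out only by passing `--disable-ha`:

```go
package main

import (
	"knative.dev/pkg/injection/sharedmain"

	// Hypothetical reconciler package; any injection.ControllerConstructor fits here.
	foo "example.com/sample-controller/pkg/reconciler/foo"
)

func main() {
	// With this PR, bucket-based leader election is enabled by default.
	// Start the binary with --disable-ha to opt this component out while
	// the feature is still being promoted to stable.
	sharedmain.Main("foo-controller", foo.NewController)
}
```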
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
@@ -238,10 +187,11 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
if err != nil {
logger.Fatalf("Error loading leader election configuration: %v", err)
}
leConfig := leaderElectionConfig.GetComponentConfig(component)
if leConfig.LeaderElect {

if !IsHADisabled(ctx) {
// Signal that we are executing in a context with leader election.
ctx = kle.WithDynamicLeaderElectorBuilder(ctx, kubeclient.Get(ctx), leConfig)
ctx = leaderelection.WithDynamicLeaderElectorBuilder(ctx, kubeclient.Get(ctx),
leaderElectionConfig.GetComponentConfig(component))
}

controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
@@ -251,6 +201,14 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)

// Many of the webhooks rely on configuration, e.g. configurable defaults, feature flags.
// So make sure that we have synchronized our configuration state before launching the
// webhooks, so that things are properly initialized.
logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}

// If we have one or more admission controllers, then start the webhook
// and pass them in.
var wh *webhook.Webhook
@@ -267,10 +225,6 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
})
}

logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
logger.Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
@@ -414,7 +368,7 @@ func ControllersAndWebhooksFromCtors(ctx context.Context,

// Check whether the context has been infused with a leader elector builder.
// If it has, then every reconciler we plan to start MUST implement LeaderAware.
leEnabled := kle.HasLeaderElection(ctx)
leEnabled := leaderelection.HasLeaderElection(ctx)

controllers := make([]*controller.Impl, 0, len(ctors))
webhooks := make([]interface{}, 0)
@@ -437,66 +391,3 @@ func ControllersAndWebhooksFromCtors(ctx context.Context,

return controllers, webhooks
}

// RunLeaderElected runs the given function in leader elected mode. The function
// will be run only once the leader election lock is obtained.
func RunLeaderElected(ctx context.Context, logger *zap.SugaredLogger, run func(context.Context), leConfig kle.ComponentConfig) {
recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
watches := []watch.Interface{
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}),
}
recorder = eventBroadcaster.NewRecorder(
scheme.Scheme, corev1.EventSource{Component: leConfig.Component})
go func() {
<-ctx.Done()
for _, w := range watches {
w.Stop()
}
}()
}

// Create a unique identifier so that two controllers on the same host don't
// race.
id, err := kle.UniqueID()
if err != nil {
logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err))
}
logger.Infof("%v will run in leader-elected mode with id %v", leConfig.Component, id)

// rl is the resource used to hold the leader election lock.
rl, err := resourcelock.New(leConfig.ResourceLock,
system.Namespace(), // use namespace we are running in
leConfig.Component, // component is used as the resource name
kubeclient.Get(ctx).CoreV1(),
kubeclient.Get(ctx).CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
logger.Fatalw("Error creating lock", zap.Error(err))
}

// Execute the `run` function when we have the lock.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leConfig.LeaseDuration,
RenewDeadline: leConfig.RenewDeadline,
RetryPeriod: leConfig.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
logger.Fatal("Leader election lost")
},
},
ReleaseOnCancel: true,
// TODO: use health check watchdog, knative/pkg#1048
Name: leConfig.Component,
})
}
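With RunLeaderElected removed, the remaining switches are the `--disable-ha` flag and the context value it sets. Below is a minimal sketch of the programmatic path that flag takes through MainWithConfig; the component name and reconciler package are hypothetical, while WithHADisabled, IsHADisabled, MainWithConfig, and ParseAndGetConfigOrDie all appear in this diff. This mirrors what MainWithContext now does internally when `--disable-ha` is passed.

```go
package main

import (
	"knative.dev/pkg/injection/sharedmain"
	"knative.dev/pkg/signals"

	// Hypothetical reconciler package, as in the earlier sketch.
	foo "example.com/sample-controller/pkg/reconciler/foo"
)

func main() {
	// WithHADisabled marks the context; MainWithConfig consults IsHADisabled
	// and, when it returns true, skips
	// leaderelection.WithDynamicLeaderElectorBuilder entirely.
	ctx := sharedmain.WithHADisabled(signals.NewContext())
	sharedmain.MainWithConfig(ctx, "foo-controller",
		sharedmain.ParseAndGetConfigOrDie(), foo.NewController)
}
```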
Comment: issue rather?