Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable leader election by default. #1476

Merged
merged 5 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,6 @@ func TestStartAndShutdownWithLeaderAwareWithLostElection(t *testing.T) {
}
cc := leaderelection.ComponentConfig{
Component: "component",
LeaderElect: true,
ResourceLock: "leases",
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
Expand Down
187 changes: 37 additions & 150 deletions injection/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -118,100 +112,55 @@ func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) {
return kle.NewConfigFromConfigMap(leaderElectionConfigMap)
mattmoor marked this conversation as resolved.
Show resolved Hide resolved
}

// Main runs the generic main flow for non-webhook controllers with a new
// context. Use WebhookMainWith* if you need to serve webhooks.
// Main runs the generic main flow with a new context.
// If any of the contructed controllers are AdmissionControllers or Conversion webhooks,
// then a webhook is started to serve them.
func Main(component string, ctors ...injection.ControllerConstructor) {
// Set up signals so we handle the first shutdown signal gracefully.
MainWithContext(signals.NewContext(), component, ctors...)
}

// MainWithContext runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithContext if you need to serve webhooks.
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
MainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
}

// MainWithConfig runs the generic main flow for non-webhook controllers. Use
// WebhookMainWithConfig if you need to serve webhooks.
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))

MemStatsOrDie(ctx)

// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.Burst = len(ctors) * rest.DefaultBurst
ctx = injection.WithConfig(ctx, cfg)

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
// Legacy aliases for back-compat.
var (
WebhookMainWithContext = MainWithContext
WebhookMainWithConfig = MainWithConfig
)

logger, atomicLevel := SetupLoggerOrDie(ctx, component)
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
go func() {
// This will block until either a signal arrives or one of the grouped functions
// returns an error.
<-egCtx.Done()

profilingServer.Shutdown(context.Background())
if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
logger.Errorw("Error while running server", zap.Error(err))
}
}()
CheckK8sClientMinimumVersionOrDie(ctx, logger)
// MainWithContext runs the generic main flow for controllers and
// webhooks. Use MainWithContext if you do not need to serve webhooks.
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {

run := func(ctx context.Context) {
cmw := SetupConfigMapWatchOrDie(ctx, logger)
controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)
// TODO(mattmoor): Remove this once HA is stable.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue rather?

disableHighAvailability := flag.Bool("disable-ha", false,
"Whether to disable high-availability functionality for this component. This flag will be deprecated "+
"and removed when we have promoted this feature to stable, so do not pass it without filing an "+
"issue upstream!")

logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
logger.Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
}
logger.Info("Starting controllers...")
go controller.StartAll(ctx, controllers...)
// HACK: This parses flags, so the above should be set once this runs.
cfg := ParseAndGetConfigOrDie()

<-ctx.Done()
if *disableHighAvailability {
ctx = WithHADisabled(ctx)
}

// Set up leader election config
leaderElectionConfig, err := GetLeaderElectionConfig(ctx)
if err != nil {
logger.Fatalw("Error loading leader election configuration", zap.Error(err))
}
leConfig := leaderElectionConfig.GetComponentConfig(component)
MainWithConfig(ctx, component, cfg, ctors...)
}

if !leConfig.LeaderElect {
logger.Infof("%v will not run in leader-elected mode", component)
run(ctx)
} else {
RunLeaderElected(ctx, logger, run, leConfig)
}
type haDisabledKey struct{}

// WithHADisabled signals to MainWithConfig that it should not set up an appropriate leader elector for this component.
func WithHADisabled(ctx context.Context) context.Context {
return context.WithValue(ctx, haDisabledKey{}, struct{}{})
}

// WebhookMainWithContext runs the generic main flow for controllers and
// webhooks. Use MainWithContext if you do not need to serve webhooks.
func WebhookMainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
WebhookMainWithConfig(ctx, component, ParseAndGetConfigOrDie(), ctors...)
// IsHADisabled checks the context for the desired to disabled leader elector.
func IsHADisabled(ctx context.Context) bool {
return ctx.Value(haDisabledKey{}) != nil
}

// WebhookMainWithConfig runs the generic main flow for controllers and webhooks
// with the given config. Use MainWithConfig if you do not need to serve
// webhooks.
func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
// MainWithConfig runs the generic main flow for controllers and webhooks
// with the given config.
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If leader election mode is enabled, before this change:

  • webhook is running on bucket level leader election mode.
  • other controllers are running on pod level leader election mode.

After this change, all controllers are running on bucket level leader election mode. Is my understading right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
Expand All @@ -238,10 +187,11 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
if err != nil {
logger.Fatalf("Error loading leader election configuration: %v", err)
}
leConfig := leaderElectionConfig.GetComponentConfig(component)
if leConfig.LeaderElect {

if !IsHADisabled(ctx) {
// Signal that we are executing in a context with leader election.
ctx = kle.WithDynamicLeaderElectorBuilder(ctx, kubeclient.Get(ctx), leConfig)
ctx = kle.WithDynamicLeaderElectorBuilder(ctx, kubeclient.Get(ctx),
leaderElectionConfig.GetComponentConfig(component))
}

controllers, webhooks := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
Expand Down Expand Up @@ -437,66 +387,3 @@ func ControllersAndWebhooksFromCtors(ctx context.Context,

return controllers, webhooks
}

// RunLeaderElected runs the given function in leader elected mode. The function
// will be run only once the leader election lock is obtained.
func RunLeaderElected(ctx context.Context, logger *zap.SugaredLogger, run func(context.Context), leConfig kle.ComponentConfig) {
recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
watches := []watch.Interface{
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}),
}
recorder = eventBroadcaster.NewRecorder(
scheme.Scheme, corev1.EventSource{Component: leConfig.Component})
go func() {
<-ctx.Done()
for _, w := range watches {
w.Stop()
}
}()
}

// Create a unique identifier so that two controllers on the same host don't
// race.
id, err := kle.UniqueID()
if err != nil {
logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err))
}
logger.Infof("%v will run in leader-elected mode with id %v", leConfig.Component, id)

// rl is the resource used to hold the leader election lock.
rl, err := resourcelock.New(leConfig.ResourceLock,
system.Namespace(), // use namespace we are running in
leConfig.Component, // component is used as the resource name
kubeclient.Get(ctx).CoreV1(),
kubeclient.Get(ctx).CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
logger.Fatalw("Error creating lock", zap.Error(err))
}

// Execute the `run` function when we have the lock.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leConfig.LeaseDuration,
RenewDeadline: leConfig.RenewDeadline,
RetryPeriod: leConfig.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
logger.Fatal("Leader election lost")
},
},
ReleaseOnCancel: true,
// TODO: use health check watchdog, knative/pkg#1048
Name: leConfig.Component,
})
}
57 changes: 21 additions & 36 deletions leaderelection/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func NewConfigFromMap(data map[string]string) (*Config, error) {
cm.AsDuration("retryPeriod", &config.RetryPeriod),

cm.AsUint32("buckets", &config.Buckets),

// enabledComponents are not validated here, because they are dependent on
// the component. Components should provide additional validation for this
// field.
cm.AsStringSet("enabledComponents", &config.EnabledComponents),
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -84,45 +79,42 @@ func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) {
// contained within a single namespace. Typically these will correspond to a
// single source repository, viz: serving or eventing.
type Config struct {
ResourceLock string
Buckets uint32
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration
ResourceLock string
Buckets uint32
LeaseDuration time.Duration
RenewDeadline time.Duration
RetryPeriod time.Duration

// This field is deprecated and will be removed once downstream
// repositories have removed their validation of it.
// TODO(mattmoor): DO NOT SUBMIT open an issue to track.
EnabledComponents sets.String
}

func (c *Config) GetComponentConfig(name string) ComponentConfig {
if c.EnabledComponents.Has(name) {
return ComponentConfig{
Component: name,
LeaderElect: true,
Buckets: c.Buckets,
ResourceLock: c.ResourceLock,
LeaseDuration: c.LeaseDuration,
RenewDeadline: c.RenewDeadline,
RetryPeriod: c.RetryPeriod,
}
return ComponentConfig{
Component: name,
Buckets: c.Buckets,
ResourceLock: c.ResourceLock,
LeaseDuration: c.LeaseDuration,
RenewDeadline: c.RenewDeadline,
RetryPeriod: c.RetryPeriod,
}

return defaultComponentConfig(name)
}

func defaultConfig() *Config {
return &Config{
ResourceLock: "leases",
Buckets: 1,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
EnabledComponents: sets.NewString(),
ResourceLock: "leases",
Buckets: 1,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
}
}

// ComponentConfig represents the leader election config for a single component.
type ComponentConfig struct {
Component string
LeaderElect bool
Buckets uint32
ResourceLock string
LeaseDuration time.Duration
Expand Down Expand Up @@ -165,13 +157,6 @@ func newStatefulSetConfig() (*statefulSetConfig, error) {
return ssc, nil
}

func defaultComponentConfig(name string) ComponentConfig {
return ComponentConfig{
Component: name,
LeaderElect: false,
}
}

// ConfigMapName returns the name of the configmap to read for leader election
// settings.
func ConfigMapName() string {
Expand Down
Loading