diff --git a/pkg/autopilot/controller/leases.go b/pkg/autopilot/controller/leases.go
index 8a0c87cf585f..e9f56e56afcd 100644
--- a/pkg/autopilot/controller/leases.go
+++ b/pkg/autopilot/controller/leases.go
@@ -61,7 +61,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
 	leaseEventStatusCh := make(chan LeaseEventStatus, 10)
 	errorCh := make(chan error, 10)
 
-	go func(ctx context.Context) {
+	go func() {
 		defer close(leaseEventStatusCh)
 		defer close(errorCh)
 
@@ -76,21 +76,19 @@
 				return
 
 			default:
-				ctx, cancel := context.WithCancel(ctx)
 				leasePoolOpts := []leaderelection.LeaseOpt{
-					leaderelection.WithContext(ctx),
 					leaderelection.WithNamespace(namespace),
 				}
 
-				leasePool, err := leaderelection.NewLeasePool(ctx, lw.client, name, identity, leasePoolOpts...)
+				leasePool, err := leaderelection.NewLeasePool(lw.client, name, identity, leasePoolOpts...)
 				if err != nil {
 					errorCh <- fmt.Errorf("failed to create lease pool: %w", err)
-					cancel()
 					return
 				}
 
-				events, _, err := leasePool.Watch()
+				ctx, cancel := context.WithCancel(ctx)
+				events, err := leasePool.Watch(ctx)
 				if err != nil {
 					errorCh <- fmt.Errorf("failed to watch lease pool: %w", err)
 					cancel()
@@ -103,7 +101,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
 					cancel()
 				}
 			}
-	}(ctx)
+	}()
 
 	return leaseEventStatusCh, errorCh
 }
diff --git a/pkg/component/controller/controllersleasecounter.go b/pkg/component/controller/controllersleasecounter.go
index ffbe76a97766..41f24c6365ad 100644
--- a/pkg/component/controller/controllersleasecounter.go
+++ b/pkg/component/controller/controllersleasecounter.go
@@ -41,8 +41,7 @@ type K0sControllersLeaseCounter struct {
 	KubeClientFactory     kubeutil.ClientFactoryInterface
 	UpdateControllerCount func(uint)
 
-	cancelFunc  context.CancelFunc
-	leaseCancel context.CancelFunc
+	cancelFunc context.CancelFunc
 }
 
 var _ manager.Component = (*K0sControllersLeaseCounter)(nil)
@@ -53,8 +52,7 @@ func (l *K0sControllersLeaseCounter) Init(_ context.Context) error {
 }
 
 // Run runs the leader elector to keep the lease object up-to-date.
-func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
-	ctx, l.cancelFunc = context.WithCancel(ctx)
+func (l *K0sControllersLeaseCounter) Start(context.Context) error {
 	log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
 	client, err := l.KubeClientFactory.GetClient()
 	if err != nil {
@@ -69,18 +67,19 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
 	}
 	leaseName := fmt.Sprintf("k0s-ctrl-%s", nodeName)
 
-	leasePool, err := leaderelection.NewLeasePool(ctx, client, leaseName, l.InvocationID,
-		leaderelection.WithLogger(log),
-		leaderelection.WithContext(ctx))
+	leasePool, err := leaderelection.NewLeasePool(client, leaseName, l.InvocationID,
+		leaderelection.WithLogger(log))
 	if err != nil {
 		return err
 	}
-	events, cancel, err := leasePool.Watch()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	events, err := leasePool.Watch(ctx)
 	if err != nil {
+		cancel()
 		return err
 	}
-
-	l.leaseCancel = cancel
+	l.cancelFunc = cancel
 
 	go func() {
 		for {
@@ -102,10 +101,6 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
 
 // Stop stops the component
 func (l *K0sControllersLeaseCounter) Stop() error {
-	if l.leaseCancel != nil {
-		l.leaseCancel()
-	}
-
 	if l.cancelFunc != nil {
 		l.cancelFunc()
 	}
diff --git a/pkg/component/controller/leaderelector/leasepool.go b/pkg/component/controller/leaderelector/leasepool.go
index 542bb867d981..65b60fc9a10f 100644
--- a/pkg/component/controller/leaderelector/leasepool.go
+++ b/pkg/component/controller/leaderelector/leasepool.go
@@ -61,19 +61,20 @@ func (l *LeasePool) Init(_ context.Context) error {
 	return nil
 }
 
-func (l *LeasePool) Start(ctx context.Context) error {
+func (l *LeasePool) Start(context.Context) error {
 	client, err := l.kubeClientFactory.GetClient()
 	if err != nil {
 		return fmt.Errorf("can't create kubernetes rest client for lease pool: %w", err)
 	}
-	leasePool, err := leaderelection.NewLeasePool(ctx, client, l.name, l.invocationID,
-		leaderelection.WithLogger(l.log),
-		leaderelection.WithContext(ctx))
+	leasePool, err := leaderelection.NewLeasePool(client, l.name, l.invocationID,
+		leaderelection.WithLogger(l.log))
 	if err != nil {
 		return err
 	}
-	events, cancel, err := leasePool.Watch()
+	ctx, cancel := context.WithCancel(context.Background())
+	events, err := leasePool.Watch(ctx)
 	if err != nil {
+		cancel()
 		return err
 	}
 	l.leaseCancel = cancel
diff --git a/pkg/leaderelection/lease_pool.go b/pkg/leaderelection/lease_pool.go
index aa709b3d46b9..80313afb9e76 100644
--- a/pkg/leaderelection/lease_pool.go
+++ b/pkg/leaderelection/lease_pool.go
@@ -49,7 +49,6 @@ type LeaseConfiguration struct {
 	renewDeadline time.Duration
 	retryPeriod   time.Duration
 	log           logrus.FieldLogger
-	ctx           context.Context
 }
 
 // A LeaseOpt is a function that modifies a LeaseConfiguration
@@ -90,14 +89,6 @@ func WithLogger(logger logrus.FieldLogger) LeaseOpt {
 	}
 }
 
-// WithContext allows the consumer to pass its own context, for example a cancelable context
-func WithContext(ctx context.Context) LeaseOpt {
-	return func(config LeaseConfiguration) LeaseConfiguration {
-		config.ctx = ctx
-		return config
-	}
-}
-
 // WithNamespace specifies which namespace the lease should be created in, defaults to kube-node-lease
 func WithNamespace(namespace string) LeaseOpt {
 	return func(config LeaseConfiguration) LeaseConfiguration {
@@ -107,14 +98,13 @@ func WithNamespace(namespace string) LeaseOpt {
 	}
 }
 
 // NewLeasePool creates a new LeasePool struct to interact with a lease
-func NewLeasePool(ctx context.Context, client kubernetes.Interface, name, identity string, opts ...LeaseOpt) (*LeasePool, error) {
+func NewLeasePool(client kubernetes.Interface, name, identity string, opts ...LeaseOpt) (*LeasePool, error) {
 	leaseConfig := LeaseConfiguration{
 		log:           logrus.StandardLogger(),
 		duration:      60 * time.Second,
 		renewDeadline: 15 * time.Second,
 		retryPeriod:   5 * time.Second,
-		ctx:           ctx,
 		namespace:     "kube-node-lease",
 		name:          name,
 		identity:      identity,
@@ -148,9 +138,9 @@ func WithOutputChannels(channels *LeaseEvents) WatchOpt {
 }
 
 // Watch is the primary function of LeasePool, and starts the leader election process
-func (p *LeasePool) Watch(opts ...WatchOpt) (*LeaseEvents, context.CancelFunc, error) {
+func (p *LeasePool) Watch(ctx context.Context, opts ...WatchOpt) (*LeaseEvents, error) {
 	if p.events != nil {
-		return p.events, nil, nil
+		return p.events, nil
 	}
 
 	watchOptions := watchOptions{
@@ -195,19 +185,17 @@ func (p *LeasePool) Watch(opts ...WatchOpt) (*LeaseEvents, context.CancelFunc, e
 	}
 	le, err := leaderelection.NewLeaderElector(lec)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	if lec.WatchDog != nil {
 		lec.WatchDog.SetLeaderElection(le)
 	}
 
-	ctx, cancel := context.WithCancel(p.config.ctx)
-
 	go func() {
 		for ctx.Err() == nil {
 			le.Run(ctx)
 		}
 	}()
 
-	return p.events, cancel, nil
+	return p.events, nil
 }
diff --git a/pkg/leaderelection/lease_pool_test.go b/pkg/leaderelection/lease_pool_test.go
index 4cdf3147af87..5314a93db035 100644
--- a/pkg/leaderelection/lease_pool_test.go
+++ b/pkg/leaderelection/lease_pool_test.go
@@ -39,7 +39,7 @@ func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) {
 
 	fakeClient := fake.NewSimpleClientset()
 
-	pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity, WithNamespace("test"))
+	pool, err := NewLeasePool(fakeClient, "test", identity, WithNamespace("test"))
 	require.NoError(t, err)
 
 	output := &LeaseEvents{
@@ -47,9 +47,10 @@ func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) {
 		LostLease:     make(chan struct{}, 1),
 	}
 
-	events, cancel, err := pool.Watch(WithOutputChannels(output))
+	ctx, cancel := context.WithCancel(context.TODO())
+	t.Cleanup(cancel)
+	events, err := pool.Watch(ctx, WithOutputChannels(output))
 	require.NoError(t, err)
-	defer cancel()
 
 	done := make(chan struct{})
 	failed := make(chan struct{})
@@ -78,7 +79,7 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) {
 
 	fakeClient := fake.NewSimpleClientset()
 
-	pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity, WithNamespace("test"))
+	pool, err := NewLeasePool(fakeClient, "test", identity, WithNamespace("test"))
 	require.NoError(t, err)
 
 	output := &LeaseEvents{
@@ -86,7 +87,9 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) {
 		LostLease:     make(chan struct{}, 1),
 	}
 
-	events, cancel, err := pool.Watch(WithOutputChannels(output))
+	ctx, cancel := context.WithCancel(context.TODO())
+	t.Cleanup(cancel)
+	events, err := pool.Watch(ctx, WithOutputChannels(output))
 	require.NoError(t, err)
 
 	<-events.AcquiredLease
@@ -115,7 +118,7 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
 		}
 	}()
 
-	pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity,
+	pool, err := NewLeasePool(fakeClient, "test", identity,
 		WithNamespace("test"),
 		WithRetryPeriod(10*time.Millisecond),
 	)
@@ -127,9 +130,10 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
 	}
 	givenLeaderElectorError(nil)
 
-	events, cancel, err := pool.Watch(WithOutputChannels(output))
+	ctx, cancel := context.WithCancel(context.TODO())
+	t.Cleanup(cancel)
+	events, err := pool.Watch(ctx, WithOutputChannels(output))
 	require.NoError(t, err)
-	defer cancel()
 
 	<-events.AcquiredLease
 	t.Log("Acquired lease, disrupting leader election and waiting to loose the lease")
@@ -150,13 +154,13 @@ func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
 
 	fakeClient := fake.NewSimpleClientset()
 
-	pool1, err := NewLeasePool(context.TODO(), fakeClient, "test", "pool1",
+	pool1, err := NewLeasePool(fakeClient, "test", "pool1",
 		WithNamespace("test"),
 		WithRetryPeriod(10*time.Millisecond),
 	)
 	require.NoError(t, err)
 
-	pool2, err := NewLeasePool(context.TODO(), fakeClient, "test", "pool2",
+	pool2, err := NewLeasePool(fakeClient, "test", "pool2",
 		WithNamespace("test"),
 		WithRetryPeriod(10*time.Millisecond),
 	)
@@ -185,15 +189,18 @@ func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
 	require.NoError(t, err)
 	t.Log("Pre-created acquired lease for first identity")
 
-	events1, cancel1, err := pool1.Watch(WithOutputChannels(&LeaseEvents{
+	ctx1, cancel1 := context.WithCancel(context.TODO())
+	t.Cleanup(cancel1)
+	events1, err := pool1.Watch(ctx1, WithOutputChannels(&LeaseEvents{
 		AcquiredLease: make(chan struct{}, 1),
 		LostLease:     make(chan struct{}, 1),
 	}))
 	require.NoError(t, err)
-	defer cancel1()
 	t.Log("Started first lease pool")
 
-	events2, cancel2, err := pool2.Watch(WithOutputChannels(&LeaseEvents{
+	ctx2, cancel2 := context.WithCancel(context.TODO())
+	t.Cleanup(cancel2)
+	events2, err := pool2.Watch(ctx2, WithOutputChannels(&LeaseEvents{
 		AcquiredLease: make(chan struct{}, 1),
 		LostLease:     make(chan struct{}, 1),
 	}))
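Usage sketch (illustrative only, not part of the patch): how a caller drives the refactored pkg/leaderelection API after this change. Lease lifetime is now bounded by the context passed to Watch; NewLeasePool no longer takes a context and Watch no longer returns a cancel function. The package name, lease name, identity, and channel handling below are placeholders, not code from this repository.

// Package leaseexample is a hypothetical consumer of pkg/leaderelection.
package leaseexample

import (
	"context"

	"github.com/k0sproject/k0s/pkg/leaderelection"
	"k8s.io/client-go/kubernetes"
)

// watchLease runs leader election until ctx is cancelled. Names are placeholders.
func watchLease(ctx context.Context, client kubernetes.Interface) error {
	// The pool is constructed without a context; the WithContext option no longer exists.
	pool, err := leaderelection.NewLeasePool(client, "example-lease", "example-identity",
		leaderelection.WithNamespace("kube-node-lease"))
	if err != nil {
		return err
	}

	// Watch starts the election loop and keeps it running until ctx is done;
	// it no longer returns a cancel function of its own.
	events, err := pool.Watch(ctx)
	if err != nil {
		return err
	}

	for {
		select {
		case <-events.AcquiredLease:
			// this identity became the leader
		case <-events.LostLease:
			// this identity lost the lease
		case <-ctx.Done():
			return nil
		}
	}
}

Callers that previously held the cancel function returned by Watch (as controllersleasecounter.go and leaderelector/leasepool.go did) now keep their own context.CancelFunc from context.WithCancel, which is what their Stop methods invoke.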