Skip to content

Commit

Permalink
Don't store contexts in lease pool structs
Browse files Browse the repository at this point in the history
The context shouldn't be stored in structs, but passed as the first
argument to functions and methods. Remove the context storage from the
lease pool and pass it to the methods where it's actually needed.

Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Jul 10, 2024
1 parent bbe0301 commit 6ca6814
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 56 deletions.
12 changes: 5 additions & 7 deletions pkg/autopilot/controller/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
leaseEventStatusCh := make(chan LeaseEventStatus, 10)
errorCh := make(chan error, 10)

go func(ctx context.Context) {
go func() {
defer close(leaseEventStatusCh)
defer close(errorCh)

Expand All @@ -76,21 +76,19 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
return

default:
ctx, cancel := context.WithCancel(ctx)

leasePoolOpts := []leaderelection.LeaseOpt{
leaderelection.WithContext(ctx),
leaderelection.WithNamespace(namespace),
}

leasePool, err := leaderelection.NewLeasePool(ctx, lw.client, name, identity, leasePoolOpts...)
leasePool, err := leaderelection.NewLeasePool(lw.client, name, identity, leasePoolOpts...)
if err != nil {
errorCh <- fmt.Errorf("failed to create lease pool: %w", err)
cancel()
return
}

events, _, err := leasePool.Watch()
ctx, cancel := context.WithCancel(ctx)
events, err := leasePool.Watch(ctx)
if err != nil {
errorCh <- fmt.Errorf("failed to watch lease pool: %w", err)
cancel()
Expand All @@ -103,7 +101,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
cancel()
}
}
}(ctx)
}()

return leaseEventStatusCh, errorCh
}
Expand Down
23 changes: 9 additions & 14 deletions pkg/component/controller/controllersleasecounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ type K0sControllersLeaseCounter struct {
KubeClientFactory kubeutil.ClientFactoryInterface
UpdateControllerCount func(uint)

cancelFunc context.CancelFunc
leaseCancel context.CancelFunc
cancelFunc context.CancelFunc
}

var _ manager.Component = (*K0sControllersLeaseCounter)(nil)
Expand All @@ -53,8 +52,7 @@ func (l *K0sControllersLeaseCounter) Init(_ context.Context) error {
}

// Run runs the leader elector to keep the lease object up-to-date.
func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
ctx, l.cancelFunc = context.WithCancel(ctx)
func (l *K0sControllersLeaseCounter) Start(context.Context) error {
log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
client, err := l.KubeClientFactory.GetClient()
if err != nil {
Expand All @@ -69,18 +67,19 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
}
leaseName := fmt.Sprintf("k0s-ctrl-%s", nodeName)

leasePool, err := leaderelection.NewLeasePool(ctx, client, leaseName, l.InvocationID,
leaderelection.WithLogger(log),
leaderelection.WithContext(ctx))
leasePool, err := leaderelection.NewLeasePool(client, leaseName, l.InvocationID,
leaderelection.WithLogger(log))
if err != nil {
return err
}
events, cancel, err := leasePool.Watch()

ctx, cancel := context.WithCancel(context.Background())
events, err := leasePool.Watch(ctx)
if err != nil {
cancel()
return err
}

l.leaseCancel = cancel
l.cancelFunc = cancel

go func() {
for {
Expand All @@ -102,10 +101,6 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {

// Stop stops the component
func (l *K0sControllersLeaseCounter) Stop() error {
if l.leaseCancel != nil {
l.leaseCancel()
}

if l.cancelFunc != nil {
l.cancelFunc()
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/component/controller/leaderelector/leasepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,20 @@ func (l *LeasePool) Init(_ context.Context) error {
return nil
}

func (l *LeasePool) Start(ctx context.Context) error {
func (l *LeasePool) Start(context.Context) error {
client, err := l.kubeClientFactory.GetClient()
if err != nil {
return fmt.Errorf("can't create kubernetes rest client for lease pool: %w", err)
}
leasePool, err := leaderelection.NewLeasePool(ctx, client, l.name, l.invocationID,
leaderelection.WithLogger(l.log),
leaderelection.WithContext(ctx))
leasePool, err := leaderelection.NewLeasePool(client, l.name, l.invocationID,
leaderelection.WithLogger(l.log))
if err != nil {
return err
}
events, cancel, err := leasePool.Watch()
ctx, cancel := context.WithCancel(context.Background())
events, err := leasePool.Watch(ctx)
if err != nil {
cancel()
return err
}
l.leaseCancel = cancel
Expand Down
22 changes: 5 additions & 17 deletions pkg/leaderelection/lease_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ type LeaseConfiguration struct {
renewDeadline time.Duration
retryPeriod time.Duration
log logrus.FieldLogger
ctx context.Context
}

// A LeaseOpt is a function that modifies a LeaseConfiguration
Expand Down Expand Up @@ -90,14 +89,6 @@ func WithLogger(logger logrus.FieldLogger) LeaseOpt {
}
}

// WithContext allows the consumer to pass its own context, for example a cancelable context
func WithContext(ctx context.Context) LeaseOpt {
return func(config LeaseConfiguration) LeaseConfiguration {
config.ctx = ctx
return config
}
}

// WithNamespace specifies which namespace the lease should be created in, defaults to kube-node-lease
func WithNamespace(namespace string) LeaseOpt {
return func(config LeaseConfiguration) LeaseConfiguration {
Expand All @@ -107,14 +98,13 @@ func WithNamespace(namespace string) LeaseOpt {
}

// NewLeasePool creates a new LeasePool struct to interact with a lease
func NewLeasePool(ctx context.Context, client kubernetes.Interface, name, identity string, opts ...LeaseOpt) (*LeasePool, error) {
func NewLeasePool(client kubernetes.Interface, name, identity string, opts ...LeaseOpt) (*LeasePool, error) {

leaseConfig := LeaseConfiguration{
log: logrus.StandardLogger(),
duration: 60 * time.Second,
renewDeadline: 15 * time.Second,
retryPeriod: 5 * time.Second,
ctx: ctx,
namespace: "kube-node-lease",
name: name,
identity: identity,
Expand Down Expand Up @@ -148,9 +138,9 @@ func WithOutputChannels(channels *LeaseEvents) WatchOpt {
}

// Watch is the primary function of LeasePool, and starts the leader election process
func (p *LeasePool) Watch(opts ...WatchOpt) (*LeaseEvents, context.CancelFunc, error) {
func (p *LeasePool) Watch(ctx context.Context, opts ...WatchOpt) (*LeaseEvents, error) {
if p.events != nil {
return p.events, nil, nil
return p.events, nil
}

watchOptions := watchOptions{
Expand Down Expand Up @@ -195,19 +185,17 @@ func (p *LeasePool) Watch(opts ...WatchOpt) (*LeaseEvents, context.CancelFunc, e
}
le, err := leaderelection.NewLeaderElector(lec)
if err != nil {
return nil, nil, err
return nil, err
}
if lec.WatchDog != nil {
lec.WatchDog.SetLeaderElection(le)
}

ctx, cancel := context.WithCancel(p.config.ctx)

go func() {
for ctx.Err() == nil {
le.Run(ctx)
}
}()

return p.events, cancel, nil
return p.events, nil
}
33 changes: 20 additions & 13 deletions pkg/leaderelection/lease_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,18 @@ func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) {

fakeClient := fake.NewSimpleClientset()

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity, WithNamespace("test"))
pool, err := NewLeasePool(fakeClient, "test", identity, WithNamespace("test"))
require.NoError(t, err)

output := &LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}

events, cancel, err := pool.Watch(WithOutputChannels(output))
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
events, err := pool.Watch(ctx, WithOutputChannels(output))
require.NoError(t, err)
defer cancel()

done := make(chan struct{})
failed := make(chan struct{})
Expand Down Expand Up @@ -78,15 +79,17 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) {

fakeClient := fake.NewSimpleClientset()

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity, WithNamespace("test"))
pool, err := NewLeasePool(fakeClient, "test", identity, WithNamespace("test"))
require.NoError(t, err)

output := &LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}

events, cancel, err := pool.Watch(WithOutputChannels(output))
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
events, err := pool.Watch(ctx, WithOutputChannels(output))
require.NoError(t, err)

<-events.AcquiredLease
Expand Down Expand Up @@ -115,7 +118,7 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
}
}()

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity,
pool, err := NewLeasePool(fakeClient, "test", identity,
WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
Expand All @@ -127,9 +130,10 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
}

givenLeaderElectorError(nil)
events, cancel, err := pool.Watch(WithOutputChannels(output))
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
events, err := pool.Watch(ctx, WithOutputChannels(output))
require.NoError(t, err)
defer cancel()

<-events.AcquiredLease
t.Log("Acquired lease, disrupting leader election and waiting to loose the lease")
Expand All @@ -150,13 +154,13 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
fakeClient := fake.NewSimpleClientset()

pool1, err := NewLeasePool(context.TODO(), fakeClient, "test", "pool1",
pool1, err := NewLeasePool(fakeClient, "test", "pool1",
WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
require.NoError(t, err)

pool2, err := NewLeasePool(context.TODO(), fakeClient, "test", "pool2",
pool2, err := NewLeasePool(fakeClient, "test", "pool2",
WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
Expand Down Expand Up @@ -185,15 +189,18 @@ func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
require.NoError(t, err)
t.Log("Pre-created acquired lease for first identity")

events1, cancel1, err := pool1.Watch(WithOutputChannels(&LeaseEvents{
ctx1, cancel1 := context.WithCancel(context.TODO())
t.Cleanup(cancel1)
events1, err := pool1.Watch(ctx1, WithOutputChannels(&LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}))
require.NoError(t, err)
defer cancel1()
t.Log("Started first lease pool")

events2, cancel2, err := pool2.Watch(WithOutputChannels(&LeaseEvents{
ctx2, cancel2 := context.WithCancel(context.TODO())
t.Cleanup(cancel2)
events2, err := pool2.Watch(ctx2, WithOutputChannels(&LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}))
Expand Down

0 comments on commit 6ca6814

Please sign in to comment.