
Don't store contexts in lease pool structs #4733

Merged — 1 commit, merged on Sep 9, 2024
12 changes: 5 additions & 7 deletions pkg/autopilot/controller/leases.go
@@ -61,7 +61,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
leaseEventStatusCh := make(chan LeaseEventStatus, 10)
errorCh := make(chan error, 10)

go func(ctx context.Context) {
go func() {
defer close(leaseEventStatusCh)
defer close(errorCh)

@@ -76,21 +76,19 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
return

default:
ctx, cancel := context.WithCancel(ctx)

leasePoolOpts := []leaderelection.LeaseOpt{
leaderelection.WithContext(ctx),
leaderelection.WithNamespace(namespace),
}

leasePool, err := leaderelection.NewLeasePool(ctx, lw.client, name, identity, leasePoolOpts...)
leasePool, err := leaderelection.NewLeasePool(lw.client, name, identity, leasePoolOpts...)
if err != nil {
errorCh <- fmt.Errorf("failed to create lease pool: %w", err)
cancel()
return
}

events, _, err := leasePool.Watch()
ctx, cancel := context.WithCancel(ctx)
events, err := leasePool.Watch(ctx)
if err != nil {
errorCh <- fmt.Errorf("failed to watch lease pool: %w", err)
cancel()
@@ -103,7 +101,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
cancel()
}
}
}(ctx)
}()

return leaseEventStatusCh, errorCh
}
23 changes: 9 additions & 14 deletions pkg/component/controller/controllersleasecounter.go
@@ -41,8 +41,7 @@ type K0sControllersLeaseCounter struct {
KubeClientFactory kubeutil.ClientFactoryInterface
UpdateControllerCount func(uint)

cancelFunc context.CancelFunc
leaseCancel context.CancelFunc
cancelFunc context.CancelFunc
}

var _ manager.Component = (*K0sControllersLeaseCounter)(nil)
@@ -53,8 +52,7 @@ func (l *K0sControllersLeaseCounter) Init(_ context.Context) error {
}

// Run runs the leader elector to keep the lease object up-to-date.
func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
ctx, l.cancelFunc = context.WithCancel(ctx)
func (l *K0sControllersLeaseCounter) Start(context.Context) error {
log := logrus.WithFields(logrus.Fields{"component": "controllerlease"})
client, err := l.KubeClientFactory.GetClient()
if err != nil {
@@ -69,18 +67,19 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {
}
leaseName := fmt.Sprintf("k0s-ctrl-%s", nodeName)

leasePool, err := leaderelection.NewLeasePool(ctx, client, leaseName, l.InvocationID,
leaderelection.WithLogger(log),
leaderelection.WithContext(ctx))
leasePool, err := leaderelection.NewLeasePool(client, leaseName, l.InvocationID,
leaderelection.WithLogger(log))
if err != nil {
return err
}
events, cancel, err := leasePool.Watch()

ctx, cancel := context.WithCancel(context.Background())
Contributor:
I don't understand this change. Why shouldn't the context.WithCancel be based on the one Start gets as an argument? If that context is Done, I think this should be done too.

@twz123 (Member, Author) replied on Sep 4, 2024:
This is one of the big issues that exist with the component model in k0s. See #4357 and its predecessor #1844.

So from my point of view, the context passed to Start should be used to cancel any async/blocking operations that are performed during startup. Any asynchronous operations that happen as a result of the component being started should be handled by Stop.
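To illustrate the split described above — the Start context bounding only startup work, with anything spawned by Start torn down via Stop — here is a minimal, hypothetical sketch. It is not code from this PR; the component and its stand-in bodies are made up:

```go
package component

import (
	"context"
	"time"
)

// exampleComponent is a hypothetical component illustrating the pattern:
// the context passed to Start only bounds work performed during startup,
// while background work started by Start is cancelled from Stop.
type exampleComponent struct {
	cancel context.CancelFunc
}

func (c *exampleComponent) Start(ctx context.Context) error {
	// Blocking startup work respects the caller's context.
	select {
	case <-time.After(10 * time.Millisecond): // stand-in for real startup work
	case <-ctx.Done():
		return ctx.Err()
	}

	// Work that outlives Start gets its own context, owned by Stop.
	runCtx, cancel := context.WithCancel(context.Background())
	c.cancel = cancel
	go func() {
		<-runCtx.Done() // stand-in for the component's background loop
	}()
	return nil
}

func (c *exampleComponent) Stop() error {
	if c.cancel != nil {
		c.cancel()
	}
	return nil
}
```

This mirrors what the PR does in K0sControllersLeaseCounter: Start derives its own cancellable context from context.Background() and stores the cancel func so Stop can end the watch.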

events, err := leasePool.Watch(ctx)
if err != nil {
cancel()
return err
}

l.leaseCancel = cancel
l.cancelFunc = cancel

go func() {
for {
@@ -102,10 +101,6 @@ func (l *K0sControllersLeaseCounter) Start(ctx context.Context) error {

// Stop stops the component
func (l *K0sControllersLeaseCounter) Stop() error {
if l.leaseCancel != nil {
l.leaseCancel()
}

if l.cancelFunc != nil {
l.cancelFunc()
}
11 changes: 6 additions & 5 deletions pkg/component/controller/leaderelector/leasepool.go
@@ -61,19 +61,20 @@ func (l *LeasePool) Init(_ context.Context) error {
return nil
}

func (l *LeasePool) Start(ctx context.Context) error {
func (l *LeasePool) Start(context.Context) error {
client, err := l.kubeClientFactory.GetClient()
if err != nil {
return fmt.Errorf("can't create kubernetes rest client for lease pool: %w", err)
}
leasePool, err := leaderelection.NewLeasePool(ctx, client, l.name, l.invocationID,
leaderelection.WithLogger(l.log),
leaderelection.WithContext(ctx))
leasePool, err := leaderelection.NewLeasePool(client, l.name, l.invocationID,
leaderelection.WithLogger(l.log))
if err != nil {
return err
}
events, cancel, err := leasePool.Watch()
ctx, cancel := context.WithCancel(context.Background())
Contributor:
Same thing here.

events, err := leasePool.Watch(ctx)
if err != nil {
cancel()
return err
}
l.leaseCancel = cancel
22 changes: 5 additions & 17 deletions pkg/leaderelection/lease_pool.go
@@ -49,7 +49,6 @@ type LeaseConfiguration struct {
renewDeadline time.Duration
retryPeriod time.Duration
log logrus.FieldLogger
ctx context.Context
}

// A LeaseOpt is a function that modifies a LeaseConfiguration
@@ -90,14 +89,6 @@ func WithLogger(logger logrus.FieldLogger) LeaseOpt {
}
}

// WithContext allows the consumer to pass its own context, for example a cancelable context
func WithContext(ctx context.Context) LeaseOpt {
return func(config LeaseConfiguration) LeaseConfiguration {
config.ctx = ctx
return config
}
}

// WithNamespace specifies which namespace the lease should be created in, defaults to kube-node-lease
func WithNamespace(namespace string) LeaseOpt {
return func(config LeaseConfiguration) LeaseConfiguration {
@@ -107,14 +98,13 @@ func WithNamespace(namespace string) LeaseOpt {
}

// NewLeasePool creates a new LeasePool struct to interact with a lease
func NewLeasePool(ctx context.Context, client kubernetes.Interface, name, identity string, opts ...LeaseOpt) (*LeasePool, error) {
func NewLeasePool(client kubernetes.Interface, name, identity string, opts ...LeaseOpt) (*LeasePool, error) {

leaseConfig := LeaseConfiguration{
log: logrus.StandardLogger(),
duration: 60 * time.Second,
renewDeadline: 15 * time.Second,
retryPeriod: 5 * time.Second,
ctx: ctx,
namespace: "kube-node-lease",
name: name,
identity: identity,
@@ -148,9 +138,9 @@ func WithOutputChannels(channels *LeaseEvents) WatchOpt {
}

// Watch is the primary function of LeasePool, and starts the leader election process
func (p *LeasePool) Watch(opts ...WatchOpt) (*LeaseEvents, context.CancelFunc, error) {
func (p *LeasePool) Watch(ctx context.Context, opts ...WatchOpt) (*LeaseEvents, error) {
if p.events != nil {
return p.events, nil, nil
return p.events, nil
}

watchOptions := watchOptions{
@@ -195,19 +185,17 @@ }
}
le, err := leaderelection.NewLeaderElector(lec)
if err != nil {
return nil, nil, err
return nil, err
}
if lec.WatchDog != nil {
lec.WatchDog.SetLeaderElection(le)
}

ctx, cancel := context.WithCancel(p.config.ctx)

go func() {
for ctx.Err() == nil {
le.Run(ctx)
}
}()

return p.events, cancel, nil
return p.events, nil
}
33 changes: 20 additions & 13 deletions pkg/leaderelection/lease_pool_test.go
@@ -39,17 +39,18 @@ func TestLeasePoolWatcherTriggersOnLeaseAcquisition(t *testing.T) {

fakeClient := fake.NewSimpleClientset()

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity, WithNamespace("test"))
pool, err := NewLeasePool(fakeClient, "test", identity, WithNamespace("test"))
require.NoError(t, err)

output := &LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}

events, cancel, err := pool.Watch(WithOutputChannels(output))
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
events, err := pool.Watch(ctx, WithOutputChannels(output))
require.NoError(t, err)
defer cancel()

done := make(chan struct{})
failed := make(chan struct{})
@@ -78,15 +79,17 @@ func TestLeasePoolTriggersLostLeaseWhenCancelled(t *testing.T) {

fakeClient := fake.NewSimpleClientset()

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity, WithNamespace("test"))
pool, err := NewLeasePool(fakeClient, "test", identity, WithNamespace("test"))
require.NoError(t, err)

output := &LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}

events, cancel, err := pool.Watch(WithOutputChannels(output))
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
events, err := pool.Watch(ctx, WithOutputChannels(output))
require.NoError(t, err)

<-events.AcquiredLease
@@ -115,7 +118,7 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
}
}()

pool, err := NewLeasePool(context.TODO(), fakeClient, "test", identity,
pool, err := NewLeasePool(fakeClient, "test", identity,
WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
@@ -127,9 +130,10 @@ }
}

givenLeaderElectorError(nil)
events, cancel, err := pool.Watch(WithOutputChannels(output))
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
events, err := pool.Watch(ctx, WithOutputChannels(output))
require.NoError(t, err)
defer cancel()

<-events.AcquiredLease
t.Log("Acquired lease, disrupting leader election and waiting to loose the lease")
@@ -150,13 +154,13 @@ func TestLeasePoolWatcherReacquiresLostLease(t *testing.T) {
func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
fakeClient := fake.NewSimpleClientset()

pool1, err := NewLeasePool(context.TODO(), fakeClient, "test", "pool1",
pool1, err := NewLeasePool(fakeClient, "test", "pool1",
WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
require.NoError(t, err)

pool2, err := NewLeasePool(context.TODO(), fakeClient, "test", "pool2",
pool2, err := NewLeasePool(fakeClient, "test", "pool2",
WithNamespace("test"),
WithRetryPeriod(10*time.Millisecond),
)
@@ -185,15 +189,18 @@ func TestSecondWatcherAcquiresReleasedLease(t *testing.T) {
require.NoError(t, err)
t.Log("Pre-created acquired lease for first identity")

events1, cancel1, err := pool1.Watch(WithOutputChannels(&LeaseEvents{
ctx1, cancel1 := context.WithCancel(context.TODO())
t.Cleanup(cancel1)
events1, err := pool1.Watch(ctx1, WithOutputChannels(&LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}))
require.NoError(t, err)
defer cancel1()
t.Log("Started first lease pool")

events2, cancel2, err := pool2.Watch(WithOutputChannels(&LeaseEvents{
ctx2, cancel2 := context.WithCancel(context.TODO())
t.Cleanup(cancel2)
events2, err := pool2.Watch(ctx2, WithOutputChannels(&LeaseEvents{
AcquiredLease: make(chan struct{}, 1),
LostLease: make(chan struct{}, 1),
}))