Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Jul 9, 2024
1 parent dd41fa0 commit 96405b1
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 116 deletions.
26 changes: 13 additions & 13 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ func (c *command) start(ctx context.Context) error {
logrus.Infof("using storage backend %s", nodeConfig.Spec.Storage.Type)
nodeComponents.Add(ctx, storageBackend)

enableKonnectivity := !c.SingleNode && !slices.Contains(c.DisableComponents, constant.KonnectivityServerComponentName)
disableEndpointReconciler := !slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName) &&
nodeConfig.Spec.API.ExternalAddress != ""

nodeComponents.Add(ctx, &controller.APIServer{
ClusterConfig: nodeConfig,
K0sVars: c.K0sVars,
LogLevel: c.LogLevels.KubeAPIServer,
Storage: storageBackend,
EnableKonnectivity: enableKonnectivity,
DisableEndpointReconciler: disableEndpointReconciler,
})

controllerLeaseCounter := &controller.K0sControllersLeaseCounter{
InvocationID: c.K0sVars.InvocationID,
ClusterConfig: nodeConfig,
Expand All @@ -240,10 +253,6 @@ func (c *command) start(ctx context.Context) error {
})
}

enableKonnectivity := !c.SingleNode && !slices.Contains(c.DisableComponents, constant.KonnectivityServerComponentName)
disableEndpointReconciler := !slices.Contains(c.DisableComponents, constant.APIEndpointReconcilerComponentName) &&
nodeConfig.Spec.API.ExternalAddress != ""

if enableKonnectivity {
nodeComponents.Add(ctx, &controller.Konnectivity{
SingleNode: c.SingleNode,
Expand All @@ -256,15 +265,6 @@ func (c *command) start(ctx context.Context) error {
})
}

nodeComponents.Add(ctx, &controller.APIServer{
ClusterConfig: nodeConfig,
K0sVars: c.K0sVars,
LogLevel: c.LogLevels.KubeAPIServer,
Storage: storageBackend,
EnableKonnectivity: enableKonnectivity,
DisableEndpointReconciler: disableEndpointReconciler,
})

var leaderElector interface {
leaderelector.Interface
manager.Component
Expand Down
64 changes: 25 additions & 39 deletions pkg/autopilot/controller/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package controller
import (
"context"
"fmt"
"sync"

"github.com/k0sproject/k0s/pkg/autopilot/client"
"github.com/k0sproject/k0s/pkg/leaderelection"
Expand Down Expand Up @@ -95,9 +94,7 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
return
}

watchWg := leadershipWatcher(ctx, leaseEventStatusCh, events)
watchWg.Wait()

leadershipWatcher(leaseEventStatusCh, events)
cancel()
}
}
Expand All @@ -115,48 +112,37 @@ func (lw *leaseWatcher) StartWatcher(ctx context.Context, namespace string, name
//
// To circumvent this problem, we take note when we have become a leader, and if we lose
// leadership at any point afterwards, this watcher goroutine will exit.
func leadershipWatcher(ctx context.Context, leaseEventStatusCh chan<- LeaseEventStatus, events *leaderelection.LeaseEvents) *sync.WaitGroup {
wg := &sync.WaitGroup{}
wg.Add(1)
func leadershipWatcher(leaseEventStatusCh chan<- LeaseEventStatus, events *leaderelection.LeaseEvents) {
previouslyHeldLeadership := false
var lastLeaseEventStatus LeaseEventStatus

for {
select {
case _, ok := <-events.AcquiredLease:
if !ok {
return
}

go func(ctx context.Context) {
defer wg.Done()
previouslyHeldLeadership := false
var lastLeaseEventStatus LeaseEventStatus
if lastLeaseEventStatus != LeaseAcquired {
lastLeaseEventStatus = LeaseAcquired
leaseEventStatusCh <- lastLeaseEventStatus

for {
select {
case _, ok := <-events.AcquiredLease:
if !ok {
return
}
previouslyHeldLeadership = true
}

if lastLeaseEventStatus != LeaseAcquired {
lastLeaseEventStatus = LeaseAcquired
leaseEventStatusCh <- lastLeaseEventStatus
case _, ok := <-events.LostLease:
if !ok {
return
}

previouslyHeldLeadership = true
}
if lastLeaseEventStatus != LeasePending {
lastLeaseEventStatus = LeasePending
leaseEventStatusCh <- lastLeaseEventStatus

case _, ok := <-events.LostLease:
if !ok {
if previouslyHeldLeadership {
return
}

if lastLeaseEventStatus != LeasePending {
lastLeaseEventStatus = LeasePending
leaseEventStatusCh <- lastLeaseEventStatus

if previouslyHeldLeadership {
return
}
}

case <-ctx.Done():
return
}
}
}(ctx)

return wg
}
}
63 changes: 27 additions & 36 deletions pkg/autopilot/controller/leases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,23 @@ func TestLeasesInitialPending(t *testing.T) {
}
}

func closeLeaseEvents(events *leaderelection.LeaseEvents) {
close(events.AcquiredLease)
close(events.LostLease)
}

// TestLeadershipWatcher runs through a table of tests that describe
// various lease acquired/lost scenarios
func TestLeadershipWatcher(t *testing.T) {
var tests = []struct {
name string
expectedEvents []LeaseEventStatus
eventSource func(events *leaderelection.LeaseEvents)
eventSource func(acquiredLease, lostLease chan<- struct{})
}{
{
"AcquiredThenLost",
[]LeaseEventStatus{
LeaseAcquired,
LeasePending,
},
func(events *leaderelection.LeaseEvents) {
sendEventAfter100ms(events.AcquiredLease)
sendEventAfter100ms(events.LostLease)
closeLeaseEvents(events)
func(acquiredLease, lostLease chan<- struct{}) {
sendEventAfter100ms(acquiredLease)
sendEventAfter100ms(lostLease)
},
},
{
Expand All @@ -89,10 +83,9 @@ func TestLeadershipWatcher(t *testing.T) {
LeasePending,
LeaseAcquired,
},
func(events *leaderelection.LeaseEvents) {
sendEventAfter100ms(events.LostLease)
sendEventAfter100ms(events.AcquiredLease)
closeLeaseEvents(events)
func(acquiredLease, lostLease chan<- struct{}) {
sendEventAfter100ms(lostLease)
sendEventAfter100ms(acquiredLease)
},
},
{
Expand All @@ -101,32 +94,29 @@ func TestLeadershipWatcher(t *testing.T) {
LeaseAcquired,
LeasePending,
},
func(events *leaderelection.LeaseEvents) {
sendEventAfter100ms(events.AcquiredLease)
sendEventAfter100ms(events.LostLease)
sendEventAfter100ms(events.AcquiredLease)
closeLeaseEvents(events)
func(acquiredLease, lostLease chan<- struct{}) {
sendEventAfter100ms(acquiredLease)
sendEventAfter100ms(lostLease)
sendEventAfter100ms(acquiredLease)
},
},
{
"DoubleLostMakesNoSense",
[]LeaseEventStatus{
LeasePending,
},
func(events *leaderelection.LeaseEvents) {
sendEventAfter100ms(events.LostLease)
closeLeaseEvents(events)
func(acquiredLease, lostLease chan<- struct{}) {
sendEventAfter100ms(lostLease)
},
},
{
"DoubleAcquireMakesNoSense",
[]LeaseEventStatus{
LeaseAcquired,
},
func(events *leaderelection.LeaseEvents) {
sendEventAfter100ms(events.AcquiredLease)
sendEventAfter100ms(events.AcquiredLease)
closeLeaseEvents(events)
func(acquiredLease, lostLease chan<- struct{}) {
sendEventAfter100ms(acquiredLease)
sendEventAfter100ms(acquiredLease)
},
},
}
Expand All @@ -135,19 +125,20 @@ func TestLeadershipWatcher(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
leaseEventStatusCh := make(chan LeaseEventStatus, 100)

events := &leaderelection.LeaseEvents{
AcquiredLease: make(chan struct{}),
LostLease: make(chan struct{}),
}
acquiredLease, lostLease := make(chan struct{}), make(chan struct{})

go test.eventSource(events)
go func() {
defer close(acquiredLease)
defer close(lostLease)
test.eventSource(acquiredLease, lostLease)
}()

ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(1*time.Second))
wg := leadershipWatcher(ctx, leaseEventStatusCh, events)
wg.Wait()
leadershipWatcher(leaseEventStatusCh, &leaderelection.LeaseEvents{
AcquiredLease: acquiredLease,
LostLease: lostLease,
})

close(leaseEventStatusCh)
cancel()

assert.Equal(t, test.expectedEvents, realizeLeaseEventStatus(leaseEventStatusCh))
})
Expand All @@ -162,7 +153,7 @@ func realizeLeaseEventStatus(ch chan LeaseEventStatus) []LeaseEventStatus {
return s
}

func sendEventAfter100ms(out chan struct{}) {
func sendEventAfter100ms(out chan<- struct{}) {
time.Sleep(100 * time.Millisecond)
out <- struct{}{}
}
9 changes: 2 additions & 7 deletions pkg/autopilot/controller/root_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,10 @@ func (c *rootController) Run(ctx context.Context) error {
case err := <-errorCh:
return err

case <-ctx.Done():
c.log.Info("Shutting down")
c.stopSubHandler(subControllerCancel, subControllerErrGroup, LeaseAcquired)

return nil

case leaseEventStatus, ok := <-leaseEventStatusCh:
if !ok {
c.log.Warn("lease event status channel closed")
c.log.Warn("Lease event status channel closed, shutting down")
c.stopSubHandler(subControllerCancel, subControllerErrGroup, LeaseAcquired)
return nil
}

Expand Down
37 changes: 28 additions & 9 deletions pkg/component/controller/controllersleasecounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"sync"
"time"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
Expand All @@ -37,7 +38,7 @@ type K0sControllersLeaseCounter struct {
ClusterConfig *v1beta1.ClusterConfig
KubeClientFactory kubeutil.ClientFactoryInterface

cancelFunc context.CancelFunc
stopFunc func()

subscribers []chan int
}
Expand Down Expand Up @@ -79,30 +80,48 @@ func (l *K0sControllersLeaseCounter) Start(context.Context) error {
cancel()
return err
}
l.cancelFunc = cancel

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-events.AcquiredLease:
case _, ok := <-events.AcquiredLease:
if !ok {
return
}
log.Info("acquired leader lease")
case <-events.LostLease:
case _, ok := <-events.LostLease:
if !ok {
return
}
log.Error("lost leader lease, this should not really happen!?!?!?")
case <-ctx.Done():
return
}
}
}()

go l.runLeaseCounter(ctx)
wg.Add(1)
go func() {
defer wg.Done()
l.runLeaseCounter(ctx)
}()

l.stopFunc = func() {
log.Info("Stopping K0sControllersLeaseCounter")
cancel()
wg.Wait()
log.Info("Stopped K0sControllersLeaseCounter")
}

return nil
}

// Stop stops the component
func (l *K0sControllersLeaseCounter) Stop() error {
if l.cancelFunc != nil {
l.cancelFunc()
if l.stopFunc != nil {
l.stopFunc()
}
return nil
}
Expand Down
Loading

0 comments on commit 96405b1

Please sign in to comment.