Skip to content

Commit

Permalink
Add a mutex to the telemetry component
Browse files Browse the repository at this point in the history
The Start/Stop methods and the Reconcile method may be called
concurrently by different goroutines. Ensure that there are no races when
starting and stopping the telemetry loop. Also replace the bespoke for
loop with wait.UntilWithContext, which is tailor-made for that purpose.

Signed-off-by: Tom Wieczorek <twieczorek@mirantis.com>
  • Loading branch information
twz123 committed Jul 12, 2024
1 parent bb1fe55 commit 2def786
Showing 1 changed file with 50 additions and 26 deletions.
76 changes: 50 additions & 26 deletions pkg/telemetry/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package telemetry

import (
"context"
"sync"
"time"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/segmentio/analytics-go"
Expand All @@ -37,8 +39,10 @@ type Component struct {
StorageType string
KubeClientFactory kubeutil.ClientFactoryInterface

log *logrus.Entry
stopCh chan struct{}
log *logrus.Entry

mu sync.Mutex
stop func()
}

var _ manager.Component = (*Component)(nil)
Expand All @@ -52,55 +56,75 @@ func (c *Component) Init(context.Context) error {
return nil
}

// Start does nothing; the telemetry loop is started from Reconcile.
func (c *Component) Start(_ context.Context) error {
func (c *Component) Start(context.Context) error {
return nil
}

// Stop stops the telemetry loop if it is running.
func (c *Component) Stop() error {
if c.stopCh != nil {
close(c.stopCh)
c.mu.Lock()
defer c.mu.Unlock()

if c.stop != nil {
c.stop()
c.stop = nil
}

return nil
}

// Reconcile detects changes in configuration and applies them to the component
func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterConfig) error {
logrus.Debug("reconcile method called for: Telemetry")
func (c *Component) Reconcile(_ context.Context, clusterCfg *v1beta1.ClusterConfig) error {
c.mu.Lock()
defer c.mu.Unlock()

if !clusterCfg.Spec.Telemetry.IsEnabled() {
return c.Stop()
}
if c.stopCh != nil {
// We must have the worker stuff already running, do nothing
if c.stop == nil {
c.log.Debug("Telemetry remains disabled")
} else {
c.stop()
c.stop = nil
}

return nil
}

if c.stop != nil {
return nil // already running
}

clients, err := c.KubeClientFactory.GetClient()
if err != nil {
return err
}
go c.run(ctx, clients)

c.stop = c.start(clients)

return nil
}

// start launches the telemetry loop in a background goroutine and returns a
// function that stops it. The returned stop function cancels the loop's
// context and then blocks until the goroutine has exited.
func (c *Component) start(clients kubernetes.Interface) (stop func()) {
	ctx, cancel := context.WithCancel(context.Background())
	finished := make(chan struct{})

	runLoop := func() {
		// Signal completion no matter how run returns, so that stop never
		// blocks forever.
		defer close(finished)
		c.log.Info("Starting to collect telemetry")
		c.run(ctx, clients)
		c.log.Info("Stopped to collect telemetry")
	}
	go runLoop()

	stop = func() {
		cancel()
		<-finished
	}
	return stop
}

func (c *Component) run(ctx context.Context, clients kubernetes.Interface) {
c.stopCh = make(chan struct{})
ticker := time.NewTicker(interval)
analyticsClient := analytics.New(segmentToken)

defer func() {
ticker.Stop()
if err := analyticsClient.Close(); err != nil {
c.log.WithError(err).Debug("Failed to close analytics client")
}
}()

for {
select {
case <-ticker.C:
c.sendTelemetry(ctx, analyticsClient, clients)
case <-c.stopCh:
return
}
}
wait.UntilWithContext(ctx, func(ctx context.Context) {
c.sendTelemetry(ctx, analyticsClient, clients)
}, interval)
}

0 comments on commit 2def786

Please sign in to comment.