From aec447229135c141e2dd6007c96057db00032dcb Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 08:10:50 +0200 Subject: [PATCH 1/8] Let all telemetry component methods have pointer receivers There are quite a few assignments in these methods that are lost if the receiver is not a pointer. Signed-off-by: Tom Wieczorek --- pkg/telemetry/component.go | 2 +- pkg/telemetry/telemetry.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index 8bd96c3600fd..4ed87e804ee8 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -116,7 +116,7 @@ func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterCo return nil } -func (c Component) run(ctx context.Context) { +func (c *Component) run(ctx context.Context) { c.stopCh = make(chan struct{}) ticker := time.NewTicker(interval) defer ticker.Stop() diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 93ecc3980a58..4910dc02e3d2 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -59,7 +59,7 @@ func (td telemetryData) asProperties() analytics.Properties { } } -func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error) { +func (c *Component) collectTelemetry(ctx context.Context) (telemetryData, error) { var err error data := telemetryData{} @@ -85,7 +85,7 @@ func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error) return data, nil } -func (c Component) getStorageType() string { +func (c *Component) getStorageType() string { switch c.clusterConfig.Spec.Storage.Type { case v1beta1.EtcdStorageType, v1beta1.KineStorageType: return c.clusterConfig.Spec.Storage.Type @@ -93,7 +93,7 @@ func (c Component) getStorageType() string { return "unknown" } -func (c Component) getClusterID(ctx context.Context) (string, error) { +func (c *Component) getClusterID(ctx context.Context) (string, error) { ns, err := c.kubernetesClient.CoreV1().Namespaces().Get(ctx, "kube-system", metav1.GetOptions{}) @@ -104,7 +104,7 @@ func (c Component) getClusterID(ctx context.Context) (string, error) { return fmt.Sprintf("kube-system:%s", ns.UID), nil } -func (c Component) getWorkerData(ctx context.Context) ([]workerData, workerSums, error) { +func (c *Component) getWorkerData(ctx context.Context) ([]workerData, workerSums, error) { nodes, err := c.kubernetesClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return nil, workerSums{}, err @@ -129,7 +129,7 @@ func (c Component) getWorkerData(ctx context.Context) ([]workerData, workerSums, return wds, workerSums{cpuTotal: cpuTotal, memTotal: memTotal}, nil } -func (c Component) sendTelemetry(ctx context.Context) { +func (c *Component) sendTelemetry(ctx context.Context) { data, err := c.collectTelemetry(ctx) if err != nil { c.log.WithError(err).Warning("can't prepare telemetry data") @@ -159,7 +159,7 @@ func (c Component) sendTelemetry(ctx context.Context) { } } -func (c Component) addCustomData(ctx context.Context, analyticCtx *analytics.Context) { +func (c *Component) addCustomData(ctx context.Context, analyticCtx *analytics.Context) { cm, err := c.kubernetesClient.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{}) if err != nil { return From 86049fe0b8ed580ac4ddf2320bbfcee9212e8086 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 08:21:59 +0200 Subject: [PATCH 2/8] Pass on Kubernetes clients as arguments Don't store them in the component directly. Signed-off-by: Tom Wieczorek --- pkg/telemetry/component.go | 28 ++++++++-------------------- pkg/telemetry/telemetry.go | 27 ++++++++++++++------------- 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index 4ed87e804ee8..ff2b67326575 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -26,7 +26,6 @@ import ( kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" ) @@ -37,8 +36,7 @@ type Component struct { Version string KubeClientFactory kubeutil.ClientFactoryInterface - kubernetesClient kubernetes.Interface - analyticsClient analyticsClient + analyticsClient analyticsClient log *logrus.Entry stopCh chan struct{} @@ -63,16 +61,6 @@ func (c *Component) Init(_ context.Context) error { return nil } -func (c *Component) retrieveKubeClient(ch chan struct{}) { - client, err := c.KubeClientFactory.GetClient() - if err != nil { - c.log.WithError(err).Warning("can't init kube client") - return - } - c.kubernetesClient = client - close(ch) -} - // Run runs work cycle func (c *Component) Start(_ context.Context) error { return nil @@ -108,22 +96,22 @@ func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterCo return nil } c.clusterConfig = clusterCfg - initedCh := make(chan struct{}) - wait.Until(func() { - c.retrieveKubeClient(initedCh) - }, time.Second, initedCh) - go c.run(ctx) + clients, err := c.KubeClientFactory.GetClient() + if err != nil { + return err + } + go c.run(ctx, clients) return nil } -func (c *Component) run(ctx context.Context) { +func (c *Component) run(ctx context.Context, clients kubernetes.Interface) { c.stopCh = make(chan struct{}) ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: - c.sendTelemetry(ctx) + c.sendTelemetry(ctx, clients) case <-c.stopCh: return } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 4910dc02e3d2..80d4865e4701 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -24,6 +24,7 @@ import ( "github.com/segmentio/analytics-go" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" @@ -59,17 +60,17 @@ func (td telemetryData) asProperties() analytics.Properties { } } -func (c *Component) collectTelemetry(ctx context.Context) (telemetryData, error) { +func (c *Component) collectTelemetry(ctx context.Context, clients kubernetes.Interface) (telemetryData, error) { var err error data := telemetryData{} data.StorageType = c.getStorageType() - data.ClusterID, err = c.getClusterID(ctx) + data.ClusterID, err = getClusterID(ctx, clients) if err != nil { return data, fmt.Errorf("can't collect cluster ID: %w", err) } - wds, sums, err := c.getWorkerData(ctx) + wds, sums, err := getWorkerData(ctx, clients) if err != nil { return data, fmt.Errorf("can't collect workers count: %w", err) } @@ -78,7 +79,7 @@ func (c *Component) collectTelemetry(ctx context.Context) (telemetryData, error) data.WorkerData = wds data.MEMTotal = sums.memTotal data.CPUTotal = sums.cpuTotal - data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, c.kubernetesClient) + data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, clients) if err != nil { return data, fmt.Errorf("can't collect control plane nodes count: %w", err) } @@ -93,8 +94,8 @@ func (c *Component) getStorageType() string { return "unknown" } -func (c *Component) getClusterID(ctx context.Context) (string, error) { - ns, err := c.kubernetesClient.CoreV1().Namespaces().Get(ctx, +func getClusterID(ctx context.Context, clients kubernetes.Interface) (string, error) { + ns, err := clients.CoreV1().Namespaces().Get(ctx, "kube-system", metav1.GetOptions{}) if err != nil { @@ -104,8 +105,8 @@ func (c *Component) getClusterID(ctx context.Context) (string, error) { return fmt.Sprintf("kube-system:%s", ns.UID), nil } -func (c *Component) getWorkerData(ctx context.Context) ([]workerData, workerSums, error) { - nodes, err := c.kubernetesClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) +func getWorkerData(ctx context.Context, clients kubernetes.Interface) ([]workerData, workerSums, error) { + nodes, err := clients.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return nil, workerSums{}, err } @@ -129,8 +130,8 @@ func (c *Component) getWorkerData(ctx context.Context) ([]workerData, workerSums return wds, workerSums{cpuTotal: cpuTotal, memTotal: memTotal}, nil } -func (c *Component) sendTelemetry(ctx context.Context) { - data, err := c.collectTelemetry(ctx) +func (c *Component) sendTelemetry(ctx context.Context, clients kubernetes.Interface) { + data, err := c.collectTelemetry(ctx, clients) if err != nil { c.log.WithError(err).Warning("can't prepare telemetry data") return @@ -146,7 +147,7 @@ func (c *Component) sendTelemetry(ctx context.Context) { hostData.Extra["cpuArch"] = runtime.GOARCH addSysInfo(&hostData) - c.addCustomData(ctx, &hostData) + addCustomData(ctx, &hostData, clients) c.log.WithField("data", data).WithField("hostdata", hostData).Info("sending telemetry") if err := c.analyticsClient.Enqueue(analytics.Track{ @@ -159,8 +160,8 @@ func (c *Component) sendTelemetry(ctx context.Context) { } } -func (c *Component) addCustomData(ctx context.Context, analyticCtx *analytics.Context) { - cm, err := c.kubernetesClient.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{}) +func addCustomData(ctx context.Context, analyticCtx *analytics.Context, clients kubernetes.Interface) { + cm, err := clients.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{}) if err != nil { return } From 06dd0ca1c2cfdf060af6e209bb81beaf391571ca Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 08:37:09 +0200 Subject: [PATCH 3/8] Remove internal analyticsClient abstraction This was basically a copy of the analytics.Client interface. Signed-off-by: Tom Wieczorek --- pkg/telemetry/component.go | 10 ++++++---- pkg/telemetry/segment.go | 12 +----------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index ff2b67326575..32c0954affe2 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -23,10 +23,12 @@ import ( "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/component/manager" "github.com/k0sproject/k0s/pkg/config" - kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" - "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + + "github.com/segmentio/analytics-go" + "github.com/sirupsen/logrus" ) // Component is a telemetry component for k0s component manager @@ -36,7 +38,7 @@ type Component struct { Version string KubeClientFactory kubeutil.ClientFactoryInterface - analyticsClient analyticsClient + analyticsClient analytics.Client log *logrus.Entry stopCh chan struct{} @@ -56,7 +58,7 @@ func (c *Component) Init(_ context.Context) error { return nil } - c.analyticsClient = newSegmentClient(segmentToken) + c.analyticsClient = analytics.New(segmentToken) c.log.Info("segment client has been init") return nil } diff --git a/pkg/telemetry/segment.go b/pkg/telemetry/segment.go index 70062b1f46d2..865f1f79ad8f 100644 --- a/pkg/telemetry/segment.go +++ b/pkg/telemetry/segment.go @@ -24,19 +24,9 @@ var segmentToken = "" const heartbeatEvent = "cluster-heartbeat" -// Analytics is the interface used for our analytics client. -type analyticsClient interface { - Enqueue(msg analytics.Message) error - Close() error -} - -func NewDefaultSegmentClient() analyticsClient { +func NewDefaultSegmentClient() analytics.Client { if segmentToken == "" { return nil } - return newSegmentClient(segmentToken) -} - -func newSegmentClient(segmentToken string) analyticsClient { return analytics.New(segmentToken) } From 011b4d255a29d0ba7d1da9a6db9fb5c4ea1bcdc7 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 08:41:01 +0200 Subject: [PATCH 4/8] Replace segment token checks with a global one That way, the componend isn't even registered in case telemetry has not been enabled during compilation time. Signed-off-by: Tom Wieczorek --- cmd/controller/controller.go | 14 +++++++++----- pkg/telemetry/component.go | 13 ------------- pkg/telemetry/segment.go | 7 ++++++- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 60896434f15b..5717bb83cdfb 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -545,11 +545,15 @@ func (c *command) start(ctx context.Context) error { }) } - clusterComponents.Add(ctx, &telemetry.Component{ - Version: build.Version, - K0sVars: c.K0sVars, - KubeClientFactory: adminClientFactory, - }) + if telemetry.IsEnabled() { + clusterComponents.Add(ctx, &telemetry.Component{ + Version: build.Version, + K0sVars: c.K0sVars, + KubeClientFactory: adminClientFactory, + }) + } else { + logrus.Info("Telemetry is disabled") + } clusterComponents.Add(ctx, &controller.Autopilot{ K0sVars: c.K0sVars, diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index 32c0954affe2..66a4eba73906 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -53,11 +53,6 @@ var interval = time.Minute * 10 func (c *Component) Init(_ context.Context) error { c.log = logrus.WithField("component", "telemetry") - if segmentToken == "" { - c.log.Info("no token, telemetry is disabled") - return nil - } - c.analyticsClient = analytics.New(segmentToken) c.log.Info("segment client has been init") return nil @@ -70,10 +65,6 @@ func (c *Component) Start(_ context.Context) error { // Run does nothing func (c *Component) Stop() error { - if segmentToken == "" { - c.log.Info("no token, telemetry is disabled") - return nil - } if c.stopCh != nil { close(c.stopCh) } @@ -93,10 +84,6 @@ func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterCo // We must have the worker stuff already running, do nothing return nil } - if segmentToken == "" { - c.log.Info("no token, telemetry is disabled") - return nil - } c.clusterConfig = clusterCfg clients, err := c.KubeClientFactory.GetClient() if err != nil { diff --git a/pkg/telemetry/segment.go b/pkg/telemetry/segment.go index 865f1f79ad8f..62894ca8a194 100644 --- a/pkg/telemetry/segment.go +++ b/pkg/telemetry/segment.go @@ -24,9 +24,14 @@ var segmentToken = "" const heartbeatEvent = "cluster-heartbeat" +func IsEnabled() bool { + return segmentToken != "" +} + func NewDefaultSegmentClient() analytics.Client { - if segmentToken == "" { + if !IsEnabled() { return nil } + return analytics.New(segmentToken) } From 291d9489b290d832ee0de77e11d4b3bdf50de970 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 08:48:09 +0200 Subject: [PATCH 5/8] Move the analytics client out of the component struct Handle its lifecycle directly in the telemetry goroutine. Signed-off-by: Tom Wieczorek --- pkg/telemetry/component.go | 22 +++++++++++----------- pkg/telemetry/telemetry.go | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index 66a4eba73906..c83f76c43840 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -38,8 +38,6 @@ type Component struct { Version string KubeClientFactory kubeutil.ClientFactoryInterface - analyticsClient analytics.Client - log *logrus.Entry stopCh chan struct{} } @@ -50,11 +48,8 @@ var _ manager.Reconciler = (*Component)(nil) var interval = time.Minute * 10 // Init set up for external service clients (segment, k8s api) -func (c *Component) Init(_ context.Context) error { +func (c *Component) Init(context.Context) error { c.log = logrus.WithField("component", "telemetry") - - c.analyticsClient = analytics.New(segmentToken) - c.log.Info("segment client has been init") return nil } @@ -68,9 +63,6 @@ func (c *Component) Stop() error { if c.stopCh != nil { close(c.stopCh) } - if c.analyticsClient != nil { - _ = c.analyticsClient.Close() - } return nil } @@ -96,11 +88,19 @@ func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterCo func (c *Component) run(ctx context.Context, clients kubernetes.Interface) { c.stopCh = make(chan struct{}) ticker := time.NewTicker(interval) - defer ticker.Stop() + analyticsClient := analytics.New(segmentToken) + + defer func() { + ticker.Stop() + if err := analyticsClient.Close(); err != nil { + c.log.WithError(err).Debug("Failed to close analytics client") + } + }() + for { select { case <-ticker.C: - c.sendTelemetry(ctx, clients) + c.sendTelemetry(ctx, analyticsClient, clients) case <-c.stopCh: return } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 80d4865e4701..c3fd3266ea62 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -130,7 +130,7 @@ func getWorkerData(ctx context.Context, clients kubernetes.Interface) ([]workerD return wds, workerSums{cpuTotal: cpuTotal, memTotal: memTotal}, nil } -func (c *Component) sendTelemetry(ctx context.Context, clients kubernetes.Interface) { +func (c *Component) sendTelemetry(ctx context.Context, analyticsClient analytics.Client, clients kubernetes.Interface) { data, err := c.collectTelemetry(ctx, clients) if err != nil { c.log.WithError(err).Warning("can't prepare telemetry data") @@ -150,7 +150,7 @@ func (c *Component) sendTelemetry(ctx context.Context, clients kubernetes.Interf addCustomData(ctx, &hostData, clients) c.log.WithField("data", data).WithField("hostdata", hostData).Info("sending telemetry") - if err := c.analyticsClient.Enqueue(analytics.Track{ + if err := analyticsClient.Enqueue(analytics.Track{ AnonymousId: "(removed)", Event: heartbeatEvent, Properties: data.asProperties(), From 0b9d7255abc692e8dc42e8094b1496e8a3edbfef Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 08:55:16 +0200 Subject: [PATCH 6/8] Use the version constant directly in the telemetry The extra field is not providing any extra value at the moment. Signed-off-by: Tom Wieczorek --- cmd/controller/controller.go | 1 - pkg/telemetry/component.go | 1 - pkg/telemetry/telemetry.go | 10 ++++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 5717bb83cdfb..6ec8f58c378c 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -547,7 +547,6 @@ func (c *command) start(ctx context.Context) error { if telemetry.IsEnabled() { clusterComponents.Add(ctx, &telemetry.Component{ - Version: build.Version, K0sVars: c.K0sVars, KubeClientFactory: adminClientFactory, }) diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index c83f76c43840..035e8c648990 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -35,7 +35,6 @@ import ( type Component struct { clusterConfig *v1beta1.ClusterConfig K0sVars *config.CfgVars - Version string KubeClientFactory kubeutil.ClientFactoryInterface log *logrus.Entry diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index c3fd3266ea62..ab3d5261f0f3 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -21,13 +21,15 @@ import ( "fmt" "runtime" - "github.com/segmentio/analytics-go" + "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" + "github.com/k0sproject/k0s/pkg/build" + kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" - kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" + "github.com/segmentio/analytics-go" ) type telemetryData struct { @@ -141,7 +143,7 @@ func (c *Component) sendTelemetry(ctx context.Context, analyticsClient analytics Extra: map[string]interface{}{"direct": true}, } - hostData.App.Version = c.Version + hostData.App.Version = build.Version hostData.App.Name = "k0s" hostData.App.Namespace = "k0s" hostData.Extra["cpuArch"] = runtime.GOARCH From bb1fe55cdd2002c886e43007a40ecae2935ad5ce Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 09:15:41 +0200 Subject: [PATCH 7/8] Add storage type to telemetry during construction The storage type is part of the node config and cannot be reconciled. Don't take it from the cluster config, but let it be set directly into the componend via a public fied. This allows for removing the cluster config from the struct. Signed-off-by: Tom Wieczorek --- cmd/controller/controller.go | 5 ++++- pkg/telemetry/component.go | 3 +-- pkg/telemetry/telemetry.go | 11 +---------- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 6ec8f58c378c..acfaa0581906 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -193,9 +193,11 @@ func (c *command) start(ctx context.Context) error { return err } logrus.Infof("DNS address: %s", dnsAddress) + var storageBackend manager.Component + storageType := nodeConfig.Spec.Storage.Type - switch nodeConfig.Spec.Storage.Type { + switch storageType { case v1beta1.KineStorageType: storageBackend = &controller.Kine{ Config: nodeConfig.Spec.Storage.Kine, @@ -548,6 +550,7 @@ func (c *command) start(ctx context.Context) error { if telemetry.IsEnabled() { clusterComponents.Add(ctx, &telemetry.Component{ K0sVars: c.K0sVars, + StorageType: storageType, KubeClientFactory: adminClientFactory, }) } else { diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index 035e8c648990..5dc0e44ebecc 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -33,8 +33,8 @@ import ( // Component is a telemetry component for k0s component manager type Component struct { - clusterConfig *v1beta1.ClusterConfig K0sVars *config.CfgVars + StorageType string KubeClientFactory kubeutil.ClientFactoryInterface log *logrus.Entry @@ -75,7 +75,6 @@ func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterCo // We must have the worker stuff already running, do nothing return nil } - c.clusterConfig = clusterCfg clients, err := c.KubeClientFactory.GetClient() if err != nil { return err diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index ab3d5261f0f3..4db1ae060ac5 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -21,7 +21,6 @@ import ( "fmt" "runtime" - "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/build" kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" @@ -66,7 +65,7 @@ func (c *Component) collectTelemetry(ctx context.Context, clients kubernetes.Int var err error data := telemetryData{} - data.StorageType = c.getStorageType() + data.StorageType = c.StorageType data.ClusterID, err = getClusterID(ctx, clients) if err != nil { @@ -88,14 +87,6 @@ func (c *Component) collectTelemetry(ctx context.Context, clients kubernetes.Int return data, nil } -func (c *Component) getStorageType() string { - switch c.clusterConfig.Spec.Storage.Type { - case v1beta1.EtcdStorageType, v1beta1.KineStorageType: - return c.clusterConfig.Spec.Storage.Type - } - return "unknown" -} - func getClusterID(ctx context.Context, clients kubernetes.Interface) (string, error) { ns, err := clients.CoreV1().Namespaces().Get(ctx, "kube-system", From 2def78650b6851d5018b98041411b336d6f9515f Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 10 Jul 2024 10:59:53 +0200 Subject: [PATCH 8/8] Add a mutex to the telemetry component The Start/Stop methods and the Reconcile method may be called concurrently by different goroutines. Ensure that there's no races when starting and stopping the telemetry loop. Also replace the bespoke for loop with wait.UntilWithContext, which is tailor-made for that purpose. Signed-off-by: Tom Wieczorek --- pkg/telemetry/component.go | 76 +++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/pkg/telemetry/component.go b/pkg/telemetry/component.go index 5dc0e44ebecc..c8610beaedc1 100644 --- a/pkg/telemetry/component.go +++ b/pkg/telemetry/component.go @@ -18,6 +18,7 @@ package telemetry import ( "context" + "sync" "time" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" @@ -25,6 +26,7 @@ import ( "github.com/k0sproject/k0s/pkg/config" kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "github.com/segmentio/analytics-go" @@ -37,8 +39,10 @@ type Component struct { StorageType string KubeClientFactory kubeutil.ClientFactoryInterface - log *logrus.Entry - stopCh chan struct{} + log *logrus.Entry + + mu sync.Mutex + stop func() } var _ manager.Component = (*Component)(nil) @@ -52,55 +56,75 @@ func (c *Component) Init(context.Context) error { return nil } -// Run runs work cycle -func (c *Component) Start(_ context.Context) error { +func (c *Component) Start(context.Context) error { return nil } -// Run does nothing func (c *Component) Stop() error { - if c.stopCh != nil { - close(c.stopCh) + c.mu.Lock() + defer c.mu.Unlock() + + if c.stop != nil { + c.stop() + c.stop = nil } + return nil } // Reconcile detects changes in configuration and applies them to the component -func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterConfig) error { - logrus.Debug("reconcile method called for: Telemetry") +func (c *Component) Reconcile(_ context.Context, clusterCfg *v1beta1.ClusterConfig) error { + c.mu.Lock() + defer c.mu.Unlock() + if !clusterCfg.Spec.Telemetry.IsEnabled() { - return c.Stop() - } - if c.stopCh != nil { - // We must have the worker stuff already running, do nothing + if c.stop == nil { + c.log.Debug("Telemetry remains disabled") + } else { + c.stop() + c.stop = nil + } + return nil } + + if c.stop != nil { + return nil // already running + } + clients, err := c.KubeClientFactory.GetClient() if err != nil { return err } - go c.run(ctx, clients) + + c.stop = c.start(clients) + return nil } +func (c *Component) start(clients kubernetes.Interface) (stop func()) { + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + + go func() { + defer close(done) + c.log.Info("Starting to collect telemetry") + c.run(ctx, clients) + c.log.Info("Stopped to collect telemetry") + }() + + return func() { cancel(); <-done } +} + func (c *Component) run(ctx context.Context, clients kubernetes.Interface) { - c.stopCh = make(chan struct{}) - ticker := time.NewTicker(interval) analyticsClient := analytics.New(segmentToken) - defer func() { - ticker.Stop() if err := analyticsClient.Close(); err != nil { c.log.WithError(err).Debug("Failed to close analytics client") } }() - for { - select { - case <-ticker.C: - c.sendTelemetry(ctx, analyticsClient, clients) - case <-c.stopCh: - return - } - } + wait.UntilWithContext(ctx, func(ctx context.Context) { + c.sendTelemetry(ctx, analyticsClient, clients) + }, interval) }