Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry component cleanup #4742

Merged
merged 8 commits into from
Jul 23, 2024
18 changes: 12 additions & 6 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ func (c *command) start(ctx context.Context) error {
return err
}
logrus.Infof("DNS address: %s", dnsAddress)

var storageBackend manager.Component
storageType := nodeConfig.Spec.Storage.Type

switch nodeConfig.Spec.Storage.Type {
switch storageType {
case v1beta1.KineStorageType:
storageBackend = &controller.Kine{
Config: nodeConfig.Spec.Storage.Kine,
Expand Down Expand Up @@ -545,11 +547,15 @@ func (c *command) start(ctx context.Context) error {
})
}

clusterComponents.Add(ctx, &telemetry.Component{
Version: build.Version,
K0sVars: c.K0sVars,
KubeClientFactory: adminClientFactory,
})
if telemetry.IsEnabled() {
clusterComponents.Add(ctx, &telemetry.Component{
K0sVars: c.K0sVars,
StorageType: storageType,
KubeClientFactory: adminClientFactory,
})
} else {
logrus.Info("Telemetry is disabled")
}

clusterComponents.Add(ctx, &controller.Autopilot{
K0sVars: c.K0sVars,
Expand Down
129 changes: 64 additions & 65 deletions pkg/telemetry/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@ package telemetry

import (
"context"
"sync"
"time"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"

kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/segmentio/analytics-go"
"github.com/sirupsen/logrus"
)

// Component is a telemetry component for k0s component manager
type Component struct {
clusterConfig *v1beta1.ClusterConfig
K0sVars *config.CfgVars
Version string
StorageType string
KubeClientFactory kubeutil.ClientFactoryInterface

kubernetesClient kubernetes.Interface
analyticsClient analyticsClient
log *logrus.Entry

log *logrus.Entry
stopCh chan struct{}
mu sync.Mutex
stop func()
}

var _ manager.Component = (*Component)(nil)
Expand All @@ -50,82 +51,80 @@ var _ manager.Reconciler = (*Component)(nil)
var interval = time.Minute * 10

// Init set up for external service clients (segment, k8s api)
func (c *Component) Init(_ context.Context) error {
func (c *Component) Init(context.Context) error {
c.log = logrus.WithField("component", "telemetry")

if segmentToken == "" {
c.log.Info("no token, telemetry is disabled")
return nil
}

c.analyticsClient = newSegmentClient(segmentToken)
c.log.Info("segment client has been init")
return nil
}

func (c *Component) retrieveKubeClient(ch chan struct{}) {
client, err := c.KubeClientFactory.GetClient()
if err != nil {
c.log.WithError(err).Warning("can't init kube client")
return
}
c.kubernetesClient = client
close(ch)
}

// Run runs work cycle
func (c *Component) Start(_ context.Context) error {
func (c *Component) Start(context.Context) error {
return nil
}

// Run does nothing
func (c *Component) Stop() error {
if segmentToken == "" {
c.log.Info("no token, telemetry is disabled")
return nil
}
if c.stopCh != nil {
close(c.stopCh)
}
if c.analyticsClient != nil {
_ = c.analyticsClient.Close()
c.mu.Lock()
defer c.mu.Unlock()

if c.stop != nil {
c.stop()
c.stop = nil
}

return nil
}

// Reconcile detects changes in configuration and applies them to the component
func (c *Component) Reconcile(ctx context.Context, clusterCfg *v1beta1.ClusterConfig) error {
logrus.Debug("reconcile method called for: Telemetry")
func (c *Component) Reconcile(_ context.Context, clusterCfg *v1beta1.ClusterConfig) error {
c.mu.Lock()
defer c.mu.Unlock()

if !clusterCfg.Spec.Telemetry.IsEnabled() {
return c.Stop()
}
if c.stopCh != nil {
// We must have the worker stuff already running, do nothing
if c.stop == nil {
c.log.Debug("Telemetry remains disabled")
} else {
c.stop()
c.stop = nil
}

return nil
}
if segmentToken == "" {
c.log.Info("no token, telemetry is disabled")
return nil

if c.stop != nil {
return nil // already running
}

clients, err := c.KubeClientFactory.GetClient()
if err != nil {
return err
}
c.clusterConfig = clusterCfg
initedCh := make(chan struct{})
wait.Until(func() {
c.retrieveKubeClient(initedCh)
}, time.Second, initedCh)
go c.run(ctx)

c.stop = c.start(clients)

return nil
}

func (c Component) run(ctx context.Context) {
c.stopCh = make(chan struct{})
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.sendTelemetry(ctx)
case <-c.stopCh:
return
func (c *Component) start(clients kubernetes.Interface) (stop func()) {
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})

go func() {
defer close(done)
c.log.Info("Starting to collect telemetry")
c.run(ctx, clients)
c.log.Info("Stopped to collect telemetry")
}()

return func() { cancel(); <-done }
}

func (c *Component) run(ctx context.Context, clients kubernetes.Interface) {
analyticsClient := analytics.New(segmentToken)
defer func() {
if err := analyticsClient.Close(); err != nil {
c.log.WithError(err).Debug("Failed to close analytics client")
}
}
}()

wait.UntilWithContext(ctx, func(ctx context.Context) {
c.sendTelemetry(ctx, analyticsClient, clients)
}, interval)
}
13 changes: 4 additions & 9 deletions pkg/telemetry/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,14 @@ var segmentToken = ""

const heartbeatEvent = "cluster-heartbeat"

// Analytics is the interface used for our analytics client.
type analyticsClient interface {
Enqueue(msg analytics.Message) error
Close() error
func IsEnabled() bool {
return segmentToken != ""
}

func NewDefaultSegmentClient() analyticsClient {
if segmentToken == "" {
func NewDefaultSegmentClient() analytics.Client {
if !IsEnabled() {
return nil
}
return newSegmentClient(segmentToken)
}

func newSegmentClient(segmentToken string) analyticsClient {
return analytics.New(segmentToken)
}
48 changes: 21 additions & 27 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"runtime"

"github.com/segmentio/analytics-go"
"github.com/k0sproject/k0s/pkg/build"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/segmentio/analytics-go"
)

type telemetryData struct {
Expand Down Expand Up @@ -59,17 +61,17 @@ func (td telemetryData) asProperties() analytics.Properties {
}
}

func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error) {
func (c *Component) collectTelemetry(ctx context.Context, clients kubernetes.Interface) (telemetryData, error) {
var err error
data := telemetryData{}

data.StorageType = c.getStorageType()
data.ClusterID, err = c.getClusterID(ctx)
data.StorageType = c.StorageType
data.ClusterID, err = getClusterID(ctx, clients)

if err != nil {
return data, fmt.Errorf("can't collect cluster ID: %w", err)
}
wds, sums, err := c.getWorkerData(ctx)
wds, sums, err := getWorkerData(ctx, clients)
if err != nil {
return data, fmt.Errorf("can't collect workers count: %w", err)
}
Expand All @@ -78,23 +80,15 @@ func (c Component) collectTelemetry(ctx context.Context) (telemetryData, error)
data.WorkerData = wds
data.MEMTotal = sums.memTotal
data.CPUTotal = sums.cpuTotal
data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, c.kubernetesClient)
data.ControlPlaneNodesCount, err = kubeutil.GetControlPlaneNodeCount(ctx, clients)
if err != nil {
return data, fmt.Errorf("can't collect control plane nodes count: %w", err)
}
return data, nil
}

func (c Component) getStorageType() string {
switch c.clusterConfig.Spec.Storage.Type {
case v1beta1.EtcdStorageType, v1beta1.KineStorageType:
return c.clusterConfig.Spec.Storage.Type
}
return "unknown"
}

func (c Component) getClusterID(ctx context.Context) (string, error) {
ns, err := c.kubernetesClient.CoreV1().Namespaces().Get(ctx,
func getClusterID(ctx context.Context, clients kubernetes.Interface) (string, error) {
ns, err := clients.CoreV1().Namespaces().Get(ctx,
"kube-system",
metav1.GetOptions{})
if err != nil {
Expand All @@ -104,8 +98,8 @@ func (c Component) getClusterID(ctx context.Context) (string, error) {
return fmt.Sprintf("kube-system:%s", ns.UID), nil
}

func (c Component) getWorkerData(ctx context.Context) ([]workerData, workerSums, error) {
nodes, err := c.kubernetesClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
func getWorkerData(ctx context.Context, clients kubernetes.Interface) ([]workerData, workerSums, error) {
nodes, err := clients.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, workerSums{}, err
}
Expand All @@ -129,8 +123,8 @@ func (c Component) getWorkerData(ctx context.Context) ([]workerData, workerSums,
return wds, workerSums{cpuTotal: cpuTotal, memTotal: memTotal}, nil
}

func (c Component) sendTelemetry(ctx context.Context) {
data, err := c.collectTelemetry(ctx)
func (c *Component) sendTelemetry(ctx context.Context, analyticsClient analytics.Client, clients kubernetes.Interface) {
data, err := c.collectTelemetry(ctx, clients)
if err != nil {
c.log.WithError(err).Warning("can't prepare telemetry data")
return
Expand All @@ -140,16 +134,16 @@ func (c Component) sendTelemetry(ctx context.Context) {
Extra: map[string]interface{}{"direct": true},
}

hostData.App.Version = c.Version
hostData.App.Version = build.Version
hostData.App.Name = "k0s"
hostData.App.Namespace = "k0s"
hostData.Extra["cpuArch"] = runtime.GOARCH

addSysInfo(&hostData)
c.addCustomData(ctx, &hostData)
addCustomData(ctx, &hostData, clients)

c.log.WithField("data", data).WithField("hostdata", hostData).Info("sending telemetry")
if err := c.analyticsClient.Enqueue(analytics.Track{
if err := analyticsClient.Enqueue(analytics.Track{
AnonymousId: "(removed)",
Event: heartbeatEvent,
Properties: data.asProperties(),
Expand All @@ -159,8 +153,8 @@ func (c Component) sendTelemetry(ctx context.Context) {
}
}

func (c Component) addCustomData(ctx context.Context, analyticCtx *analytics.Context) {
cm, err := c.kubernetesClient.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{})
func addCustomData(ctx context.Context, analyticCtx *analytics.Context, clients kubernetes.Interface) {
cm, err := clients.CoreV1().ConfigMaps("kube-system").Get(ctx, "k0s-telemetry", metav1.GetOptions{})
if err != nil {
return
}
Expand Down
Loading