From c0ae84b58f127d9f110a31e941ec2404452fcbeb Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 11 Feb 2022 10:45:23 +0100 Subject: [PATCH 1/2] Fixes memberlist usage report Signed-off-by: Cyril Tovena --- pkg/loki/loki.go | 7 +++ pkg/loki/modules.go | 1 + pkg/usagestats/reporter.go | 53 ++++++++++++++-- pkg/usagestats/reporter_test.go | 8 ++- pkg/usagestats/seed.go | 50 ++++++++++++++++ pkg/usagestats/seed_test.go | 103 ++++++++++++++++++++++++++++++++ pkg/usagestats/stats.go | 2 + 7 files changed, 216 insertions(+), 8 deletions(-) create mode 100644 pkg/usagestats/seed_test.go diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 9446572f3c1f2..97f25dbb2b13a 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -494,6 +494,7 @@ func (t *Loki) setupModuleManager() error { // Add dependencies deps := map[string][]string{ Ring: {RuntimeConfig, Server, MemberlistKV}, + UsageReport: {}, Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, @@ -540,6 +541,12 @@ func (t *Loki) setupModuleManager() error { t.deps = deps t.ModuleManager = mm + if t.isModuleActive(Ingester) { + if err := mm.AddDependency(UsageReport, Ring); err != nil { + return err + } + } + return nil } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 9c5fe1473ec72..938f48ab08aad 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -674,6 +674,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.MemberlistKV.MetricsRegisterer = reg t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), + usagestats.JSONCodec, } dnsProviderReg := prometheus.WrapRegistererWithPrefix( diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index d13ad7084918b..42b4b189fc7ca 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -32,7 +32,10 @@ const ( var ( reportCheckInterval = time.Minute - reportInterval = 1 * time.Hour + reportInterval = 4 * time.Hour + + stabilityCheckInterval = 5 * 
time.Second + stabilityMinimunRequired = 6 ) type Config struct { @@ -80,11 +83,12 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { return nil } // Try to become leader via the kv client - for backoff := backoff.New(ctx, backoff.Config{ + backoff := backoff.New(ctx, backoff.Config{ MinBackoff: time.Second, MaxBackoff: time.Minute, MaxRetries: 0, - }); ; backoff.Ongoing() { + }) + for backoff.Ongoing() { // create a new cluster seed seed := ClusterSeed{ UID: uuid.NewString(), @@ -94,16 +98,19 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { if err := kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) { // The key is already set, so we don't need to do anything if in != nil { - if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID { + if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed != nil && kvSeed.UID != seed.UID { seed = *kvSeed return nil, false, nil } } - return seed, true, nil + return &seed, true, nil }); err != nil { level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) continue } + // ensure stability of the cluster seed + stableSeed := ensureStableKey(ctx, kvClient, rep.logger) + seed = *stableSeed // Fetch the remote cluster seed. remoteSeed, err := rep.fetchSeed(ctx, func(err error) bool { @@ -115,14 +122,49 @@ func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed { // we are the leader and we need to save the file. if err := rep.writeSeedFile(ctx, seed); err != nil { level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err) + backoff.Wait() continue } return &seed } + backoff.Wait() continue } return remoteSeed } + return nil +} + +// ensureStableKey ensures that the cluster seed is stable for at least 30 seconds. +// This is required when using a gossiping kv client like memberlist which will never have the same seed +// but will converge eventually. 
+func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger) *ClusterSeed { + var ( + previous *ClusterSeed + stableCount int + ) + for { + time.Sleep(stabilityCheckInterval) + value, err := kvClient.Get(ctx, seedKey) + if err != nil { + level.Debug(logger).Log("msg", "failed to get cluster seed key for stability check", "err", err) + continue + } + if seed, ok := value.(*ClusterSeed); ok && seed != nil { + if previous == nil { + previous = seed + continue + } + if previous.UID != seed.UID { + previous = seed + continue + } + stableCount++ + if stableCount > stabilityMinimunRequired { + return seed + } + } + } } func (rep *Reporter) init(ctx context.Context) { @@ -161,6 +203,7 @@ func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) b readingErr = 0 } if continueFn == nil || continueFn(err) { + backoff.Wait() continue } return nil, err diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go index 1fcb930b94af3..dafc3de8fd008 100644 --- a/pkg/usagestats/reporter_test.go +++ b/pkg/usagestats/reporter_test.go @@ -21,6 +21,8 @@ import ( var metrics = storage.NewClientMetrics() func Test_LeaderElection(t *testing.T) { + stabilityCheckInterval = 100 * time.Millisecond + result := make(chan *ClusterSeed, 10) objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ FSConfig: local.FSConfig{ @@ -71,6 +73,7 @@ func Test_ReportLoop(t *testing.T) { // stub reportCheckInterval = 100 * time.Millisecond reportInterval = time.Second + stabilityCheckInterval = 100 * time.Millisecond totalReport := 0 clusterIDs := []string{} @@ -94,12 +97,11 @@ func Test_ReportLoop(t *testing.T) { Store: "inmemory", }, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry()) require.NoError(t, err) - - r.initLeader(context.Background()) ctx, cancel := context.WithCancel(context.Background()) + r.initLeader(ctx) go func() { - <-time.After(6 * time.Second) + 
<-time.After(6*time.Second + (stabilityCheckInterval * time.Duration(stabilityMinimunRequired+1))) cancel() }() require.Equal(t, context.Canceled, r.running(ctx)) diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go index cbf4108c440aa..53ab020a3f820 100644 --- a/pkg/usagestats/seed.go +++ b/pkg/usagestats/seed.go @@ -1,18 +1,68 @@ package usagestats import ( + "fmt" "time" jsoniter "github.com/json-iterator/go" prom "github.com/prometheus/prometheus/web/api/v1" + + "github.com/grafana/dskit/kv/memberlist" ) +// ClusterSeed is the seed for the usage stats. +// A unique ID is generated for each cluster. type ClusterSeed struct { UID string `json:"UID"` CreatedAt time.Time `json:"created_at"` prom.PrometheusVersion `json:"version"` } +// Merge implements the memberlist.Mergeable interface. +// It allows merging the content of two different seeds. +func (c *ClusterSeed) Merge(mergeable memberlist.Mergeable, localCAS bool) (change memberlist.Mergeable, error error) { + if mergeable == nil { + return nil, nil + } + other, ok := mergeable.(*ClusterSeed) + if !ok { + return nil, fmt.Errorf("expected *usagestats.ClusterSeed, got %T", mergeable) + } + if other == nil { + return nil, nil + } + // if we (c) already have the oldest key, then we should not request a change. + if c.CreatedAt.Before(other.CreatedAt) { + return nil, nil + } + if c.CreatedAt == other.CreatedAt { + // if we have the exact same creation date but the key is different, take the smallest UID to ensure stability. + if c.UID > other.UID { + *c = *other + return other, nil + } + return nil, nil + } + // if our seed is not the oldest, then we should request a change. + *c = *other + return other, nil +} + +// MergeContent returns the UID of the seed as the description of its content. 
+func (c *ClusterSeed) MergeContent() []string { + return []string{c.UID} +} + +// RemoveTombstones is not required for usagestats +func (c *ClusterSeed) RemoveTombstones(limit time.Time) (total, removed int) { + return 0, 0 +} + +func (c *ClusterSeed) Clone() memberlist.Mergeable { + new := *c + return &new +} + var JSONCodec = jsonCodec{} type jsonCodec struct{} diff --git a/pkg/usagestats/seed_test.go b/pkg/usagestats/seed_test.go new file mode 100644 index 0000000000000..8376c4013bc72 --- /dev/null +++ b/pkg/usagestats/seed_test.go @@ -0,0 +1,103 @@ +package usagestats + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/kv/codec" + "github.com/grafana/dskit/kv/memberlist" + "github.com/grafana/dskit/services" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/chunk/local" + "github.com/grafana/loki/pkg/storage/chunk/storage" +) + +type dnsProviderMock struct { + resolved []string +} + +func (p *dnsProviderMock) Resolve(ctx context.Context, addrs []string) error { + p.resolved = addrs + return nil +} + +func (p dnsProviderMock) Addresses() []string { + return p.resolved +} + +func createMemberlist(t *testing.T, port, memberID int) *memberlist.KV { + t.Helper() + var cfg memberlist.KVConfig + flagext.DefaultValues(&cfg) + cfg.TCPTransport = memberlist.TCPTransportConfig{ + BindAddrs: []string{"localhost"}, + BindPort: 0, + } + cfg.GossipInterval = 100 * time.Millisecond + cfg.GossipNodes = 3 + cfg.PushPullInterval = 5 * time.Second + cfg.NodeName = fmt.Sprintf("Member-%d", memberID) + cfg.Codecs = []codec.Codec{JSONCodec} + + mkv := memberlist.NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, nil) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), mkv)) + if port != 0 { + _, err := mkv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", port)}) + require.NoError(t, err, "%s failed to 
join the cluster: %v", memberID, err) + } + t.Cleanup(func() { + services.StopAndAwaitTerminated(context.TODO(), mkv) + }) + return mkv +} + +func Test_Memberlist(t *testing.T) { + stabilityCheckInterval = time.Second + + objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{ + FSConfig: local.FSConfig{ + Directory: t.TempDir(), + }, + }, metrics) + require.NoError(t, err) + result := make(chan *ClusterSeed, 10) + + // create a first memberlist to get a valid listening port. + initMKV := createMemberlist(t, 0, -1) + + for i := 0; i < 10; i++ { + go func(i int) { + leader, err := NewReporter(Config{ + Leader: true, + }, kv.Config{ + Store: "memberlist", + StoreConfig: kv.StoreConfig{ + MemberlistKV: func() (*memberlist.KV, error) { + return createMemberlist(t, initMKV.GetListeningPort(), i), nil + }, + }, + }, objectClient, log.NewLogfmtLogger(os.Stdout), nil) + require.NoError(t, err) + leader.init(context.Background()) + result <- leader.cluster + }(i) + } + + var UID []string + for i := 0; i < 10; i++ { + cluster := <-result + require.NotNil(t, cluster) + UID = append(UID, cluster.UID) + } + first := UID[0] + for _, uid := range UID { + require.Equal(t, first, uid) + } +} diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go index 89218f6f6f50d..84f751d5565cf 100644 --- a/pkg/usagestats/stats.go +++ b/pkg/usagestats/stats.go @@ -35,6 +35,7 @@ type Report struct { ClusterID string `json:"clusterID"` CreatedAt time.Time `json:"createdAt"` Interval time.Time `json:"interval"` + IntervalPeriod float64 `json:"intervalPeriod"` Target string `json:"target"` prom.PrometheusVersion `json:"version"` Os string `json:"os"` @@ -92,6 +93,7 @@ func buildReport(seed *ClusterSeed, interval time.Time) Report { PrometheusVersion: build.GetVersion(), CreatedAt: seed.CreatedAt, Interval: interval, + IntervalPeriod: reportInterval.Seconds(), Os: runtime.GOOS, Arch: runtime.GOARCH, Target: targetName, From 
efc88caab3b3955329eff855b9f6bae0875c4141 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 11 Feb 2022 11:12:14 +0100 Subject: [PATCH 2/2] Fixes the linter and improve comment Signed-off-by: Cyril Tovena --- pkg/usagestats/reporter.go | 1 + pkg/usagestats/seed.go | 3 ++- pkg/usagestats/seed_test.go | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go index 42b4b189fc7ca..431eece72024a 100644 --- a/pkg/usagestats/reporter.go +++ b/pkg/usagestats/reporter.go @@ -157,6 +157,7 @@ func ensureStableKey(ctx context.Context, kvClient kv.Client, logger log.Logger) } if previous.UID != seed.UID { previous = seed + stableCount = 0 continue } stableCount++ diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go index 53ab020a3f820..1a9856950497d 100644 --- a/pkg/usagestats/seed.go +++ b/pkg/usagestats/seed.go @@ -36,7 +36,8 @@ func (c *ClusterSeed) Merge(mergeable memberlist.Mergeable, localCAS bool) (chan return nil, nil } if c.CreatedAt == other.CreatedAt { - // if we have the exact same creation date but the key is different, take the smallest UID to ensure stability. + // if we have the exact same creation date but the key is different + // we take the smallest UID using string alphabetical comparison to ensure stability. if c.UID > other.UID { *c = *other return other, nil diff --git a/pkg/usagestats/seed_test.go b/pkg/usagestats/seed_test.go index 8376c4013bc72..0d30e7bd31926 100644 --- a/pkg/usagestats/seed_test.go +++ b/pkg/usagestats/seed_test.go @@ -53,7 +53,7 @@ func createMemberlist(t *testing.T, port, memberID int) *memberlist.KV { require.NoError(t, err, "%s failed to join the cluster: %v", memberID, err) } t.Cleanup(func() { - services.StopAndAwaitTerminated(context.TODO(), mkv) + _ = services.StopAndAwaitTerminated(context.TODO(), mkv) }) return mkv }