From bbaef790dbea7c7d8aca6b86317f00f8250e47fb Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Thu, 10 Feb 2022 13:54:13 +0100
Subject: [PATCH] Add usage report into Loki. (#5361)

* Adds leader election process

Signed-off-by: Cyril Tovena

* fluke

Signed-off-by: Cyril Tovena

* fixes the kv typecheck

* wire up the http client

* Hooking into loki services, hit a bug

* Add stats variable.

* re-vendor dskit and improve to never fail service

* Instrument Loki with the package

* Add changelog entry

Signed-off-by: Cyril Tovena

* Fixes compactor test

Signed-off-by: Cyril Tovena

* Add configuration documentation

Signed-off-by: Cyril Tovena

* Update pkg/usagestats/reporter.go

Co-authored-by: Danny Kopping

* Add boundary check

Signed-off-by: Cyril Tovena

* Add log for success report.

Signed-off-by: Cyril Tovena

* lint

Signed-off-by: Cyril Tovena

* Update pkg/usagestats/reporter.go

Co-authored-by: Danny Kopping

Co-authored-by: Danny Kopping
---
 CHANGELOG.md                                  |   1 +
 .../pkg/promtail/targets/cloudflare/target.go |  10 +-
 .../targets/cloudflare/target_test.go         |  26 +-
 docs/sources/configuration/_index.md          |  13 +
 go.mod                                        |   4 +-
 go.sum                                        |   7 +-
 pkg/distributor/distributor.go                |   7 +-
 pkg/ingester/flush.go                         |  17 +-
 pkg/ingester/ingester.go                      |  27 +-
 pkg/ingester/instance.go                      |   5 +
 pkg/ingester/stream.go                        |   5 +
 pkg/loghttp/push/push.go                      |   6 +
 pkg/logql/metrics.go                          |  18 +
 pkg/loki/loki.go                              |  23 +-
 pkg/loki/modules.go                           |  30 ++
 pkg/ruler/base/ruler.go                       |   8 +-
 pkg/storage/store.go                          |  12 +
 .../stores/shipper/compactor/compactor.go     |  13 +
 pkg/storage/stores/shipper/compactor/table.go |  11 +-
 .../stores/shipper/downloads/index_set.go     |  10 +-
 pkg/storage/stores/shipper/downloads/table.go |   4 +-
 pkg/usagestats/reporter.go                    | 266 +++++++++++++
 pkg/usagestats/reporter_test.go               | 147 ++++++++
 pkg/usagestats/seed.go                        |  33 ++
 pkg/usagestats/stats.go                       | 352 ++++++++++++++++++
 pkg/usagestats/stats_test.go                  |  97 +++++
 pkg/util/build/build.go                       |  12 +
 pkg/util/server/error.go                      |   2 +-
 pkg/util/server/error_test.go                 |  62 +--
 vendor/github.com/google/uuid/null.go         | 118 ------
 vendor/github.com/google/uuid/uuid.go         |  45 +--
 vendor/github.com/google/uuid/version4.go     |  27 +-
 .../grafana/dskit/backoff/backoff.go          |   6 +-
 .../grafana/dskit/concurrency/runner.go       |  55 +--
 .../grafana/dskit/crypto/tls/tls.go           |  10 +-
 .../grafana/dskit/grpcclient/grpcclient.go    |  14 +-
 vendor/github.com/grafana/dskit/kv/client.go  |  19 +-
 .../grafana/dskit/kv/consul/client.go         |  18 +-
 .../github.com/grafana/dskit/kv/etcd/etcd.go  |   6 +-
 .../dskit/kv/memberlist/memberlist_client.go  |  34 +-
 .../dskit/kv/memberlist/tcp_transport.go      |   6 +-
 vendor/github.com/grafana/dskit/kv/multi.go   |   8 +-
 .../grafana/dskit/ring/basic_lifecycler.go    |  18 +
 vendor/github.com/grafana/dskit/ring/http.go  |  73 ++--
 .../grafana/dskit/ring/lifecycler.go          |  45 ++-
 .../grafana/dskit/ring/replication_set.go     |   3 +-
 vendor/github.com/grafana/dskit/ring/ring.go  |  46 ++-
 .../grafana/dskit/runtimeconfig/manager.go    |   2 +-
 vendor/modules.txt                            |   4 +-
 49 files changed, 1384 insertions(+), 401 deletions(-)
 create mode 100644 pkg/usagestats/reporter.go
 create mode 100644 pkg/usagestats/reporter_test.go
 create mode 100644 pkg/usagestats/seed.go
 create mode 100644 pkg/usagestats/stats.go
 create mode 100644 pkg/usagestats/stats_test.go
 delete mode 100644 vendor/github.com/google/uuid/null.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 109115168ae71..773f54a99c91a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,6 @@
 ## Main
+* [5361](https://github.com/grafana/loki/pull/5361) **ctovena**: Add usage report to grafana.com.
 * [5289](https://github.com/grafana/loki/pull/5289) **ctovena**: Fix deduplication bug in queries when mutating labels.
 * [5302](https://github.com/grafana/loki/pull/5302) **MasslessParticle** Update azure blobstore client to use new sdk.
 * [5243](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update Promtail to support duration string formats.
diff --git a/clients/pkg/promtail/targets/cloudflare/target.go b/clients/pkg/promtail/targets/cloudflare/target.go
index 806eefabfdc14..ad097af45dffb 100644
--- a/clients/pkg/promtail/targets/cloudflare/target.go
+++ b/clients/pkg/promtail/targets/cloudflare/target.go
@@ -109,11 +109,11 @@ func (t *Target) start() {
 				end = maxEnd
 			}
 			start := end.Add(-time.Duration(t.config.PullRange))
-
+			requests := splitRequests(start, end, t.config.Workers)
 			// Use background context for workers as we don't want to cancel half way through.
 			// In case of errors we stop the target, each worker has its own retry logic.
-			if err := concurrency.ForEach(context.Background(), splitRequests(start, end, t.config.Workers), t.config.Workers, func(ctx context.Context, job interface{}) error {
-				request := job.(pullRequest)
+			if err := concurrency.ForEachJob(context.Background(), len(requests), t.config.Workers, func(ctx context.Context, idx int) error {
+				request := requests[idx]
 				return t.pull(ctx, request.start, request.end)
 			}); err != nil {
 				level.Error(t.logger).Log("msg", "failed to pull logs", "err", err, "start", start, "end", end)
@@ -229,9 +229,9 @@ type pullRequest struct {
 	end   time.Time
 }

-func splitRequests(start, end time.Time, workers int) []interface{} {
+func splitRequests(start, end time.Time, workers int) []pullRequest {
 	perWorker := end.Sub(start) / time.Duration(workers)
-	var requests []interface{}
+	var requests []pullRequest
 	for i := 0; i < workers; i++ {
 		r := pullRequest{
 			start: start.Add(time.Duration(i) * perWorker),
diff --git a/clients/pkg/promtail/targets/cloudflare/target_test.go b/clients/pkg/promtail/targets/cloudflare/target_test.go
index 770b1be714e13..a06c101a99ccb 100644
--- a/clients/pkg/promtail/targets/cloudflare/target_test.go
+++ b/clients/pkg/promtail/targets/cloudflare/target_test.go
@@ -251,26 +251,26 @@ func Test_splitRequests(t *testing.T) {
 	tests := []struct {
 		start time.Time
 		end   time.Time
-		want  []interface{}
+		want  []pullRequest
 	}{
 		// perfectly divisible
 		{
 			time.Unix(0, 0),
 			time.Unix(0, int64(time.Minute)),
-			[]interface{}{
-				pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
+			[]pullRequest{
+				{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
+				{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
+				{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute))},
 			},
 		},
 		// not divisible
 		{
 			time.Unix(0, 0),
 			time.Unix(0, int64(time.Minute+1)),
-			[]interface{}{
-				pullRequest{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
-				pullRequest{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
+			[]pullRequest{
+				{start: time.Unix(0, 0), end: time.Unix(0, int64(time.Minute/3))},
+				{start: time.Unix(0, int64(time.Minute/3)), end: time.Unix(0, int64(time.Minute*2/3))},
+				{start: time.Unix(0, int64(time.Minute*2/3)), end: time.Unix(0, int64(time.Minute+1))},
 			},
 		},
 	}
@@ -279,11 +279,11 @@ func Test_splitRequests(t *testing.T) {
 			got := splitRequests(tt.start, tt.end, 3)
 			if !assert.Equal(t, tt.want, got) {
 				for i := range got {
-					if !assert.Equal(t, tt.want[i].(pullRequest).start, got[i].(pullRequest).start) {
-						t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].(pullRequest).start.UnixNano(), got[i].(pullRequest).start.UnixNano())
+					if !assert.Equal(t, tt.want[i].start, got[i].start) {
+						t.Logf("expected i:%d start: %d , got: %d", i, tt.want[i].start.UnixNano(), got[i].start.UnixNano())
 					}
-					if !assert.Equal(t, tt.want[i].(pullRequest).end, got[i].(pullRequest).end) {
-						t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].(pullRequest).end.UnixNano(), got[i].(pullRequest).end.UnixNano())
+					if !assert.Equal(t, tt.want[i].end, got[i].end) {
+						t.Logf("expected i:%d end: %d , got: %d", i, tt.want[i].end.UnixNano(), got[i].end.UnixNano())
 					}
 				}
 			}
diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index c12e60bb4c2c7..1db281a9a2844 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -164,6 +164,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
 # If a more specific configuration is given in other sections,
 # the related configuration within this section will be ignored.
 [common: <common_config>]
+
+# Configuration for usage report
+[usage_report: <usage_report_config>]
 ```

 ## server
@@ -2496,6 +2499,16 @@ This way, one doesn't have to replicate configuration in multiple places.
 [ring: <ring_config>]
 ```

+## usage_report
+
+This block allows you to configure Loki's usage report, which is sent to grafana.com.
+
+```yaml
+# Whether or not the usage report should be disabled.
+# CLI flag: -usage-report.disabled
+[disabled: <boolean>: default = false]
+```
+
 ### storage

 The common `storage` block defines a common storage to be reused by different
diff --git a/go.mod b/go.mod
index 5b04cf6aae094..753e9f79004d0 100644
--- a/go.mod
+++ b/go.mod
@@ -42,7 +42,7 @@ require (
 	github.com/google/go-cmp v0.5.6
 	github.com/gorilla/mux v1.8.0
 	github.com/gorilla/websocket v1.4.2
-	github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5
+	github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0
 	github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
 	github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
 	github.com/hashicorp/consul/api v1.12.0
@@ -105,6 +105,7 @@ require (
 	github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.13.0
 	github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.0.0-00010101000000-000000000000
 	github.com/google/renameio/v2 v2.0.0
+	github.com/google/uuid v1.2.0
 	github.com/mattn/go-ieproxy v0.0.1
 	github.com/xdg-go/scram v1.0.2
 	gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0
@@ -185,7 +186,6 @@ require (
 	github.com/google/go-querystring v1.0.0 // indirect
 	github.com/google/gofuzz v1.1.0 // indirect
 	github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect
-	github.com/google/uuid v1.3.0 // indirect
 	github.com/googleapis/gax-go/v2 v2.1.1 // indirect
 	github.com/googleapis/gnostic v0.4.1 // indirect
 	github.com/gophercloud/gophercloud v0.24.0 // indirect
diff --git a/go.sum b/go.sum
index c3c220e5be307..99536d2ac8688 100644
--- a/go.sum
+++ b/go.sum
@@ -994,9 +994,8 @@ github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxeh
 github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.1.1/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.2+incompatible h1:silFMLAnr330+NRuag/VjIGF7TLp/LBrV2CJKFLWEww= github.com/googleapis/gax-go v2.0.2+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -1033,8 +1032,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 h1:IXo/V2+KKLYLD724qh3uRaZgAy3BV3HdtXuSs7lb3jU= -github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= +github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 h1:R0Pw7VjouhYSS7bsMdxEidcJbCq1KUBCzPgsh7805NM= +github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0/go.mod h1:Q9WmQ9cVkrHx6g4KSl6TN+N3vEOkDZd9RtyXCHd5OPQ= github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8 h1:aEOagXOTqtN9gd4jiDuP/5a81HdoJBqkVfn8WaxbsK4= github.com/grafana/go-gelf v0.0.0-20211112153804-126646b86de8/go.mod h1:QAvS2C7TtQRhhv9Uf/sxD+BUhpkrPFm5jK/9MzUiDCY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2b65da90bf4e3..563377324bbe4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" @@ -37,7 +38,10 @@ const ( ringKey = "distributor" ) -var maxLabelCacheSize = 100000 +var ( + maxLabelCacheSize = 100000 + rfStats = usagestats.NewInt("distributor_replication_factor") +) // Config for a Distributor. type Config struct { @@ -168,6 +172,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in }), } d.replicationFactor.Set(float64(ingestersRing.ReplicationFactor())) + rfStats.Set(int64(ingestersRing.ReplicationFactor())) servs = append(servs, d.pool) d.subservices, err = services.NewManager(servs...) 
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index b1b8dab657fc8..f0e7f00c31c52 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -90,6 +91,12 @@ var ( // 1h -> 8hr Buckets: prometheus.LinearBuckets(1, 1, 8), }) + flushedChunksStats = usagestats.NewCounter("ingester_flushed_chunks") + flushedChunksBytesStats = usagestats.NewStatistics("ingester_flushed_chunks_bytes") + flushedChunksLinesStats = usagestats.NewStatistics("ingester_flushed_chunks_lines") + flushedChunksAgeStats = usagestats.NewStatistics("ingester_flushed_chunks_age_seconds") + flushedChunksLifespanStats = usagestats.NewStatistics("ingester_flushed_chunks_lifespan_seconds") + flushedChunksUtilizationStats = usagestats.NewStatistics("ingester_flushed_chunks_utilization") ) const ( @@ -382,6 +389,7 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP if err := i.store.Put(ctx, wireChunks); err != nil { return err } + flushedChunksStats.Inc(int64(len(wireChunks))) // Record statistics only when actual put request did not return error. sizePerTenant := chunkSizePerTenant.WithLabelValues(userID) @@ -408,7 +416,8 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize) } - chunkUtilization.Observe(wc.Data.Utilization()) + utilization := wc.Data.Utilization() + chunkUtilization.Observe(utilization) chunkEntries.Observe(float64(numEntries)) chunkSize.Observe(compressedSize) sizePerTenant.Add(compressedSize) @@ -416,6 +425,12 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP firstTime, lastTime := cs[i].chunk.Bounds() chunkAge.Observe(time.Since(firstTime).Seconds()) chunkLifespan.Observe(lastTime.Sub(firstTime).Hours()) + + flushedChunksBytesStats.Record(compressedSize) + flushedChunksLinesStats.Record(float64(numEntries)) + flushedChunksUtilizationStats.Record(utilization) + flushedChunksAgeStats.Record(time.Since(firstTime).Seconds()) + flushedChunksLifespanStats.Record(lastTime.Sub(firstTime).Hours()) } return nil diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 3359a612050ef..3d93bf454e107 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" errUtil "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" @@ -45,12 +46,18 @@ const ( // ErrReadOnly is returned when the ingester is shutting down and a push was // attempted. 
-var ErrReadOnly = errors.New("Ingester is shutting down") - -var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_ingester_flush_queue_length", - Help: "The total number of series pending in the flush queue.", -}) +var ( + ErrReadOnly = errors.New("Ingester is shutting down") + + flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_flush_queue_length", + Help: "The total number of series pending in the flush queue.", + }) + compressionStats = usagestats.NewString("ingester_compression") + targetSizeStats = usagestats.NewInt("ingester_target_size_bytes") + walStats = usagestats.NewString("ingester_wal") + activeTenantsStats = usagestats.NewInt("ingester_active_tenants") +) // Config for an ingester. type Config struct { @@ -215,7 +222,12 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } - + compressionStats.Set(cfg.ChunkEncoding) + targetSizeStats.Set(int64(cfg.TargetChunkSize)) + walStats.Set("disabled") + if cfg.WAL.Enabled { + walStats.Set("enabled") + } metrics := newIngesterMetrics(registerer) i := &Ingester{ @@ -549,6 +561,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) *instance { if !ok { inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter) i.instances[instanceID] = inst + activeTenantsStats.Set(int64(len(i.instances))) } return inst } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 6bf66c3472178..bf60bc9351143 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/runtime" "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/math" @@ -53,6 +54,8 @@ var ( Name: "ingester_streams_removed_total", Help: "The total number of streams removed per tenant.", }, []string{"tenant"}) + + streamsCountStats = usagestats.NewInt("ingester_streams_count") ) type instance struct { @@ -248,6 +251,7 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *WALRecord memoryStreams.WithLabelValues(i.instanceID).Inc() i.streamsCreatedTotal.Inc() i.addTailersToNewStream(s) + streamsCountStats.Add(1) if i.configs.LogStreamCreation(i.instanceID) { level.Debug(util_log.Logger).Log( @@ -288,6 +292,7 @@ func (i *instance) removeStream(s *stream) { i.index.Delete(s.labels, s.fp) i.streamsRemovedTotal.Inc() memoryStreams.WithLabelValues(i.instanceID).Dec() + streamsCountStats.Add(-1) } } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 99d3a563b8900..fdf9a9b3c7a51 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/flagext" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/validation" @@ -47,6 +48,8 @@ var ( Buckets: prometheus.ExponentialBuckets(5, 2, 6), }) + + chunkCreatedStats = usagestats.NewCounter("ingester_chunk_created") ) var ErrEntriesExist = errors.New("duplicate push - entries already exist") @@ -203,6 +206,7 @@ func (s *stream) Push( chunk: s.NewChunk(), }) 
chunksCreatedTotal.Inc() + chunkCreatedStats.Inc(1) } var storedEntries []logproto.Entry @@ -379,6 +383,7 @@ func (s *stream) cutChunk(ctx context.Context) *chunkDesc { samplesPerChunk.Observe(float64(chunk.chunk.Size())) blocksPerChunk.Observe(float64(chunk.chunk.BlockCount())) chunksCreatedTotal.Inc() + chunkCreatedStats.Inc(1) s.chunks = append(s.chunks, chunkDesc{ chunk: s.NewChunk(), diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 486042ecedea2..9ee8881f5f2e6 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" loki_util "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/unmarshal" @@ -39,6 +40,9 @@ var ( Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", }, []string{"tenant"}) + + bytesReceivedStats = usagestats.NewCounter("distributor_bytes_received") + linesReceivedStats = usagestats.NewCounter("distributor_lines_received") ) const applicationJSON = "application/json" @@ -130,6 +134,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete totalEntries++ entriesSize += int64(len(e.Line)) bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(int64(len(e.Line)))) + bytesReceivedStats.Inc(int64(len(e.Line))) if e.Timestamp.After(mostRecentEntry) { mostRecentEntry = e.Timestamp } @@ -140,6 +145,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete if totalEntries != 0 && userID != "" { linesIngested.WithLabelValues(userID).Add(float64(totalEntries)) } + linesReceivedStats.Inc(totalEntries) level.Debug(logger).Log( "msg", "push request parsed", diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 56b82b5091a89..9359b56da5154 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -64,6 +65,11 @@ var ( Name: "logql_querystats_ingester_sent_lines_total", Help: "Total count of lines sent from ingesters while executing LogQL queries.", }) + + bytePerSecondMetricUsage = usagestats.NewStatistics("query_metric_bytes_per_second") + bytePerSecondLogUsage = usagestats.NewStatistics("query_log_bytes_per_second") + linePerSecondMetricUsage = usagestats.NewStatistics("query_metric_lines_per_second") + linePerSecondLogUsage = usagestats.NewStatistics("query_log_lines_per_second") ) func RecordMetrics(ctx context.Context, p Params, status string, stats logql_stats.Result, result promql_parser.Value) { @@ -125,6 +131,18 @@ func RecordMetrics(ctx context.Context, p Params, status string, stats logql_sta chunkDownloadedTotal.WithLabelValues(status, queryType, rt). 
Add(float64(stats.TotalChunksDownloaded())) ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) + + recordUsageStats(queryType, stats) +} + +func recordUsageStats(queryType string, stats logql_stats.Result) { + if queryType == QueryTypeMetric { + bytePerSecondMetricUsage.Record(float64(stats.Summary.BytesProcessedPerSecond)) + linePerSecondMetricUsage.Record(float64(stats.Summary.LinesProcessedPerSecond)) + } else { + bytePerSecondLogUsage.Record(float64(stats.Summary.BytesProcessedPerSecond)) + linePerSecondLogUsage.Record(float64(stats.Summary.LinesProcessedPerSecond)) + } } func QueryType(query string) (string, error) { diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index bb42d3194c160..9446572f3c1f2 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -45,6 +45,7 @@ import ( chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" "github.com/grafana/loki/pkg/tracing" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/fakeauth" util_log "github.com/grafana/loki/pkg/util/log" @@ -79,6 +80,7 @@ type Config struct { Tracing tracing.Config `yaml:"tracing"` CompactorConfig compactor.Config `yaml:"compactor,omitempty"` QueryScheduler scheduler.Config `yaml:"query_scheduler"` + UsageReport usagestats.Config `yaml:"usage_report"` } // RegisterFlags registers flag. @@ -115,6 +117,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Tracing.RegisterFlags(f) c.CompactorConfig.RegisterFlags(f) c.QueryScheduler.RegisterFlags(f) + c.UsageReport.RegisterFlags(f) } func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { @@ -245,6 +248,7 @@ type Loki struct { compactor *compactor.Compactor QueryFrontEndTripperware basetripper.Tripperware queryScheduler *scheduler.Scheduler + usageReport *usagestats.Reporter clientMetrics chunk_storage.ClientMetrics @@ -481,6 +485,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(IndexGateway, t.initIndexGateway) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) + mm.RegisterModule(UsageReport, t.initUsageReport) mm.RegisterModule(All, nil) mm.RegisterModule(Read, nil) @@ -492,17 +497,17 @@ func (t *Loki) setupModuleManager() error { Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides, TenantConfigs}, + Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport}, Store: {Overrides}, - Ingester: {Store, Server, MemberlistKV, TenantConfigs}, - Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs}, + Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport}, + Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, - QueryFrontend: {QueryFrontendTripperware}, - QueryScheduler: {Server, Overrides, MemberlistKV}, - Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, - TableManager: {Server}, - Compactor: {Server, Overrides, MemberlistKV}, - IndexGateway: {Server}, + QueryFrontend: {QueryFrontendTripperware, UsageReport}, + QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport}, + Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, + TableManager: {Server, UsageReport}, + Compactor: {Server, Overrides, MemberlistKV, UsageReport}, + IndexGateway: 
{Server, UsageReport}, IngesterQuerier: {Ring}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6f142dac550cb..685de4b69e488 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/common/version" "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/weaveworks/common/middleware" @@ -49,6 +50,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" serverutil "github.com/grafana/loki/pkg/util/server" @@ -82,6 +84,7 @@ const ( All string = "all" Read string = "read" Write string = "write" + UsageReport string = "usage-report" ) func (t *Loki) initServer() (services.Service, error) { @@ -749,6 +752,33 @@ func (t *Loki) initQueryScheduler() (services.Service, error) { return s, nil } +func (t *Loki) initUsageReport() (services.Service, error) { + if t.Cfg.UsageReport.Disabled { + return nil, nil + } + t.Cfg.UsageReport.Leader = false + if t.isModuleActive(Ingester) { + t.Cfg.UsageReport.Leader = true + } + usagestats.Edition("oss") + usagestats.Target(t.Cfg.Target.String()) + period, err := t.Cfg.SchemaConfig.SchemaForTime(model.Now()) + if err != nil { + return nil, err + } + + objectClient, err := chunk_storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig.Config, t.clientMetrics) + if err != nil { + return nil, err + } + ur, err := usagestats.NewReporter(t.Cfg.UsageReport, t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore, objectClient, util_log.Logger, prometheus.DefaultRegisterer) + if err != nil { + return nil, err + } + t.usageReport = ur + return ur, nil +} + func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 { return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`") diff --git a/pkg/ruler/base/ruler.go b/pkg/ruler/base/ruler.go index 46fca61517d43..cf8761d6f45e2 100644 --- a/pkg/ruler/base/ruler.go +++ b/pkg/ruler/base/ruler.go @@ -427,7 +427,7 @@ func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { if r.cfg.EnableSharding { r.ring.ServeHTTP(w, req) } else { - var unshardedPage = ` + unshardedPage := ` @@ -769,9 +769,9 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta // Concurrently fetch rules from all rulers. Since rules are not replicated, // we need all requests to succeed. 
- jobs := concurrency.CreateJobsFromStrings(rulers.GetAddresses()) - err = concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { - addr := job.(string) + addresses := rulers.GetAddresses() + err = concurrency.ForEachJob(ctx, len(addresses), len(addresses), func(ctx context.Context, idx int) error { + addr := addresses[idx] rulerClient, err := r.clientsPool.GetClientFor(addr) if err != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 8a65211fa8829..1f5ec9fac7594 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/storage" "github.com/grafana/loki/pkg/storage/stores/shipper" "github.com/grafana/loki/pkg/tenant" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" ) @@ -31,6 +32,9 @@ var ( errCurrentBoltdbShipperNon24Hours = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period") errUpcomingBoltdbShipperNon24Hours = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h") errZeroLengthConfig = errors.New("must specify at least one schema configuration") + indexTypeStats = usagestats.NewString("store_index_type") + objectTypeStats = usagestats.NewString("store_object_type") + schemaStats = usagestats.NewString("store_schema") ) // Config is the loki storage configuration @@ -125,6 +129,14 @@ type store struct { // NewStore creates a new Loki Store using configuration supplied. func NewStore(cfg Config, schemaCfg SchemaConfig, chunkStore chunk.Store, registerer prometheus.Registerer) (Store, error) { + if len(schemaCfg.Configs) != 0 { + if index := ActivePeriodConfig(schemaCfg.Configs); index != -1 && index < len(schemaCfg.Configs) { + indexTypeStats.Set(schemaCfg.Configs[index].IndexType) + objectTypeStats.Set(schemaCfg.Configs[index].ObjectType) + schemaStats.Set(schemaCfg.Configs[index].Schema) + } + } + return &store{ Store: chunkStore, cfg: cfg, diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 0aee339550376..bbfeb044bf374 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -26,6 +26,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" shipper_storage "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -52,6 +53,11 @@ const ( ringNumTokens = 1 ) +var ( + retentionEnabledStats = usagestats.NewString("compactor_retention_enabled") + defaultRetentionStats = usagestats.NewString("compactor_default_retention") +) + type Config struct { WorkingDirectory string `yaml:"working_directory"` SharedStoreType string `yaml:"shared_store"` @@ -119,6 +125,13 @@ type Compactor struct { } func NewCompactor(cfg Config, storageConfig storage.Config, schemaConfig loki_storage.SchemaConfig, limits retention.Limits, clientMetrics storage.ClientMetrics, r prometheus.Registerer) (*Compactor, error) { + retentionEnabledStats.Set("false") + if cfg.RetentionEnabled { + retentionEnabledStats.Set("true") + } + if limits != nil { + 
defaultRetentionStats.Set(limits.DefaultLimits().RetentionPeriod.String()) + } if cfg.SharedStoreType == "" { return nil, errors.New("compactor shared_store_type must be specified") } diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index b9ab02611c6d3..c1c5434725ac0 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -264,18 +264,13 @@ func (t *table) compactFiles(files []storage.IndexFile) error { return err } - jobs := make([]interface{}, len(files)) - for i := 0; i < len(files); i++ { - jobs[i] = i - } - - return concurrency.ForEach(t.ctx, jobs, readDBsConcurrency, func(ctx context.Context, job interface{}) error { - workNum := job.(int) + return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error { + workNum := idx // skip seed file if workNum == t.seedSourceFileIdx { return nil } - fileName := files[workNum].Name + fileName := files[idx].Name downloadAt := filepath.Join(t.workingDirectory, fileName) err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(fileName), diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index 9672f2eb48faf..3c9d0f8c32b37 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -390,13 +390,8 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind downloadedFiles := make([]string, 0, len(files)) downloadedFilesMtx := sync.Mutex{} - jobs := make([]interface{}, len(files)) - for i := 0; i < len(files); i++ { - jobs[i] = i - } - - err := concurrency.ForEach(ctx, jobs, maxDownloadConcurrency, func(ctx context.Context, job interface{}) error { - fileName := files[job.(int)].Name + err := concurrency.ForEachJob(ctx, len(files), maxDownloadConcurrency, func(ctx context.Context, idx int) error { + fileName := files[idx].Name err := t.downloadFileFromStorage(ctx, fileName, t.cacheLocation) if err != nil { if t.baseIndexSet.IsFileNotFoundErr(err) { @@ -412,7 +407,6 @@ func (t *indexSet) doConcurrentDownload(ctx context.Context, files []storage.Ind return nil }) - if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index ec334eb63aa44..ca427a633c9b5 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -308,8 +308,8 @@ func (t *Table) EnsureQueryReadiness(ctx context.Context) error { // downloadUserIndexes downloads user specific index files concurrently. 
 func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error {
-	return concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), maxDownloadConcurrency, func(ctx context.Context, userID interface{}) error {
-		indexSet, err := t.getOrCreateIndexSet(userID.(string))
+	return concurrency.ForEachJob(ctx, len(userIDs), maxDownloadConcurrency, func(ctx context.Context, idx int) error {
+		indexSet, err := t.getOrCreateIndexSet(userIDs[idx])
 		if err != nil {
 			return err
 		}
diff --git a/pkg/usagestats/reporter.go b/pkg/usagestats/reporter.go
new file mode 100644
index 0000000000000..af57c163f863c
--- /dev/null
+++ b/pkg/usagestats/reporter.go
@@ -0,0 +1,266 @@
+package usagestats
+
+import (
+	"bytes"
+	"context"
+	"flag"
+	"io"
+	"math"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/google/uuid"
+	"github.com/grafana/dskit/backoff"
+	"github.com/grafana/dskit/kv"
+	"github.com/grafana/dskit/multierror"
+	"github.com/grafana/dskit/services"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/grafana/loki/pkg/storage/chunk"
+	"github.com/grafana/loki/pkg/util/build"
+)
+
+const (
+	// File name for the cluster seed file.
+	ClusterSeedFileName = "loki_cluster_seed.json"
+	// attemptNumber is how many times we will try to read a corrupted cluster seed before deleting it.
+	attemptNumber = 4
+	// seedKey is the key for the cluster seed to use with the kv store.
+	seedKey = "usagestats_token"
+)
+
+var (
+	reportCheckInterval = time.Minute
+	reportInterval      = 1 * time.Hour
+)
+
+type Config struct {
+	Disabled bool `yaml:"disabled"`
+	Leader   bool `yaml:"-"`
+}
+
+// RegisterFlags adds the flags required to configure this to the given FlagSet.
+func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
+	f.BoolVar(&cfg.Disabled, "usage-report.disabled", false, "Disable anonymous usage reporting.")
+}
+
+type Reporter struct {
+	kvClient     kv.Client
+	logger       log.Logger
+	objectClient chunk.ObjectClient
+	reg          prometheus.Registerer
+
+	services.Service
+
+	conf       Config
+	cluster    *ClusterSeed
+	lastReport time.Time
+}
+
+func NewReporter(config Config, kvConfig kv.Config, objectClient chunk.ObjectClient, logger log.Logger, reg prometheus.Registerer) (*Reporter, error) {
+	if config.Disabled {
+		return nil, nil
+	}
+	kvClient, err := kv.NewClient(kvConfig, JSONCodec, kv.RegistererWithKVName(reg, "usagestats"), logger)
+	if err != nil {
+		return nil, err
+	}
+	r := &Reporter{
+		kvClient:     kvClient,
+		logger:       logger,
+		objectClient: objectClient,
+		conf:         config,
+		reg:          reg,
+	}
+	r.Service = services.NewBasicService(nil, r.running, nil)
+	return r, nil
+}
+
+func (rep *Reporter) initLeader(ctx context.Context) *ClusterSeed {
+	// Try to become leader via the kv client
+	for backoff := backoff.New(ctx, backoff.Config{
+		MinBackoff: time.Second,
+		MaxBackoff: time.Minute,
+		MaxRetries: 0,
+	}); ; backoff.Ongoing() {
+		// create a new cluster seed
+		seed := ClusterSeed{
+			UID:               uuid.NewString(),
+			PrometheusVersion: build.GetVersion(),
+			CreatedAt:         time.Now(),
+		}
+		if err := rep.kvClient.CAS(ctx, seedKey, func(in interface{}) (out interface{}, retry bool, err error) {
+			// The key is already set, so we don't need to do anything
+			if in != nil {
+				if kvSeed, ok := in.(*ClusterSeed); ok && kvSeed.UID != seed.UID {
+					seed = *kvSeed
+					return nil, false, nil
+				}
+			}
+			return seed, true, nil
+		}); err != nil {
+			level.Info(rep.logger).Log("msg", "failed to CAS cluster seed key", "err", err)
+			continue
+		}
+		// Fetch the remote cluster seed.
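+		// If the seed file does not exist yet in object storage, this replica
+		// effectively won the election and persists the seed it just stored in
+		// the KV store; otherwise the remote seed wins, so every replica
+		// converges on a single cluster UID.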
+		remoteSeed, err := rep.fetchSeed(ctx,
+			func(err error) bool {
+				// we only want to retry if the error is not an object not found error
+				return !rep.objectClient.IsObjectNotFoundErr(err)
+			})
+		if err != nil {
+			if rep.objectClient.IsObjectNotFoundErr(err) {
+				// we are the leader and we need to save the file.
+				if err := rep.writeSeedFile(ctx, seed); err != nil {
+					level.Info(rep.logger).Log("msg", "failed to write cluster seed file", "err", err)
+					continue
+				}
+				return &seed
+			}
+			continue
+		}
+		return remoteSeed
+	}
+}
+
+func (rep *Reporter) init(ctx context.Context) {
+	if rep.conf.Leader {
+		rep.cluster = rep.initLeader(ctx)
+		return
+	}
+	// Followers only wait for the cluster seed to be set;
+	// they will retry forever to fetch it.
+	seed, _ := rep.fetchSeed(ctx, nil)
+	rep.cluster = seed
+}
+
+// fetchSeed fetches the cluster seed from the object store and retries until it succeeds.
+// continueFn allows you to decide whether we should continue retrying; nil means always retry.
+func (rep *Reporter) fetchSeed(ctx context.Context, continueFn func(err error) bool) (*ClusterSeed, error) {
+	var (
+		backoff = backoff.New(ctx, backoff.Config{
+			MinBackoff: time.Second,
+			MaxBackoff: time.Minute,
+			MaxRetries: 0,
+		})
+		readingErr = 0
+	)
+	for backoff.Ongoing() {
+		seed, err := rep.readSeedFile(ctx)
+		if err != nil {
+			if !rep.objectClient.IsObjectNotFoundErr(err) {
+				readingErr++
+			}
+			level.Debug(rep.logger).Log("msg", "failed to read cluster seed file", "err", err)
+			if readingErr > attemptNumber {
+				if err := rep.objectClient.DeleteObject(ctx, ClusterSeedFileName); err != nil {
+					level.Error(rep.logger).Log("msg", "failed to delete corrupted cluster seed file", "err", err)
+				}
+				readingErr = 0
+			}
+			if continueFn == nil || continueFn(err) {
+				continue
+			}
+			return nil, err
+		}
+		return seed, nil
+	}
+	return nil, backoff.Err()
+}
+
+// readSeedFile reads the cluster seed file from the object store.
+func (rep *Reporter) readSeedFile(ctx context.Context) (*ClusterSeed, error) {
+	reader, _, err := rep.objectClient.GetObject(ctx, ClusterSeedFileName)
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if err := reader.Close(); err != nil {
+			level.Error(rep.logger).Log("msg", "failed to close reader", "err", err)
+		}
+	}()
+	data, err := io.ReadAll(reader)
+	if err != nil {
+		return nil, err
+	}
+	seed, err := JSONCodec.Decode(data)
+	if err != nil {
+		return nil, err
+	}
+	return seed.(*ClusterSeed), nil
+}
+
+// writeSeedFile writes the cluster seed to the object store.
+func (rep *Reporter) writeSeedFile(ctx context.Context, seed ClusterSeed) error {
+	data, err := JSONCodec.Encode(seed)
+	if err != nil {
+		return err
+	}
+	return rep.objectClient.PutObject(ctx, ClusterSeedFileName, bytes.NewReader(data))
+}
+
+// running initializes the reporter seed and starts sending a report at every interval.
+func (rep *Reporter) running(ctx context.Context) error {
+	rep.init(ctx)
+
+	// check every minute if we should report.
+	ticker := time.NewTicker(reportCheckInterval)
+	defer ticker.Stop()
+
+	// find when to send the next report.
+	next := nextReport(reportInterval, rep.cluster.CreatedAt, time.Now())
+	if rep.lastReport.IsZero() {
+		// if we have never reported, assume the last report happened one interval ago.
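+		// Backdating the last report by one interval schedules the first report
+		// for the next aligned boundary instead of firing immediately on startup.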
+		rep.lastReport = next.Add(-reportInterval)
+	}
+	for {
+		select {
+		case <-ticker.C:
+			now := time.Now()
+			if !next.Equal(now) && now.Sub(rep.lastReport) < reportInterval {
+				continue
+			}
+			level.Debug(rep.logger).Log("msg", "reporting cluster stats", "date", time.Now())
+			if err := rep.reportUsage(ctx, next); err != nil {
+				level.Info(rep.logger).Log("msg", "failed to report usage", "err", err)
+				continue
+			}
+			rep.lastReport = next
+			next = next.Add(reportInterval)
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+}
+
+// reportUsage reports the usage to grafana.com.
+func (rep *Reporter) reportUsage(ctx context.Context, interval time.Time) error {
+	backoff := backoff.New(ctx, backoff.Config{
+		MinBackoff: time.Second,
+		MaxBackoff: 30 * time.Second,
+		MaxRetries: 5,
+	})
+	var errs multierror.MultiError
+	for backoff.Ongoing() {
+		if err := sendReport(ctx, rep.cluster, interval); err != nil {
+			level.Info(rep.logger).Log("msg", "failed to send usage report", "retries", backoff.NumRetries(), "err", err)
+			errs.Add(err)
+			backoff.Wait()
+			continue
+		}
+		level.Debug(rep.logger).Log("msg", "usage report sent successfully")
+		return nil
+	}
+	return errs.Err()
+}
+
+// nextReport computes the next report time based on the interval.
+// The schedule is anchored to the creation time of the cluster seed so that
+// all clusters do not report at the same time.
+func nextReport(interval time.Duration, createdAt, now time.Time) time.Time {
+	// createdAt + (x * interval) >= now
+	return createdAt.Add(time.Duration(math.Ceil(float64(now.Sub(createdAt))/float64(interval))) * interval)
+}
diff --git a/pkg/usagestats/reporter_test.go b/pkg/usagestats/reporter_test.go
new file mode 100644
index 0000000000000..c3dd63c2af418
--- /dev/null
+++ b/pkg/usagestats/reporter_test.go
@@ -0,0 +1,147 @@
+package usagestats
+
+import (
+	"context"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/grafana/dskit/kv"
+	jsoniter "github.com/json-iterator/go"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/pkg/storage/chunk/local"
+	"github.com/grafana/loki/pkg/storage/chunk/storage"
+)
+
+var metrics = storage.NewClientMetrics()
+
+func Test_LeaderElection(t *testing.T) {
+	result := make(chan *ClusterSeed, 10)
+	objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
+		FSConfig: local.FSConfig{
+			Directory: t.TempDir(),
+		},
+	}, metrics)
+	require.NoError(t, err)
+	for i := 0; i < 3; i++ {
+		go func() {
+			r, err := NewReporter(Config{Leader: true}, kv.Config{
+				Store: "inmemory",
+			}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
+			require.NoError(t, err)
+			r.init(context.Background())
+			result <- r.cluster
+		}()
+	}
+	for i := 0; i < 7; i++ {
+		go func() {
+			r, err := NewReporter(Config{Leader: false}, kv.Config{
+				Store: "inmemory",
+			}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
+			require.NoError(t, err)
+			r.init(context.Background())
+			result <- r.cluster
+		}()
+	}
+
+	var UID []string
+	for i := 0; i < 10; i++ {
+		cluster := <-result
+		require.NotNil(t, cluster)
+		UID = append(UID, cluster.UID)
+	}
+	first := UID[0]
+	for _, uid := range UID {
+		require.Equal(t, first, uid)
+	}
+	kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, JSONCodec, prometheus.DefaultRegisterer, log.NewLogfmtLogger(os.Stdout))
+	require.NoError(t, err)
+	// verify that the ID found is also correctly stored in
the kv store and not overridden by another leader.
+	data, err := kvClient.Get(context.Background(), seedKey)
+	require.NoError(t, err)
+	require.Equal(t, data.(*ClusterSeed).UID, first)
+}
+
+func Test_ReportLoop(t *testing.T) {
+	// stub
+	reportCheckInterval = 100 * time.Millisecond
+	reportInterval = time.Second
+
+	totalReport := 0
+	clusterIDs := []string{}
+	server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
+		var received Report
+		totalReport++
+		require.NoError(t, jsoniter.NewDecoder(r.Body).Decode(&received))
+		clusterIDs = append(clusterIDs, received.ClusterID)
+		rw.WriteHeader(http.StatusOK)
+	}))
+	usageStatsURL = server.URL
+
+	objectClient, err := storage.NewObjectClient(storage.StorageTypeFileSystem, storage.Config{
+		FSConfig: local.FSConfig{
+			Directory: t.TempDir(),
+		},
+	}, metrics)
+	require.NoError(t, err)
+
+	r, err := NewReporter(Config{Leader: true}, kv.Config{
+		Store: "inmemory",
+	}, objectClient, log.NewLogfmtLogger(os.Stdout), prometheus.NewPedanticRegistry())
+	require.NoError(t, err)
+
+	r.initLeader(context.Background())
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() {
+		<-time.After(6 * time.Second)
+		cancel()
+	}()
+	require.Equal(t, context.Canceled, r.running(ctx))
+	require.GreaterOrEqual(t, totalReport, 5)
+	first := clusterIDs[0]
+	for _, uid := range clusterIDs {
+		require.Equal(t, first, uid)
+	}
+	require.Equal(t, first, r.cluster.UID)
+}
+
+func Test_NextReport(t *testing.T) {
+	fixtures := map[string]struct {
+		interval  time.Duration
+		createdAt time.Time
+		now       time.Time
+
+		next time.Time
+	}{
+		"createdAt aligned with interval and now": {
+			interval:  1 * time.Hour,
+			createdAt: time.Unix(0, time.Hour.Nanoseconds()),
+			now:       time.Unix(0, 2*time.Hour.Nanoseconds()),
+			next:      time.Unix(0, 2*time.Hour.Nanoseconds()),
+		},
+		"createdAt aligned with interval": {
+			interval:  1 * time.Hour,
+			createdAt: time.Unix(0, time.Hour.Nanoseconds()),
+			now:       time.Unix(0, 2*time.Hour.Nanoseconds()+1),
+			next:      time.Unix(0, 3*time.Hour.Nanoseconds()),
+		},
+		"createdAt not aligned": {
+			interval:  1 * time.Hour,
+			createdAt: time.Unix(0, time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()),
+			now:       time.Unix(0, 2*time.Hour.Nanoseconds()+1),
+			next:      time.Unix(0, 2*time.Hour.Nanoseconds()+18*time.Minute.Nanoseconds()+20*time.Millisecond.Nanoseconds()),
+		},
+	}
+	for name, f := range fixtures {
+		t.Run(name, func(t *testing.T) {
+			next := nextReport(f.interval, f.createdAt, f.now)
+			require.Equal(t, f.next, next)
+		})
+	}
+}
diff --git a/pkg/usagestats/seed.go b/pkg/usagestats/seed.go
new file mode 100644
index 0000000000000..cbf4108c440aa
--- /dev/null
+++ b/pkg/usagestats/seed.go
@@ -0,0 +1,33 @@
+package usagestats
+
+import (
+	"time"
+
+	jsoniter "github.com/json-iterator/go"
+	prom "github.com/prometheus/prometheus/web/api/v1"
+)
+
+type ClusterSeed struct {
+	UID                    string    `json:"UID"`
+	CreatedAt              time.Time `json:"created_at"`
+	prom.PrometheusVersion `json:"version"`
+}
+
+var JSONCodec = jsonCodec{}
+
+type jsonCodec struct{}
+
+// todo: we need to use the default codec for the rest of the code;
+// currently crashing because the in-memory kvstore uses a singleton.
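+// Decode returns a *ClusterSeed so that callers such as the CAS loop in
+// initLeader can type-assert the value stored in the KV store.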
+func (jsonCodec) Decode(data []byte) (interface{}, error) { + var seed ClusterSeed + if err := jsoniter.ConfigFastest.Unmarshal(data, &seed); err != nil { + return nil, err + } + return &seed, nil +} + +func (jsonCodec) Encode(obj interface{}) ([]byte, error) { + return jsoniter.ConfigFastest.Marshal(obj) +} +func (jsonCodec) CodecID() string { return "usagestats.jsonCodec" } diff --git a/pkg/usagestats/stats.go b/pkg/usagestats/stats.go new file mode 100644 index 0000000000000..89218f6f6f50d --- /dev/null +++ b/pkg/usagestats/stats.go @@ -0,0 +1,352 @@ +package usagestats + +import ( + "bytes" + "context" + "encoding/json" + "expvar" + "fmt" + "io" + "math" + "net/http" + "runtime" + "strings" + "sync" + "time" + + "github.com/grafana/loki/pkg/util/build" + + "github.com/cespare/xxhash/v2" + jsoniter "github.com/json-iterator/go" + prom "github.com/prometheus/prometheus/web/api/v1" + "go.uber.org/atomic" +) + +var ( + httpClient = http.Client{Timeout: 5 * time.Second} + usageStatsURL = "https://stats.grafana.org/loki-usage-report" + statsPrefix = "github.com/grafana/loki/" + targetKey = "target" + editionKey = "edition" +) + +// Report is the JSON object sent to the stats server +type Report struct { + ClusterID string `json:"clusterID"` + CreatedAt time.Time `json:"createdAt"` + Interval time.Time `json:"interval"` + Target string `json:"target"` + prom.PrometheusVersion `json:"version"` + Os string `json:"os"` + Arch string `json:"arch"` + Edition string `json:"edition"` + Metrics map[string]interface{} `json:"metrics"` +} + +// sendReport sends the report to the stats server +func sendReport(ctx context.Context, seed *ClusterSeed, interval time.Time) error { + report := buildReport(seed, interval) + out, err := jsoniter.MarshalIndent(report, "", " ") + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, usageStatsURL, bytes.NewBuffer(out)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := httpClient.Do(req.WithContext(ctx)) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode/100 != 2 { + data, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + return fmt.Errorf("failed to send usage stats: %s body: %s", resp.Status, string(data)) + } + return nil +} + +// buildReport builds the report to be sent to the stats server +func buildReport(seed *ClusterSeed, interval time.Time) Report { + var ( + targetName string + editionName string + ) + if target := expvar.Get(statsPrefix + targetKey); target != nil { + if target, ok := target.(*expvar.String); ok { + targetName = target.Value() + } + } + if edition := expvar.Get(statsPrefix + editionKey); edition != nil { + if edition, ok := edition.(*expvar.String); ok { + editionName = edition.Value() + } + } + + return Report{ + ClusterID: seed.UID, + PrometheusVersion: build.GetVersion(), + CreatedAt: seed.CreatedAt, + Interval: interval, + Os: runtime.GOOS, + Arch: runtime.GOARCH, + Target: targetName, + Edition: editionName, + Metrics: buildMetrics(), + } +} + +// buildMetrics builds the metrics part of the report to be sent to the stats server +func buildMetrics() map[string]interface{} { + result := map[string]interface{}{ + "memstats": memstats(), + "num_cpu": runtime.NumCPU(), + "num_goroutine": runtime.NumGoroutine(), + } + expvar.Do(func(kv expvar.KeyValue) { + if !strings.HasPrefix(kv.Key, statsPrefix) || kv.Key == statsPrefix+targetKey || kv.Key == statsPrefix+editionKey { + return + } + var value interface{} + 
switch v := kv.Value.(type) {
+		case *expvar.Int:
+			value = v.Value()
+		case *expvar.Float:
+			value = v.Value()
+		case *expvar.String:
+			value = v.Value()
+		case *Statistics:
+			value = v.Value()
+		case *Counter:
+			v.updateRate()
+			value = v.Value()
+			v.reset()
+		case *WordCounter:
+			value = v.Value()
+		default:
+			value = v.String()
+		}
+		result[strings.TrimPrefix(kv.Key, statsPrefix)] = value
+	})
+	return result
+}
+
+func memstats() interface{} {
+	stats := new(runtime.MemStats)
+	runtime.ReadMemStats(stats)
+	return map[string]interface{}{
+		"alloc":           stats.Alloc,
+		"total_alloc":     stats.TotalAlloc,
+		"sys":             stats.Sys,
+		"heap_alloc":      stats.HeapAlloc,
+		"heap_inuse":      stats.HeapInuse,
+		"stack_inuse":     stats.StackInuse,
+		"pause_total_ns":  stats.PauseTotalNs,
+		"num_gc":          stats.NumGC,
+		"gc_cpu_fraction": stats.GCCPUFraction,
+	}
+}
+
+// NewFloat returns a new Float stats object.
+func NewFloat(name string) *expvar.Float {
+	return expvar.NewFloat(statsPrefix + name)
+}
+
+// NewInt returns a new Int stats object.
+func NewInt(name string) *expvar.Int {
+	return expvar.NewInt(statsPrefix + name)
+}
+
+// NewString returns a new String stats object.
+func NewString(name string) *expvar.String {
+	return expvar.NewString(statsPrefix + name)
+}
+
+// Target sets the target name.
+func Target(target string) {
+	NewString(targetKey).Set(target)
+}
+
+// Edition sets the edition name.
+func Edition(edition string) {
+	NewString(editionKey).Set(edition)
+}
+
+type Statistics struct {
+	min   *atomic.Float64
+	max   *atomic.Float64
+	count *atomic.Int64
+
+	avg *atomic.Float64
+
+	// required for stddev and stdvar
+	mean  *atomic.Float64
+	value *atomic.Float64
+}
+
+// NewStatistics returns a new Statistics object.
+// A Statistics object is thread-safe and computes its statistics on the fly from the samples recorded.
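+// Internally it keeps a running mean and a sum of squared distances from the
+// mean (Welford's online algorithm), from which stddev and stdvar are derived.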
+// Available statistics are:
+// - min
+// - max
+// - avg
+// - count
+// - stddev
+// - stdvar
+func NewStatistics(name string) *Statistics {
+	s := &Statistics{
+		min:   atomic.NewFloat64(math.Inf(0)),
+		max:   atomic.NewFloat64(math.Inf(-1)),
+		count: atomic.NewInt64(0),
+		avg:   atomic.NewFloat64(0),
+		mean:  atomic.NewFloat64(0),
+		value: atomic.NewFloat64(0),
+	}
+	expvar.Publish(statsPrefix+name, s)
+	return s
+}
+
+func (s *Statistics) String() string {
+	b, _ := json.Marshal(s.Value())
+	return string(b)
+}
+
+func (s *Statistics) Value() map[string]interface{} {
+	stdvar := s.value.Load() / float64(s.count.Load())
+	stddev := math.Sqrt(stdvar)
+	min := s.min.Load()
+	max := s.max.Load()
+	result := map[string]interface{}{
+		"avg":   s.avg.Load(),
+		"count": s.count.Load(),
+	}
+	if !math.IsInf(min, 0) {
+		result["min"] = min
+	}
+	if !math.IsInf(max, 0) {
+		result["max"] = max
+	}
+	if !math.IsNaN(stddev) {
+		result["stddev"] = stddev
+	}
+	if !math.IsNaN(stdvar) {
+		result["stdvar"] = stdvar
+	}
+	return result
+}
+
+func (s *Statistics) Record(v float64) {
+	for {
+		min := s.min.Load()
+		if min <= v {
+			break
+		}
+		if s.min.CAS(min, v) {
+			break
+		}
+	}
+	for {
+		max := s.max.Load()
+		if max >= v {
+			break
+		}
+		if s.max.CAS(max, v) {
+			break
+		}
+	}
+	for {
+		avg := s.avg.Load()
+		count := s.count.Load()
+		mean := s.mean.Load()
+		value := s.value.Load()
+
+		delta := v - mean
+		newCount := count + 1
+		newMean := mean + (delta / float64(newCount))
+		newValue := value + (delta * (v - newMean))
+		newAvg := avg + ((v - avg) / float64(newCount))
+		if s.avg.CAS(avg, newAvg) && s.count.CAS(count, newCount) && s.mean.CAS(mean, newMean) && s.value.CAS(value, newValue) {
+			break
+		}
+	}
+}
+
+type Counter struct {
+	total *atomic.Int64
+	rate  *atomic.Float64
+
+	resetTime time.Time
+}
+
+// NewCounter returns a new Counter stats object.
+func NewCounter(name string) *Counter {
+	c := &Counter{
+		total:     atomic.NewInt64(0),
+		rate:      atomic.NewFloat64(0),
+		resetTime: time.Now(),
+	}
+	expvar.Publish(statsPrefix+name, c)
+	return c
+}
+
+func (c *Counter) updateRate() {
+	total := c.total.Load()
+	c.rate.Store(float64(total) / time.Since(c.resetTime).Seconds())
+}
+
+func (c *Counter) reset() {
+	c.total.Store(0)
+	c.rate.Store(0)
+	c.resetTime = time.Now()
+}
+
+func (c *Counter) Inc(i int64) {
+	c.total.Add(i)
+}
+
+func (c *Counter) String() string {
+	b, _ := json.Marshal(c.Value())
+	return string(b)
+}
+
+func (c *Counter) Value() map[string]interface{} {
+	return map[string]interface{}{
+		"total": c.total.Load(),
+		"rate":  c.rate.Load(),
+	}
+}
+
+type WordCounter struct {
+	words sync.Map
+	count *atomic.Int64
+}
+
+// NewWordCounter returns a new WordCounter stats object.
+// A WordCounter object is thread-safe and counts the number of distinct words recorded.
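+// Only an xxhash of each word is retained for deduplication, so the reported
+// value is a distinct count and the words themselves are never sent.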
+type WordCounter struct {
+	words sync.Map
+	count *atomic.Int64
+}
+
+// NewWordCounter returns a new WordCounter stats object.
+// A WordCounter object is thread-safe and counts the number of distinct words recorded.
+func NewWordCounter(name string) *WordCounter {
+	c := &WordCounter{
+		count: atomic.NewInt64(0),
+		words: sync.Map{},
+	}
+	expvar.Publish(statsPrefix+name, c)
+	return c
+}
+
+func (w *WordCounter) Add(word string) {
+	if _, loaded := w.words.LoadOrStore(xxhash.Sum64String(word), struct{}{}); !loaded {
+		w.count.Add(1)
+	}
+}
+
+func (w *WordCounter) String() string {
+	b, _ := json.Marshal(w.Value())
+	return string(b)
+}
+
+func (w *WordCounter) Value() int64 {
+	return w.count.Load()
+}
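
WordCounter, by contrast, reports a distinct count rather than a running total: Add stores the xxhash of each word in a sync.Map and increments the counter only the first time a given hash is seen. A hypothetical sketch (the metric name mirrors the test below; the tenant IDs are made up):

    tenants := usagestats.NewWordCounter("active_tenants")
    tenants.Add("tenant-a")
    tenants.Add("tenant-b")
    tenants.Add("tenant-a") // duplicate word, counted once
    // tenants.Value() == 2
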
w.Add("foo") + }() + } + require.Equal(t, int64(2), w.Value()) +} diff --git a/pkg/util/build/build.go b/pkg/util/build/build.go index 9a6b75a8bca95..76ff10af81b9d 100644 --- a/pkg/util/build/build.go +++ b/pkg/util/build/build.go @@ -4,6 +4,7 @@ import ( "runtime" "github.com/prometheus/common/version" + prom "github.com/prometheus/prometheus/web/api/v1" ) // Version information passed to Prometheus version package. @@ -27,3 +28,14 @@ func init() { version.BuildDate = BuildDate version.GoVersion = runtime.Version() } + +func GetVersion() prom.PrometheusVersion { + return prom.PrometheusVersion{ + Version: version.Version, + Revision: version.Revision, + Branch: version.Branch, + BuildUser: version.BuildUser, + BuildDate: version.BuildDate, + GoVersion: version.GoVersion, + } +} diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index d2598e0d27c2f..81b59a5e4eeb2 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -41,7 +41,7 @@ func NotFoundHandler(w http.ResponseWriter, r *http.Request) { func JSONError(w http.ResponseWriter, code int, message string, args ...interface{}) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.WriteHeader(code) - json.NewEncoder(w).Encode(ErrorResponseBody{ + _ = json.NewEncoder(w).Encode(ErrorResponseBody{ Code: code, Status: "error", Message: fmt.Sprintf(message, args...), diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go index 211ae13803742..8e9e48cc022bd 100644 --- a/pkg/util/server/error_test.go +++ b/pkg/util/server/error_test.go @@ -32,58 +32,78 @@ func Test_writeError(t *testing.T) { }{ {"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest}, {"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, - {"rpc cancelled", + { + "rpc cancelled", status.New(codes.Canceled, context.Canceled.Error()).Err(), "rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, - {"rpc cancelled multi", + http.StatusInternalServerError, + }, + { + "rpc cancelled multi", util.MultiError{status.New(codes.Canceled, context.Canceled.Error()).Err(), status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, - {"mixed context and rpc cancelled", + http.StatusInternalServerError, + }, + { + "mixed context and rpc cancelled", util.MultiError{context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: context canceled; rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, - {"mixed context, rpc cancelled and another", + http.StatusInternalServerError, + }, + { + "mixed context, rpc cancelled and another", util.MultiError{errors.New("standard error"), context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "3 errors: standard error; context canceled; rpc error: code = Canceled desc = context canceled", - http.StatusInternalServerError}, + http.StatusInternalServerError, + }, {"cancelled storage", promql.ErrStorage{Err: context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, {"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest}, {"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout}, {"deadline multi", util.MultiError{context.DeadlineExceeded, context.DeadlineExceeded}, ErrDeadlineExceeded, 
http.StatusGatewayTimeout}, {"rpc deadline", status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), ErrDeadlineExceeded, http.StatusGatewayTimeout}, - {"rpc deadline multi", - util.MultiError{status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), - status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, + { + "rpc deadline multi", + util.MultiError{ + status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), + status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err(), + }, ErrDeadlineExceeded, - http.StatusGatewayTimeout}, - {"mixed context and rpc deadline", + http.StatusGatewayTimeout, + }, + { + "mixed context and rpc deadline", util.MultiError{context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, ErrDeadlineExceeded, - http.StatusGatewayTimeout}, - {"mixed context, rpc deadline and another", + http.StatusGatewayTimeout, + }, + { + "mixed context, rpc deadline and another", util.MultiError{errors.New("standard error"), context.DeadlineExceeded, status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()).Err()}, "3 errors: standard error; context deadline exceeded; rpc error: code = DeadlineExceeded desc = context deadline exceeded", - http.StatusInternalServerError}, + http.StatusInternalServerError, + }, {"parse error", logqlmodel.ParseError{}, "parse error : ", http.StatusBadRequest}, {"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest}, {"internal", errors.New("foo"), "foo", http.StatusInternalServerError}, {"query error", chunk.ErrQueryMustContainMetricName, chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest}, - {"wrapped query error", + { + "wrapped query error", fmt.Errorf("wrapped: %w", chunk.ErrQueryMustContainMetricName), "wrapped: " + chunk.ErrQueryMustContainMetricName.Error(), - http.StatusBadRequest}, - {"multi mixed", + http.StatusBadRequest, + }, + { + "multi mixed", util.MultiError{context.Canceled, context.DeadlineExceeded}, "2 errors: context canceled; context deadline exceeded", - http.StatusInternalServerError}, + http.StatusInternalServerError, + }, } { t.Run(tt.name, func(t *testing.T) { rec := httptest.NewRecorder() WriteError(tt.err, rec) res := &ErrorResponseBody{} - json.NewDecoder(rec.Result().Body).Decode(res) + _ = json.NewDecoder(rec.Result().Body).Decode(res) require.Equal(t, tt.expectedStatus, res.Code) require.Equal(t, tt.expectedStatus, rec.Result().StatusCode) require.Equal(t, tt.expectedMsg, res.Message) diff --git a/vendor/github.com/google/uuid/null.go b/vendor/github.com/google/uuid/null.go deleted file mode 100644 index d7fcbf2865169..0000000000000 --- a/vendor/github.com/google/uuid/null.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2021 Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package uuid - -import ( - "bytes" - "database/sql/driver" - "encoding/json" - "fmt" -) - -var jsonNull = []byte("null") - -// NullUUID represents a UUID that may be null. -// NullUUID implements the SQL driver.Scanner interface so -// it can be used as a scan destination: -// -// var u uuid.NullUUID -// err := db.QueryRow("SELECT name FROM foo WHERE id=?", id).Scan(&u) -// ... 
-// if u.Valid { -// // use u.UUID -// } else { -// // NULL value -// } -// -type NullUUID struct { - UUID UUID - Valid bool // Valid is true if UUID is not NULL -} - -// Scan implements the SQL driver.Scanner interface. -func (nu *NullUUID) Scan(value interface{}) error { - if value == nil { - nu.UUID, nu.Valid = Nil, false - return nil - } - - err := nu.UUID.Scan(value) - if err != nil { - nu.Valid = false - return err - } - - nu.Valid = true - return nil -} - -// Value implements the driver Valuer interface. -func (nu NullUUID) Value() (driver.Value, error) { - if !nu.Valid { - return nil, nil - } - // Delegate to UUID Value function - return nu.UUID.Value() -} - -// MarshalBinary implements encoding.BinaryMarshaler. -func (nu NullUUID) MarshalBinary() ([]byte, error) { - if nu.Valid { - return nu.UUID[:], nil - } - - return []byte(nil), nil -} - -// UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (nu *NullUUID) UnmarshalBinary(data []byte) error { - if len(data) != 16 { - return fmt.Errorf("invalid UUID (got %d bytes)", len(data)) - } - copy(nu.UUID[:], data) - nu.Valid = true - return nil -} - -// MarshalText implements encoding.TextMarshaler. -func (nu NullUUID) MarshalText() ([]byte, error) { - if nu.Valid { - return nu.UUID.MarshalText() - } - - return jsonNull, nil -} - -// UnmarshalText implements encoding.TextUnmarshaler. -func (nu *NullUUID) UnmarshalText(data []byte) error { - id, err := ParseBytes(data) - if err != nil { - nu.Valid = false - return err - } - nu.UUID = id - nu.Valid = true - return nil -} - -// MarshalJSON implements json.Marshaler. -func (nu NullUUID) MarshalJSON() ([]byte, error) { - if nu.Valid { - return json.Marshal(nu.UUID) - } - - return jsonNull, nil -} - -// UnmarshalJSON implements json.Unmarshaler. -func (nu *NullUUID) UnmarshalJSON(data []byte) error { - if bytes.Equal(data, jsonNull) { - *nu = NullUUID{} - return nil // valid null UUID - } - err := json.Unmarshal(data, &nu.UUID) - nu.Valid = err == nil - return err -} diff --git a/vendor/github.com/google/uuid/uuid.go b/vendor/github.com/google/uuid/uuid.go index a57207aeb6fd8..60d26bb50c6a9 100644 --- a/vendor/github.com/google/uuid/uuid.go +++ b/vendor/github.com/google/uuid/uuid.go @@ -12,7 +12,6 @@ import ( "fmt" "io" "strings" - "sync" ) // A UUID is a 128 bit (16 byte) Universal Unique IDentifier as defined in RFC @@ -34,15 +33,7 @@ const ( Future // Reserved for future definition. ) -const randPoolSize = 16 * 16 - -var ( - rander = rand.Reader // random function - poolEnabled = false - poolMu sync.Mutex - poolPos = randPoolSize // protected with poolMu - pool [randPoolSize]byte // protected with poolMu -) +var rander = rand.Reader // random function type invalidLengthError struct{ len int } @@ -50,12 +41,6 @@ func (err invalidLengthError) Error() string { return fmt.Sprintf("invalid UUID length: %d", err.len) } -// IsInvalidLengthError is matcher function for custom error invalidLengthError -func IsInvalidLengthError(err error) bool { - _, ok := err.(invalidLengthError) - return ok -} - // Parse decodes s into a UUID or returns an error. Both the standard UUID // forms of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx and // urn:uuid:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx are decoded as well as the @@ -264,31 +249,3 @@ func SetRand(r io.Reader) { } rander = r } - -// EnableRandPool enables internal randomness pool used for Random -// (Version 4) UUID generation. The pool contains random bytes read from -// the random number generator on demand in batches. 
Enabling the pool -// may improve the UUID generation throughput significantly. -// -// Since the pool is stored on the Go heap, this feature may be a bad fit -// for security sensitive applications. -// -// Both EnableRandPool and DisableRandPool are not thread-safe and should -// only be called when there is no possibility that New or any other -// UUID Version 4 generation function will be called concurrently. -func EnableRandPool() { - poolEnabled = true -} - -// DisableRandPool disables the randomness pool if it was previously -// enabled with EnableRandPool. -// -// Both EnableRandPool and DisableRandPool are not thread-safe and should -// only be called when there is no possibility that New or any other -// UUID Version 4 generation function will be called concurrently. -func DisableRandPool() { - poolEnabled = false - defer poolMu.Unlock() - poolMu.Lock() - poolPos = randPoolSize -} diff --git a/vendor/github.com/google/uuid/version4.go b/vendor/github.com/google/uuid/version4.go index 7697802e4d16b..86160fbd0725f 100644 --- a/vendor/github.com/google/uuid/version4.go +++ b/vendor/github.com/google/uuid/version4.go @@ -27,8 +27,6 @@ func NewString() string { // The strength of the UUIDs is based on the strength of the crypto/rand // package. // -// Uses the randomness pool if it was enabled with EnableRandPool. -// // A note about uniqueness derived from the UUID Wikipedia entry: // // Randomly generated UUIDs have 122 random bits. One's annual risk of being @@ -37,10 +35,7 @@ func NewString() string { // equivalent to the odds of creating a few tens of trillions of UUIDs in a // year and having one duplicate. func NewRandom() (UUID, error) { - if !poolEnabled { - return NewRandomFromReader(rander) - } - return newRandomFromPool() + return NewRandomFromReader(rander) } // NewRandomFromReader returns a UUID based on bytes read from a given io.Reader. @@ -54,23 +49,3 @@ func NewRandomFromReader(r io.Reader) (UUID, error) { uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10 return uuid, nil } - -func newRandomFromPool() (UUID, error) { - var uuid UUID - poolMu.Lock() - if poolPos == randPoolSize { - _, err := io.ReadFull(rander, pool[:]) - if err != nil { - poolMu.Unlock() - return Nil, err - } - poolPos = 0 - } - copy(uuid[:], pool[poolPos:(poolPos+16)]) - poolPos += 16 - poolMu.Unlock() - - uuid[6] = (uuid[6] & 0x0f) | 0x40 // Version 4 - uuid[8] = (uuid[8] & 0x3f) | 0x80 // Variant is 10 - return uuid, nil -} diff --git a/vendor/github.com/grafana/dskit/backoff/backoff.go b/vendor/github.com/grafana/dskit/backoff/backoff.go index 2146f3b928e3b..c5d454715974d 100644 --- a/vendor/github.com/grafana/dskit/backoff/backoff.go +++ b/vendor/github.com/grafana/dskit/backoff/backoff.go @@ -10,9 +10,9 @@ import ( // Config configures a Backoff type Config struct { - MinBackoff time.Duration `yaml:"min_period"` // start backoff at this level - MaxBackoff time.Duration `yaml:"max_period"` // increase exponentially to this level - MaxRetries int `yaml:"max_retries"` // give up after this many; zero means infinite retries + MinBackoff time.Duration `yaml:"min_period" category:"advanced"` // start backoff at this level + MaxBackoff time.Duration `yaml:"max_period" category:"advanced"` // increase exponentially to this level + MaxRetries int `yaml:"max_retries" category:"advanced"` // give up after this many; zero means infinite retries } // RegisterFlagsWithPrefix for Config. 
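
The next hunk reworks dskit's concurrency helpers: the interface{}-based ForEach is deprecated in favor of an index-based ForEachJob, which hands atomically drawn indexes to a bounded pool of workers. A minimal sketch of the new API, assuming only the signature shown in the diff below (the data and function names here are made up):

    package main

    import (
        "context"
        "fmt"

        "github.com/grafana/dskit/concurrency"
    )

    func main() {
        urls := []string{"a", "b", "c", "d"}
        // At most 2 workers pull indexes in [0, len(urls)); the first
        // error cancels the shared context and stops the remaining jobs.
        err := concurrency.ForEachJob(context.Background(), len(urls), 2, func(ctx context.Context, idx int) error {
            fmt.Println("processing", urls[idx])
            return nil
        })
        if err != nil {
            fmt.Println("pull failed:", err)
        }
    }
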
diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go
index a6740f3ac9c44..023be10d7a0a3 100644
--- a/vendor/github.com/grafana/dskit/concurrency/runner.go
+++ b/vendor/github.com/grafana/dskit/concurrency/runner.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"sync"
 
+	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/dskit/internal/math"
@@ -62,45 +63,53 @@ func ForEachUser(ctx context.Context, userIDs []string, concurrency int, userFun
 
 // ForEach runs the provided jobFunc for each job up to concurrency concurrent workers.
 // The execution breaks on first error encountered.
+//
+// Deprecated: use ForEachJob instead.
 func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc func(ctx context.Context, job interface{}) error) error {
-	if len(jobs) == 0 {
-		return nil
+	return ForEachJob(ctx, len(jobs), concurrency, func(ctx context.Context, idx int) error {
+		return jobFunc(ctx, jobs[idx])
+	})
+}
+
+// CreateJobsFromStrings is a utility to create jobs from a slice of strings.
+//
+// Deprecated: will be removed as it's not needed when using ForEachJob.
+func CreateJobsFromStrings(values []string) []interface{} {
+	jobs := make([]interface{}, len(values))
+	for i := 0; i < len(values); i++ {
+		jobs[i] = values[i]
 	}
+	return jobs
+}
 
-	// Push all jobs to a channel.
-	ch := make(chan interface{}, len(jobs))
-	for _, job := range jobs {
-		ch <- job
+// ForEachJob runs the provided jobFunc for each job index in [0, jobs) up to concurrency concurrent workers.
+// The execution breaks on first error encountered.
+func ForEachJob(ctx context.Context, jobs int, concurrency int, jobFunc func(ctx context.Context, idx int) error) error {
+	if jobs == 0 {
+		return nil
 	}
-	close(ch)
+
+	// Initialise indexes with -1 so first Inc() returns index 0.
+	indexes := atomic.NewInt64(-1)
 
 	// Start workers to process jobs.
 	g, ctx := errgroup.WithContext(ctx)
-	for ix := 0; ix < math.Min(concurrency, len(jobs)); ix++ {
+	for ix := 0; ix < math.Min(concurrency, jobs); ix++ {
 		g.Go(func() error {
-			for job := range ch {
-				if err := ctx.Err(); err != nil {
-					return err
+			for ctx.Err() == nil {
+				idx := int(indexes.Inc())
+				if idx >= jobs {
+					return nil
 				}
-				if err := jobFunc(ctx, job); err != nil {
+				if err := jobFunc(ctx, idx); err != nil {
 					return err
 				}
 			}
-
-			return nil
+			return ctx.Err()
 		})
 	}
 
 	// Wait until done (or context has canceled).
 	return g.Wait()
 }
-
-// CreateJobsFromStrings is an utility to create jobs from an slice of strings.
-func CreateJobsFromStrings(values []string) []interface{} {
-	jobs := make([]interface{}, len(values))
-	for i := 0; i < len(values); i++ {
-		jobs[i] = values[i]
-	}
-	return jobs
-}
diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go
index a6fa46f073269..1588edc8939a4 100644
--- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go
+++ b/vendor/github.com/grafana/dskit/crypto/tls/tls.go
@@ -13,11 +13,11 @@ import (
 
 // ClientConfig is the config for client TLS.
type ClientConfig struct { - CertPath string `yaml:"tls_cert_path"` - KeyPath string `yaml:"tls_key_path"` - CAPath string `yaml:"tls_ca_path"` - ServerName string `yaml:"tls_server_name"` - InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify"` + CertPath string `yaml:"tls_cert_path" category:"advanced"` + KeyPath string `yaml:"tls_key_path" category:"advanced"` + CAPath string `yaml:"tls_ca_path" category:"advanced"` + ServerName string `yaml:"tls_server_name" category:"advanced"` + InsecureSkipVerify bool `yaml:"tls_insecure_skip_verify" category:"advanced"` } var ( diff --git a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go index 094337f5d2c13..e7d93b64ec52b 100644 --- a/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go +++ b/vendor/github.com/grafana/dskit/grpcclient/grpcclient.go @@ -18,16 +18,16 @@ import ( // Config for a gRPC client. type Config struct { - MaxRecvMsgSize int `yaml:"max_recv_msg_size"` - MaxSendMsgSize int `yaml:"max_send_msg_size"` - GRPCCompression string `yaml:"grpc_compression"` - RateLimit float64 `yaml:"rate_limit"` - RateLimitBurst int `yaml:"rate_limit_burst"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"` + MaxSendMsgSize int `yaml:"max_send_msg_size" category:"advanced"` + GRPCCompression string `yaml:"grpc_compression" category:"advanced"` + RateLimit float64 `yaml:"rate_limit" category:"advanced"` + RateLimitBurst int `yaml:"rate_limit_burst" category:"advanced"` - BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"` + BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits" category:"advanced"` BackoffConfig backoff.Config `yaml:"backoff_config"` - TLSEnabled bool `yaml:"tls_enabled"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS tls.ClientConfig `yaml:",inline"` } diff --git a/vendor/github.com/grafana/dskit/kv/client.go b/vendor/github.com/grafana/dskit/kv/client.go index b73620dfd6aef..42bf5595461cb 100644 --- a/vendor/github.com/grafana/dskit/kv/client.go +++ b/vendor/github.com/grafana/dskit/kv/client.go @@ -33,8 +33,10 @@ func (r *role) Labels() prometheus.Labels { // The NewInMemoryKVClient returned by NewClient() is a singleton, so // that distributors and ingesters started in the same process can // find themselves. -var inmemoryStoreInit sync.Once -var inmemoryStore Client +var ( + inmemoryStoreInit sync.Once + inmemoryStore *consul.Client +) // StoreConfig is a configuration used for building single store client, either // Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep @@ -53,7 +55,7 @@ type StoreConfig struct { // where store can be consul or inmemory. type Config struct { Store string `yaml:"store"` - Prefix string `yaml:"prefix"` + Prefix string `yaml:"prefix" category:"advanced"` StoreConfig `yaml:",inline"` Mock Client `yaml:"-"` @@ -76,8 +78,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f if flagsPrefix == "" { flagsPrefix = "ring." } + + // Allow clients to override default store by setting it before calling this method. + if cfg.Store == "" { + cfg.Store = "consul" + } + f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.") - f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.") + f.StringVar(&cfg.Store, flagsPrefix+"store", cfg.Store, "Backend storage to use for the ring. 
Supported values are: consul, etcd, inmemory, memberlist, multi.")
 }
 
 // Client is a high-level client for key-value stores (such as Etcd and
@@ -140,7 +148,8 @@ func createClient(backend string, prefix string, cfg StoreConfig, codec codec.Co
 		inmemoryStoreInit.Do(func() {
 			inmemoryStore, _ = consul.NewInMemoryClient(codec, logger, reg)
 		})
-		client = inmemoryStore
+		// However, we swap the codec so that we can encode different types of values.
+		client = inmemoryStore.WithCodec(codec)
 
 	case "memberlist":
 		kv, err := cfg.MemberlistKV()
diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/vendor/github.com/grafana/dskit/kv/consul/client.go
index 69219cf7488f2..63114c547b69c 100644
--- a/vendor/github.com/grafana/dskit/kv/consul/client.go
+++ b/vendor/github.com/grafana/dskit/kv/consul/client.go
@@ -40,11 +40,11 @@ var (
 
 // Config to create a ConsulClient
 type Config struct {
 	Host              string        `yaml:"host"`
-	ACLToken          string        `yaml:"acl_token"`
-	HTTPClientTimeout time.Duration `yaml:"http_client_timeout"`
-	ConsistentReads   bool          `yaml:"consistent_reads"`
-	WatchKeyRateLimit float64       `yaml:"watch_rate_limit"` // Zero disables rate limit
-	WatchKeyBurstSize int           `yaml:"watch_burst_size"` // Burst when doing rate-limit, defaults to 1
+	ACLToken          string        `yaml:"acl_token" category:"advanced"`
+	HTTPClientTimeout time.Duration `yaml:"http_client_timeout" category:"advanced"`
+	ConsistentReads   bool          `yaml:"consistent_reads" category:"advanced"`
+	WatchKeyRateLimit float64       `yaml:"watch_rate_limit" category:"advanced"` // Zero disables rate limit
+	WatchKeyBurstSize int           `yaml:"watch_burst_size" category:"advanced"` // Burst when doing rate-limit, defaults to 1
 
 	// Used in tests only.
 	MaxCasRetries int `yaml:"-"`
@@ -234,7 +234,6 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
 		}
 
 		kvp, meta, err := c.kv.Get(key, queryOptions.WithContext(ctx))
-
 		// Don't backoff if value is not found (kvp == nil). In that case, Consul still returns index value,
 		// and next call to Get will block as expected. We handle missing value below.
 		if err != nil {
@@ -397,3 +396,10 @@ func (c *Client) createRateLimiter() *rate.Limiter {
 	}
 	return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst)
 }
+
+// WithCodec clones and changes the codec of the consul client.
+func (c *Client) WithCodec(codec codec.Codec) *Client {
+	new := *c
+	new.codec = codec
+	return &new
+}
diff --git a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go
index fa6944d4f5206..0661fc5daa1d9 100644
--- a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go
+++ b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go
@@ -22,9 +22,9 @@ import (
 
 // Config for a new etcd.Client.
type Config struct { Endpoints []string `yaml:"endpoints"` - DialTimeout time.Duration `yaml:"dial_timeout"` - MaxRetries int `yaml:"max_retries"` - EnableTLS bool `yaml:"tls_enabled"` + DialTimeout time.Duration `yaml:"dial_timeout" category:"advanced"` + MaxRetries int `yaml:"max_retries" category:"advanced"` + EnableTLS bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` UserName string `yaml:"username"` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index d7ad176d0e5cf..30f0992d35213 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -124,16 +124,16 @@ func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error { // KVConfig is a config for memberlist.KV type KVConfig struct { // Memberlist options. - NodeName string `yaml:"node_name"` - RandomizeNodeName bool `yaml:"randomize_node_name"` - StreamTimeout time.Duration `yaml:"stream_timeout"` - RetransmitMult int `yaml:"retransmit_factor"` - PushPullInterval time.Duration `yaml:"pull_push_interval"` - GossipInterval time.Duration `yaml:"gossip_interval"` - GossipNodes int `yaml:"gossip_nodes"` - GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time"` - DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time"` - EnableCompression bool `yaml:"compression_enabled"` + NodeName string `yaml:"node_name" category:"advanced"` + RandomizeNodeName bool `yaml:"randomize_node_name" category:"advanced"` + StreamTimeout time.Duration `yaml:"stream_timeout" category:"advanced"` + RetransmitMult int `yaml:"retransmit_factor" category:"advanced"` + PushPullInterval time.Duration `yaml:"pull_push_interval" category:"advanced"` + GossipInterval time.Duration `yaml:"gossip_interval" category:"advanced"` + GossipNodes int `yaml:"gossip_nodes" category:"advanced"` + GossipToTheDeadTime time.Duration `yaml:"gossip_to_dead_nodes_time" category:"advanced"` + DeadNodeReclaimTime time.Duration `yaml:"dead_node_reclaim_time" category:"advanced"` + EnableCompression bool `yaml:"compression_enabled" category:"advanced"` // ip:port to advertise other cluster members. Used for NAT traversal AdvertiseAddr string `yaml:"advertise_addr"` @@ -141,20 +141,20 @@ type KVConfig struct { // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` - MinJoinBackoff time.Duration `yaml:"min_join_backoff"` - MaxJoinBackoff time.Duration `yaml:"max_join_backoff"` - MaxJoinRetries int `yaml:"max_join_retries"` + MinJoinBackoff time.Duration `yaml:"min_join_backoff" category:"advanced"` + MaxJoinBackoff time.Duration `yaml:"max_join_backoff" category:"advanced"` + MaxJoinRetries int `yaml:"max_join_retries" category:"advanced"` AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"` - RejoinInterval time.Duration `yaml:"rejoin_interval"` + RejoinInterval time.Duration `yaml:"rejoin_interval" category:"advanced"` // Remove LEFT ingesters from ring after this timeout. - LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"` + LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout" category:"advanced"` // Timeout used when leaving the memberlist cluster. - LeaveTimeout time.Duration `yaml:"leave_timeout"` + LeaveTimeout time.Duration `yaml:"leave_timeout" category:"advanced"` // How much space to use to keep received and sent messages in memory (for troubleshooting). 
- MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes"` + MessageHistoryBufferBytes int `yaml:"message_history_buffer_bytes" category:"advanced"` TCPTransport TCPTransportConfig `yaml:",inline"` diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index 4265a3b223221..eb5451878359c 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -45,10 +45,10 @@ type TCPTransportConfig struct { // Timeout used when making connections to other nodes to send packet. // Zero = no timeout - PacketDialTimeout time.Duration `yaml:"packet_dial_timeout"` + PacketDialTimeout time.Duration `yaml:"packet_dial_timeout" category:"advanced"` // Timeout for writing packet data. Zero = no timeout. - PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"` + PacketWriteTimeout time.Duration `yaml:"packet_write_timeout" category:"advanced"` // Transport logs lot of messages at debug level, so it deserves an extra flag for turning it on TransportDebug bool `yaml:"-"` @@ -57,7 +57,7 @@ type TCPTransportConfig struct { MetricsRegisterer prometheus.Registerer `yaml:"-"` MetricsNamespace string `yaml:"-"` - TLSEnabled bool `yaml:"tls_enabled"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` TLS dstls.ClientConfig `yaml:",inline"` } diff --git a/vendor/github.com/grafana/dskit/kv/multi.go b/vendor/github.com/grafana/dskit/kv/multi.go index 8a3382e9859fc..9a9c24bb834ac 100644 --- a/vendor/github.com/grafana/dskit/kv/multi.go +++ b/vendor/github.com/grafana/dskit/kv/multi.go @@ -16,11 +16,11 @@ import ( // MultiConfig is a configuration for MultiClient. type MultiConfig struct { - Primary string `yaml:"primary"` - Secondary string `yaml:"secondary"` + Primary string `yaml:"primary" category:"advanced"` + Secondary string `yaml:"secondary" category:"advanced"` - MirrorEnabled bool `yaml:"mirror_enabled"` - MirrorTimeout time.Duration `yaml:"mirror_timeout"` + MirrorEnabled bool `yaml:"mirror_enabled" category:"advanced"` + MirrorTimeout time.Duration `yaml:"mirror_timeout" category:"advanced"` // ConfigProvider returns channel with MultiRuntimeConfig updates. 
ConfigProvider func() <-chan MultiRuntimeConfig `yaml:"-"` diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 726a85430d32b..32775c98291ce 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -3,6 +3,7 @@ package ring import ( "context" "fmt" + "net/http" "sort" "sync" "time" @@ -491,3 +492,20 @@ func (l *BasicLifecycler) run(fn func() error) error { return <-errCh } } + +func (l *BasicLifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return l.store.CAS(ctx, l.ringKey, f) +} + +func (l *BasicLifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := l.store.Get(ctx, l.ringKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (l *BasicLifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(l, l.cfg.HeartbeatPeriod).handle(w, req) +} diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index f23f08b81241e..1d6c10e801df2 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -10,8 +10,6 @@ import ( "sort" "strings" "time" - - "github.com/go-kit/log/level" ) const pageContent = ` @@ -90,19 +88,6 @@ func init() { pageTemplate = template.Must(t.Parse(pageContent)) } -func (r *Ring) forget(ctx context.Context, id string) error { - unregister := func(in interface{}) (out interface{}, retry bool, err error) { - if in == nil { - return nil, false, fmt.Errorf("found empty ring when trying to unregister") - } - - ringDesc := in.(*Desc) - ringDesc.RemoveIngester(id) - return ringDesc, true, nil - } - return r.KVClient.CAS(ctx, r.key, unregister) -} - type ingesterDesc struct { ID string `json:"id"` State string `json:"state"` @@ -121,11 +106,33 @@ type httpResponse struct { ShowTokens bool `json:"-"` } -func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { +type ringAccess interface { + casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error + getRing(context.Context) (*Desc, error) +} + +type ringPageHandler struct { + r ringAccess + heartbeatPeriod time.Duration +} + +func newRingPageHandler(r ringAccess, heartbeatPeriod time.Duration) *ringPageHandler { + return &ringPageHandler{ + r: r, + heartbeatPeriod: heartbeatPeriod, + } +} + +func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { if req.Method == http.MethodPost { ingesterID := req.FormValue("forget") - if err := r.forget(req.Context(), ingesterID); err != nil { - level.Error(r.logger).Log("msg", "error forgetting instance", "err", err) + if err := h.forget(req.Context(), ingesterID); err != nil { + http.Error( + w, + fmt.Errorf("error forgetting instance '%s': %w", ingesterID, err).Error(), + http.StatusInternalServerError, + ) + return } // Implement PRG pattern to prevent double-POST and work with CSRF middleware. 
@@ -140,23 +147,26 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - r.mtx.RLock() - defer r.mtx.RUnlock() + ringDesc, err := h.r.getRing(req.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + _, ownedTokens := ringDesc.countTokens() ingesterIDs := []string{} - for id := range r.ringDesc.Ingesters { + for id := range ringDesc.Ingesters { ingesterIDs = append(ingesterIDs, id) } sort.Strings(ingesterIDs) now := time.Now() var ingesters []ingesterDesc - _, owned := r.countTokens() for _, id := range ingesterIDs { - ing := r.ringDesc.Ingesters[id] + ing := ringDesc.Ingesters[id] heartbeatTimestamp := time.Unix(ing.Timestamp, 0) state := ing.State.String() - if !r.IsHealthy(&ing, Reporting, now) { + if !ing.IsHealthy(Reporting, h.heartbeatPeriod, now) { state = unhealthy } @@ -175,7 +185,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { Tokens: ing.Tokens, Zone: ing.Zone, NumTokens: len(ing.Tokens), - Ownership: (float64(owned[id]) / float64(math.MaxUint32)) * 100, + Ownership: (float64(ownedTokens[id]) / float64(math.MaxUint32)) * 100, }) } @@ -203,6 +213,19 @@ func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Templ } } +func (h *ringPageHandler) forget(ctx context.Context, id string) error { + unregister := func(in interface{}) (out interface{}, retry bool, err error) { + if in == nil { + return nil, false, fmt.Errorf("found empty ring when trying to unregister") + } + + ringDesc := in.(*Desc) + ringDesc.RemoveIngester(id) + return ringDesc, true, nil + } + return h.r.casRing(ctx, unregister) +} + // WriteJSONResponse writes some JSON as a HTTP response. func writeJSONResponse(w http.ResponseWriter, v httpResponse) { w.Header().Set("Content-Type", "application/json") diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index be103e1fbad79..92ad34608f463 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "net/http" "os" "sort" "sync" @@ -26,17 +27,20 @@ type LifecyclerConfig struct { RingConfig Config `yaml:"ring"` // Config for the ingester lifecycle control - NumTokens int `yaml:"num_tokens"` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - ObservePeriod time.Duration `yaml:"observe_period"` - JoinAfter time.Duration `yaml:"join_after"` - MinReadyDuration time.Duration `yaml:"min_ready_duration"` - InfNames []string `yaml:"interface_names"` - FinalSleep time.Duration `yaml:"final_sleep"` + NumTokens int `yaml:"num_tokens" category:"advanced"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period" category:"advanced"` + ObservePeriod time.Duration `yaml:"observe_period" category:"advanced"` + JoinAfter time.Duration `yaml:"join_after" category:"advanced"` + MinReadyDuration time.Duration `yaml:"min_ready_duration" category:"advanced"` + InfNames []string `yaml:"interface_names"` + + // FinalSleep's default value can be overridden by + // setting it before calling RegisterFlags or RegisterFlagsWithPrefix. 
+ FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"` TokensFilePath string `yaml:"tokens_file_path"` Zone string `yaml:"availability_zone"` - UnregisterOnShutdown bool `yaml:"unregister_on_shutdown"` - ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health"` + UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"` + ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health" category:"advanced"` // For testing, you can override the address and ID of this ingester Addr string `yaml:"address" doc:"hidden"` @@ -47,12 +51,14 @@ type LifecyclerConfig struct { ListenPort int `yaml:"-"` } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. +// The default values of some flags can be changed; see docs of LifecyclerConfig. func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. +// The default values of some flags can be changed; see docs of LifecyclerConfig. func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f) @@ -67,7 +73,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.") f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.") f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 15*time.Second, "Minimum duration to wait after the internal readiness checks have passed but before succeeding the readiness endpoint. This is used to slowdown deployment controllers (eg. Kubernetes) after an instance is ready and before they proceed with a rolling update, to give the rest of the cluster instances enough time to receive ring updates.") - f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.") + f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", cfg.FinalSleep, "Duration to sleep for before exiting, to ensure metrics are scraped.") f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") hostname, err := os.Hostname() @@ -849,6 +855,23 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { time.Sleep(i.cfg.FinalSleep) } +func (i *Lifecycler) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return i.KVStore.CAS(ctx, i.RingKey, f) +} + +func (i *Lifecycler) getRing(ctx context.Context) (*Desc, error) { + obj, err := i.KVStore.Get(ctx, i.RingKey) + if err != nil { + return nil, err + } + + return GetOrCreateRingDesc(obj), nil +} + +func (i *Lifecycler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(i, i.cfg.HeartbeatPeriod).handle(w, req) +} + // unregister removes our entry from consul. 
func (i *Lifecycler) unregister(ctx context.Context) error { level.Debug(i.logger).Log("msg", "unregistering instance from ring", "ring", i.RingName) diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index 461429d6fa872..b73227136d300 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -20,8 +20,9 @@ type ReplicationSet struct { MaxUnavailableZones int } -// Do function f in parallel for all replicas in the set, erroring is we exceed +// Do function f in parallel for all replicas in the set, erroring if we exceed // MaxErrors and returning early otherwise. +// Return a slice of all results from f, or nil if an error occurred. func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) { type instanceResult struct { res interface{} diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6aaf165bf9702..5553c6b72121a 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -8,11 +8,13 @@ import ( "fmt" "math" "math/rand" + "net/http" "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -513,27 +515,32 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro } // countTokens returns the number of tokens and tokens within the range for each instance. -// The ring read lock must be already taken when calling this function. -func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { - owned := map[string]uint32{} - numTokens := map[string]uint32{} - for i, token := range r.ringTokens { +func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { + var ( + owned = map[string]uint32{} + numTokens = map[string]uint32{} + + ringTokens = r.GetTokens() + ringInstanceByToken = r.getTokensInfo() + ) + + for i, token := range ringTokens { var diff uint32 // Compute how many tokens are within the range. - if i+1 == len(r.ringTokens) { - diff = (math.MaxUint32 - token) + r.ringTokens[0] + if i+1 == len(ringTokens) { + diff = (math.MaxUint32 - token) + ringTokens[0] } else { - diff = r.ringTokens[i+1] - token + diff = ringTokens[i+1] - token } - info := r.ringInstanceByToken[token] + info := ringInstanceByToken[token] numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } // Set to 0 the number of owned tokens by instances which don't have tokens yet. 
- for id := range r.ringDesc.Ingesters { + for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 numTokens[id] = 0 @@ -582,7 +589,7 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { prevOwners := r.reportedOwners r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.countTokens() + numTokens, ownedRange := r.ringDesc.countTokens() for id, totalOwned := range ownedRange { r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) @@ -840,6 +847,23 @@ func (r *Ring) CleanupShuffleShardCache(identifier string) { } } +func (r *Ring) casRing(ctx context.Context, f func(in interface{}) (out interface{}, retry bool, err error)) error { + return r.KVClient.CAS(ctx, r.key, f) +} + +func (r *Ring) getRing(ctx context.Context) (*Desc, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + ringDesc := proto.Clone(r.ringDesc).(*Desc) + + return ringDesc, nil +} + +func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { + newRingPageHandler(r, r.cfg.HeartbeatTimeout).handle(w, req) +} + // Operation describes which instances can be included in the replica set, based on their state. // // Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states. diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go index e5da50bc7a324..a7f29ab8cd88a 100644 --- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -26,7 +26,7 @@ type Loader func(r io.Reader) (interface{}, error) // Config holds the config for an Manager instance. // It holds config related to loading per-tenant config. type Config struct { - ReloadPeriod time.Duration `yaml:"period"` + ReloadPeriod time.Duration `yaml:"period" category:"advanced"` // LoadPath contains the path to the runtime config file, requires an // non-empty value LoadPath string `yaml:"file"` diff --git a/vendor/modules.txt b/vendor/modules.txt index 12bcf7b034d66..be72b52c70d25 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -590,7 +590,7 @@ github.com/google/pprof/profile # github.com/google/renameio/v2 v2.0.0 ## explicit; go 1.13 github.com/google/renameio/v2 -# github.com/google/uuid v1.3.0 +# github.com/google/uuid v1.2.0 ## explicit github.com/google/uuid # github.com/googleapis/gax-go/v2 v2.1.1 @@ -623,7 +623,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.4.2 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 +# github.com/grafana/dskit v0.0.0-20220209070952-ea22a8f662d0 ## explicit; go 1.16 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency