diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index b40f771c413..bd64611a0e1 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -1,327 +1,296 @@
-// Copyright (c) The Thanos Authors.
-// Licensed under the Apache License 2.0.
+// Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0.
 
 package main
 
-import (
-	"context"
-	"io/ioutil"
-	"net/url"
-	"os"
-	"path"
-	"strings"
-	"time"
-
-	extflag "github.com/efficientgo/tools/extkingpin"
-	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
-	grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
-	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
-	"github.com/oklog/run"
-	"github.com/opentracing/opentracing-go"
-	"github.com/pkg/errors"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/model/labels"
-	"github.com/prometheus/prometheus/model/relabel"
-	"github.com/prometheus/prometheus/tsdb"
-	"github.com/thanos-io/objstore"
-	"github.com/thanos-io/objstore/client"
-	"gopkg.in/yaml.v2"
-
-	"github.com/thanos-io/thanos/pkg/block/metadata"
-	"github.com/thanos-io/thanos/pkg/component"
-	"github.com/thanos-io/thanos/pkg/exemplars"
-	"github.com/thanos-io/thanos/pkg/extgrpc"
-	"github.com/thanos-io/thanos/pkg/extkingpin"
-	"github.com/thanos-io/thanos/pkg/extprom"
-	"github.com/thanos-io/thanos/pkg/info"
-	"github.com/thanos-io/thanos/pkg/info/infopb"
-	"github.com/thanos-io/thanos/pkg/logging"
-	"github.com/thanos-io/thanos/pkg/prober"
-	"github.com/thanos-io/thanos/pkg/receive"
-	"github.com/thanos-io/thanos/pkg/runutil"
-	grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
-	httpserver "github.com/thanos-io/thanos/pkg/server/http"
-	"github.com/thanos-io/thanos/pkg/store"
-	"github.com/thanos-io/thanos/pkg/store/labelpb"
-	"github.com/thanos-io/thanos/pkg/tls"
-)
-
-func registerReceive(app *extkingpin.App) {
-	cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.")
-
-	conf := &receiveConfig{}
-	conf.registerFlag(cmd)
-
-	cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
-		lset, err := parseFlagLabels(conf.labelStrs)
-		if err != nil {
-			return errors.Wrap(err, "parse labels")
-		}
-
-		if !model.LabelName.IsValid(model.LabelName(conf.tenantLabelName)) {
-			return errors.Errorf("unsupported format for tenant label name, got %s", conf.tenantLabelName)
-		}
-		if len(lset) == 0 {
-			return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.")
-		}
+import ( "context" "io/ioutil" "net/url" "os" "path" "strings" "time"
+
+```
+extflag "github.com/efficientgo/tools/extkingpin"
+"github.com/go-kit/log"
+"github.com/go-kit/log/level"
+grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
+"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tags"
+"github.com/oklog/run"
+"github.com/opentracing/opentracing-go"
+"github.com/pkg/errors"
+"github.com/prometheus/client_golang/prometheus"
+"github.com/prometheus/client_golang/prometheus/promauto"
+"github.com/prometheus/common/model"
+"github.com/prometheus/prometheus/model/labels"
+"github.com/prometheus/prometheus/model/relabel" +"github.com/prometheus/prometheus/tsdb" +"github.com/thanos-io/objstore" +"github.com/thanos-io/objstore/client" +"gopkg.in/yaml.v2" + +"github.com/thanos-io/thanos/pkg/block/metadata" +"github.com/thanos-io/thanos/pkg/component" +"github.com/thanos-io/thanos/pkg/exemplars" +"github.com/thanos-io/thanos/pkg/extgrpc" +"github.com/thanos-io/thanos/pkg/extkingpin" +"github.com/thanos-io/thanos/pkg/extprom" +"github.com/thanos-io/thanos/pkg/info" +"github.com/thanos-io/thanos/pkg/info/infopb" +"github.com/thanos-io/thanos/pkg/logging" +"github.com/thanos-io/thanos/pkg/prober" +"github.com/thanos-io/thanos/pkg/receive" +"github.com/thanos-io/thanos/pkg/runutil" +grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" +httpserver "github.com/thanos-io/thanos/pkg/server/http" +"github.com/thanos-io/thanos/pkg/store" +"github.com/thanos-io/thanos/pkg/store/labelpb" +"github.com/thanos-io/thanos/pkg/tls" +``` - tagOpts, grpcLogOpts, err := logging.ParsegRPCOptions("", conf.reqLogConfig) - if err != nil { - return errors.Wrap(err, "error while parsing config for request logging") - } - - tsdbOpts := &tsdb.Options{ - MinBlockDuration: int64(time.Duration(*conf.tsdbMinBlockDuration) / time.Millisecond), - MaxBlockDuration: int64(time.Duration(*conf.tsdbMaxBlockDuration) / time.Millisecond), - RetentionDuration: int64(time.Duration(*conf.retention) / time.Millisecond), - NoLockfile: conf.noLockFile, - WALCompression: conf.walCompression, - AllowOverlappingBlocks: conf.tsdbAllowOverlappingBlocks, - MaxExemplars: conf.tsdbMaxExemplars, - EnableExemplarStorage: true, - } - - // Are we running in IngestorOnly, RouterOnly or RouterIngestor mode? - receiveMode := conf.determineMode() - - return runReceive( - g, - logger, - reg, - tracer, - grpcLogOpts, tagOpts, - tsdbOpts, - lset, - component.Receive, - metadata.HashFunc(conf.hashFunc), - receiveMode, - conf, - ) - }) -} +) -func runReceive( - g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - tracer opentracing.Tracer, - grpcLogOpts []grpc_logging.Option, - tagOpts []tags.Option, - tsdbOpts *tsdb.Options, - lset labels.Labels, - comp component.SourceStoreAPI, - hashFunc metadata.HashFunc, - receiveMode receive.ReceiverMode, - conf *receiveConfig, -) error { - logger = log.With(logger, "component", "receive") +func registerReceive(app *extkingpin.App) { cmd := app.Command(component.Receive.String(), "Accept Prometheus remote write API requests and write to local tsdb.") - level.Info(logger).Log("mode", receiveMode, "msg", "running receive") +``` +conf := &receiveConfig{} +conf.registerFlag(cmd) - rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) +cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { + lset, err := parseFlagLabels(conf.labelStrs) if err != nil { - return err + return errors.Wrap(err, "parse labels") } - dialOpts, err := extgrpc.StoreClientGRPCOpts( - logger, - reg, - tracer, - *conf.grpcCert != "", - *conf.grpcClientCA == "", - conf.rwClientCert, - conf.rwClientKey, - conf.rwClientServerCA, - conf.rwClientServerName, - ) - if err != nil { - return err + if !model.LabelName.IsValid(model.LabelName(conf.tenantLabelName)) { + return errors.Errorf("unsupported format for tenant label name, got %s", conf.tenantLabelName) } - - var bkt objstore.Bucket - confContentYaml, err := conf.objStoreConfig.Content() - if err 
!= nil { - return err + if len(lset) == 0 { + return errors.New("no external labels configured for receive, uniquely identifying external labels must be configured (ideally with `receive_` prefix); see https://thanos.io/tip/thanos/storage.md#external-labels for details.") } - // Has this thanos receive instance been configured to ingest metrics into a local TSDB? - enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor - - upload := len(confContentYaml) > 0 - if enableIngestion { - if upload { - if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { - if !conf.ignoreBlockSize { - return errors.Errorf("found that TSDB Max time is %d and Min time is %d. "+ - "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) - } - level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") - } - // The background shipper continuously scans the data directory and uploads - // new blocks to object storage service. - bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) - if err != nil { - return err - } - } else { - level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") - } + tagOpts, grpcLogOpts, err := logging.ParsegRPCOptions("", conf.reqLogConfig) + if err != nil { + return errors.Wrap(err, "error while parsing config for request logging") } - // TODO(brancz): remove after a couple of versions - // Migrate non-multi-tsdb capable storage to multi-tsdb disk layout. - if err := migrateLegacyStorage(logger, conf.dataDir, conf.defaultTenantID); err != nil { - return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID) + tsdbOpts := &tsdb.Options{ + MinBlockDuration: int64(time.Duration(*conf.tsdbMinBlockDuration) / time.Millisecond), + MaxBlockDuration: int64(time.Duration(*conf.tsdbMaxBlockDuration) / time.Millisecond), + RetentionDuration: int64(time.Duration(*conf.retention) / time.Millisecond), + NoLockfile: conf.noLockFile, + WALCompression: conf.walCompression, + AllowOverlappingBlocks: conf.tsdbAllowOverlappingBlocks, + MaxExemplars: conf.tsdbMaxExemplars, + EnableExemplarStorage: true, } - relabelContentYaml, err := conf.relabelConfigPath.Content() - if err != nil { - return errors.Wrap(err, "get content of relabel configuration") - } - var relabelConfig []*relabel.Config - if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil { - return errors.Wrap(err, "parse relabel configuration") - } + // Are we running in IngestorOnly, RouterOnly or RouterIngestor mode? 
+ receiveMode := conf.determineMode() - dbs := receive.NewMultiTSDB( - conf.dataDir, + return runReceive( + g, logger, reg, + tracer, + grpcLogOpts, tagOpts, tsdbOpts, lset, - conf.tenantLabelName, - bkt, - conf.allowOutOfOrderUpload, - hashFunc, + component.Receive, + metadata.HashFunc(conf.hashFunc), + receiveMode, + conf, ) - writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) - webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ - Writer: writer, - ListenAddress: conf.rwAddress, - Registry: reg, - Endpoint: conf.endpoint, - TenantHeader: conf.tenantHeader, - TenantField: conf.tenantField, - DefaultTenantID: conf.defaultTenantID, - ReplicaHeader: conf.replicaHeader, - ReplicationFactor: conf.replicationFactor, - RelabelConfigs: relabelConfig, - ReceiverMode: receiveMode, - Tracer: tracer, - TLSConfig: rwTLSConfig, - DialOpts: dialOpts, - ForwardTimeout: time.Duration(*conf.forwardTimeout), - TSDBStats: dbs, - MaxPerTenantLimit: conf.maxPerTenantLimit, - MetaMonitoringUrl: conf.metaMonitoringUrl, - MetaMonitoringHttpClient: conf.metaMonitoringHttpClient, - MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery, - }) +}) +``` - grpcProbe := prober.NewGRPC() - httpProbe := prober.NewHTTP() - statusProber := prober.Combine( - httpProbe, - grpcProbe, - prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), - ) +} - // Start all components while we wait for TSDB to open but only load - // initial config and mark ourselves as ready after it completed. +func runReceive( g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, grpcLogOpts []grpc_logging.Option, tagOpts []tags.Option, tsdbOpts *tsdb.Options, lset labels.Labels, comp component.SourceStoreAPI, hashFunc metadata.HashFunc, receiveMode receive.ReceiverMode, conf *receiveConfig, ) error { logger = log.With(logger, "component", "receive") - // reloadGRPCServer signals when - (1)TSDB is ready and the Store gRPC server can start. - // (2) The Hashring files have changed if tsdb ingestion is disabled. - reloadGRPCServer := make(chan struct{}, 1) - // hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. - hashringChangedChan := make(chan struct{}, 1) +``` +level.Info(logger).Log("mode", receiveMode, "msg", "running receive") - if enableIngestion { - // uploadC signals when new blocks should be uploaded. - uploadC := make(chan struct{}, 1) - // uploadDone signals when uploading has finished. - uploadDone := make(chan struct{}, 1) - - level.Debug(logger).Log("msg", "setting up tsdb") - { - if err := startTSDBAndUpload(g, logger, reg, dbs, reloadGRPCServer, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { - return err +rwTLSConfig, err := tls.NewServerConfig(log.With(logger, "protocol", "HTTP"), conf.rwServerCert, conf.rwServerKey, conf.rwServerClientCA) +if err != nil { + return err +} + +dialOpts, err := extgrpc.StoreClientGRPCOpts( + logger, + reg, + tracer, + *conf.grpcCert != "", + *conf.grpcClientCA == "", + conf.rwClientCert, + conf.rwClientKey, + conf.rwClientServerCA, + conf.rwClientServerName, +) +if err != nil { + return err +} + +var bkt objstore.Bucket +confContentYaml, err := conf.objStoreConfig.Content() +if err != nil { + return err +} + +// Has this thanos receive instance been configured to ingest metrics into a local TSDB? 
+enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor + +upload := len(confContentYaml) > 0 +if enableIngestion { + if upload { + if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration { + if !conf.ignoreBlockSize { + return errors.Errorf("found that TSDB Max time is %d and Min time is %d. "+ + "Compaction needs to be disabled (tsdb.min-block-duration = tsdb.max-block-duration)", tsdbOpts.MaxBlockDuration, tsdbOpts.MinBlockDuration) } + level.Warn(logger).Log("msg", "flag to ignore min/max block duration flags differing is being used. If the upload of a 2h block fails and a tsdb compaction happens that block may be missing from your Thanos bucket storage.") } - } - - level.Debug(logger).Log("msg", "setting up hashring") - { - if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, reloadGRPCServer, enableIngestion); err != nil { + // The background shipper continuously scans the data directory and uploads + // new blocks to object storage service. + bkt, err = client.NewBucket(logger, confContentYaml, reg, comp.String()) + if err != nil { return err } + } else { + level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled") } +} - level.Debug(logger).Log("msg", "setting up http server") - { - srv := httpserver.New(logger, reg, comp, httpProbe, - httpserver.WithListen(*conf.httpBindAddr), - httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), - httpserver.WithTLSConfig(*conf.httpTLSConfig), - ) - g.Add(func() error { - statusProber.Healthy() +// TODO(brancz): remove after a couple of versions +// Migrate non-multi-tsdb capable storage to multi-tsdb disk layout. +if err := migrateLegacyStorage(logger, conf.dataDir, conf.defaultTenantID); err != nil { + return errors.Wrapf(err, "migrate legacy storage in %v to default tenant %v", conf.dataDir, conf.defaultTenantID) +} - return srv.ListenAndServe() - }, func(err error) { - statusProber.NotReady(err) - defer statusProber.NotHealthy(err) +relabelContentYaml, err := conf.relabelConfigPath.Content() +if err != nil { + return errors.Wrap(err, "get content of relabel configuration") +} +var relabelConfig []*relabel.Config +if err := yaml.Unmarshal(relabelContentYaml, &relabelConfig); err != nil { + return errors.Wrap(err, "parse relabel configuration") +} - srv.Shutdown(err) - }) - } +dbs := receive.NewMultiTSDB( + conf.dataDir, + logger, + reg, + tsdbOpts, + lset, + conf.tenantLabelName, + bkt, + conf.allowOutOfOrderUpload, + hashFunc, +) +writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs) +webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{ + Writer: writer, + ListenAddress: conf.rwAddress, + Registry: reg, + Endpoint: conf.endpoint, + TenantHeader: conf.tenantHeader, + TenantField: conf.tenantField, + DefaultTenantID: conf.defaultTenantID, + ReplicaHeader: conf.replicaHeader, + ReplicationFactor: conf.replicationFactor, + RelabelConfigs: relabelConfig, + ReceiverMode: receiveMode, + Tracer: tracer, + TLSConfig: rwTLSConfig, + DialOpts: dialOpts, + ForwardTimeout: time.Duration(*conf.forwardTimeout), + TSDBStats: dbs, + MaxPerTenantLimit: conf.maxPerTenantLimit, + MetaMonitoringUrl: conf.metaMonitoringUrl, + MetaMonitoringHttpClient: conf.metaMonitoringHttpClient, + MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery, +}) + +grpcProbe := prober.NewGRPC() +httpProbe := prober.NewHTTP() +statusProber := prober.Combine( + httpProbe, + 
grpcProbe, + prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), +) + +// Start all components while we wait for TSDB to open but only load +// initial config and mark ourselves as ready after it completed. - level.Debug(logger).Log("msg", "setting up grpc server") +// reloadGRPCServer signals when - (1)TSDB is ready and the Store gRPC server can start. +// (2) The Hashring files have changed if tsdb ingestion is disabled. +reloadGRPCServer := make(chan struct{}, 1) +// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change. +hashringChangedChan := make(chan struct{}, 1) + +if enableIngestion { + // uploadC signals when new blocks should be uploaded. + uploadC := make(chan struct{}, 1) + // uploadDone signals when uploading has finished. + uploadDone := make(chan struct{}, 1) + + level.Debug(logger).Log("msg", "setting up tsdb") { - if err := setupAndRunGRPCServer(g, logger, reg, tracer, conf, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, - tagOpts, grpcProbe, httpProbe.IsReady); err != nil { + if err := startTSDBAndUpload(g, logger, reg, dbs, reloadGRPCServer, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt); err != nil { return err } } +} - level.Debug(logger).Log("msg", "setting up receive http handler") - { - g.Add( - func() error { - return errors.Wrap(webHandler.Run(), "error starting web server") - }, - func(err error) { - webHandler.Close() - }, - ) +level.Debug(logger).Log("msg", "setting up hashring") +{ + if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, reloadGRPCServer, enableIngestion); err != nil { + return err } +} - if (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 { - level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache") - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - return runutil.Repeat(15*time.Second, ctx.Done(), func() error { - if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil { - level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error()) - } - return nil - }) - }, func(err error) { - cancel() - }) - } +level.Debug(logger).Log("msg", "setting up http server") +{ + srv := httpserver.New(logger, reg, comp, httpProbe, + httpserver.WithListen(*conf.httpBindAddr), + httpserver.WithGracePeriod(time.Duration(*conf.httpGracePeriod)), + httpserver.WithTLSConfig(*conf.httpTLSConfig), + ) + g.Add(func() error { + statusProber.Healthy() + + return srv.ListenAndServe() + }, func(err error) { + statusProber.NotReady(err) + defer statusProber.NotHealthy(err) + + srv.Shutdown(err) + }) +} + +level.Debug(logger).Log("msg", "setting up grpc server") +{ + if err := setupAndRunGRPCServer(g, logger, reg, tracer, conf, reloadGRPCServer, comp, dbs, webHandler, grpcLogOpts, + tagOpts, grpcProbe, httpProbe.IsReady); err != nil { + return err } +} - level.Debug(logger).Log("msg", "setting up periodic tenant pruning") +level.Debug(logger).Log("msg", "setting up receive http handler") +{ + g.Add( + func() error { + return errors.Wrap(webHandler.Run(), "error starting web server") + }, + func(err error) { + webHandler.Close() + }, + ) +} + +seriesLimitSupported := receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor +if seriesLimitSupported && 
conf.maxPerTenantLimit != 0 { + level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache") { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { - return runutil.Repeat(2*time.Hour, ctx.Done(), func() error { - if err := dbs.Prune(ctx); err != nil { - level.Error(logger).Log("err", err) + return runutil.Repeat(15*time.Second, ctx.Done(), func() error { + if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil { + level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error()) } return nil }) @@ -329,589 +298,572 @@ func runReceive( cancel() }) } +} - level.Info(logger).Log("msg", "starting receiver") - return nil +level.Debug(logger).Log("msg", "setting up periodic tenant pruning") +{ + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(2*time.Hour, ctx.Done(), func() error { + if err := dbs.Prune(ctx); err != nil { + level.Error(logger).Log("err", err) + } + return nil + }) + }, func(err error) { + cancel() + }) } -// setupAndRunGRPCServer sets up the configuration for the gRPC server. -// It also sets up a handler for reloading the server if tsdb reloads. -func setupAndRunGRPCServer(g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - tracer opentracing.Tracer, - conf *receiveConfig, - reloadGRPCServer chan struct{}, - comp component.SourceStoreAPI, - dbs *receive.MultiTSDB, - webHandler *receive.Handler, - grpcLogOpts []grpc_logging.Option, - tagOpts []tags.Option, - grpcProbe *prober.GRPCProbe, - isReady func() bool, -) error { +level.Info(logger).Log("msg", "starting receiver") +return nil +``` - var s *grpcserver.Server - // startGRPCListening re-starts the gRPC server once it receives a signal. - startGRPCListening := make(chan struct{}) +} - g.Add(func() error { - defer close(startGRPCListening) +// setupAndRunGRPCServer sets up the configuration for the gRPC server. // It also sets up a handler for reloading the server if tsdb reloads. func setupAndRunGRPCServer(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, conf *receiveConfig, reloadGRPCServer chan struct{}, comp component.SourceStoreAPI, dbs *receive.MultiTSDB, webHandler *receive.Handler, grpcLogOpts []grpc_logging.Option, tagOpts []tags.Option, grpcProbe *prober.GRPCProbe, isReady func() bool, ) error { - tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) - if err != nil { - return errors.Wrap(err, "setup gRPC server") - } +``` +var s *grpcserver.Server +// startGRPCListening re-starts the gRPC server once it receives a signal. 
+startGRPCListening := make(chan struct{}) - for range reloadGRPCServer { - if s != nil { - s.Shutdown(errors.New("reload hashrings")) - } +g.Add(func() error { + defer close(startGRPCListening) - mts := store.NewMultiTSDBStore( - logger, - reg, - comp, - dbs.TSDBStores, - ) - rw := store.ReadWriteTSDBStore{ - StoreServer: mts, - WriteableStoreServer: webHandler, - } + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), *conf.grpcCert, *conf.grpcKey, *conf.grpcClientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } - infoSrv := info.NewInfoServer( - component.Receive.String(), - info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return mts.LabelSet() }), - info.WithStoreInfoFunc(func() *infopb.StoreInfo { - if isReady() { - minTime, maxTime := mts.TimeRange() - return &infopb.StoreInfo{ - MinTime: minTime, - MaxTime: maxTime, - } - } - return nil - }), - info.WithExemplarsInfoFunc(), - ) - - s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, - grpcserver.WithServer(store.RegisterStoreServer(rw)), - grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), - grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))), - grpcserver.WithServer(info.RegisterInfoServer(infoSrv)), - grpcserver.WithListen(*conf.grpcBindAddr), - grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), - grpcserver.WithTLSConfig(tlsCfg), - grpcserver.WithMaxConnAge(*conf.grpcMaxConnAge), - ) - startGRPCListening <- struct{}{} - } + for range reloadGRPCServer { if s != nil { - s.Shutdown(err) + s.Shutdown(errors.New("reload hashrings")) } - return nil - }, func(error) {}) - // We need to be able to start and stop the gRPC server - // whenever the DB changes, thus it needs its own run group. 
- g.Add(func() error { - for range startGRPCListening { - level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) - if err := s.ListenAndServe(); err != nil { - return errors.Wrap(err, "serve gRPC") - } + mts := store.NewMultiTSDBStore( + logger, + reg, + comp, + dbs.TSDBStores, + ) + rw := store.ReadWriteTSDBStore{ + StoreServer: mts, + WriteableStoreServer: webHandler, } - return nil - }, func(error) { - defer close(reloadGRPCServer) - }) + infoSrv := info.NewInfoServer( + component.Receive.String(), + info.WithLabelSetFunc(func() []labelpb.ZLabelSet { return mts.LabelSet() }), + info.WithStoreInfoFunc(func() *infopb.StoreInfo { + if isReady() { + minTime, maxTime := mts.TimeRange() + return &infopb.StoreInfo{ + MinTime: minTime, + MaxTime: maxTime, + } + } + return nil + }), + info.WithExemplarsInfoFunc(), + ) + + s = grpcserver.New(logger, &receive.UnRegisterer{Registerer: reg}, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, + grpcserver.WithServer(store.RegisterStoreServer(rw)), + grpcserver.WithServer(store.RegisterWritableStoreServer(rw)), + grpcserver.WithServer(exemplars.RegisterExemplarsServer(exemplars.NewMultiTSDB(dbs.TSDBExemplars))), + grpcserver.WithServer(info.RegisterInfoServer(infoSrv)), + grpcserver.WithListen(*conf.grpcBindAddr), + grpcserver.WithGracePeriod(time.Duration(*conf.grpcGracePeriod)), + grpcserver.WithTLSConfig(tlsCfg), + grpcserver.WithMaxConnAge(*conf.grpcMaxConnAge), + ) + startGRPCListening <- struct{}{} + } + if s != nil { + s.Shutdown(err) + } + return nil +}, func(error) {}) + +// We need to be able to start and stop the gRPC server +// whenever the DB changes, thus it needs its own run group. +g.Add(func() error { + for range startGRPCListening { + level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", *conf.grpcBindAddr) + if err := s.ListenAndServe(); err != nil { + return errors.Wrap(err, "serve gRPC") + } + } return nil +}, func(error) { + defer close(reloadGRPCServer) +}) + +return nil +``` } -// setupHashring sets up the hashring configuration provided. -// If no hashring is provided, we setup a single node hashring with local endpoint. -func setupHashring(g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - conf *receiveConfig, - hashringChangedChan chan struct{}, - webHandler *receive.Handler, - statusProber prober.Probe, - reloadGRPCServer chan struct{}, - enableIngestion bool, -) error { - // Note: the hashring configuration watcher - // is the sender and thus closes the chan. - // In the single-node case, which has no configuration - // watcher, we close the chan ourselves. - updates := make(chan receive.Hashring, 1) - - // The Hashrings config file path is given initializing config watcher. - if conf.hashringsFilePath != "" { - cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) - if err != nil { - return errors.Wrap(err, "failed to initialize config watcher") - } +// setupHashring sets up the hashring configuration provided. // If no hashring is provided, we setup a single node hashring with local endpoint. func setupHashring(g *run.Group, logger log.Logger, reg *prometheus.Registry, conf *receiveConfig, hashringChangedChan chan struct{}, webHandler *receive.Handler, statusProber prober.Probe, reloadGRPCServer chan struct{}, enableIngestion bool, ) error { // Note: the hashring configuration watcher // is the sender and thus closes the chan. 
// In the single-node case, which has no configuration // watcher, we close the chan ourselves. updates := make(chan receive.Hashring, 1) + +``` +// The Hashrings config file path is given initializing config watcher. +if conf.hashringsFilePath != "" { + cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval) + if err != nil { + return errors.Wrap(err, "failed to initialize config watcher") + } + + // Check the hashring configuration on before running the watcher. + if err := cw.ValidateConfig(); err != nil { + cw.Stop() + close(updates) + return errors.Wrap(err, "failed to validate hashring configuration file") + } - // Check the hashring configuration on before running the watcher. - if err := cw.ValidateConfig(); err != nil { - cw.Stop() + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + level.Info(logger).Log("msg", "the hashring initialized with config watcher.") + return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw) + }, func(error) { + cancel() + }) +} else { + var ( + ring receive.Hashring + err error + ) + // The Hashrings config file content given initialize configuration from content. + if len(conf.hashringsFileContent) > 0 { + ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent) + if err != nil { close(updates) return errors.Wrap(err, "failed to validate hashring configuration file") } - - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - level.Info(logger).Log("msg", "the hashring initialized with config watcher.") - return receive.HashringFromConfigWatcher(ctx, receive.HashringAlgorithm(conf.hashringsAlgorithm), updates, cw) - }, func(error) { - cancel() - }) + level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") } else { - var ( - ring receive.Hashring - err error - ) - // The Hashrings config file content given initialize configuration from content. 
- if len(conf.hashringsFileContent) > 0 { - ring, err = receive.HashringFromConfig(receive.HashringAlgorithm(conf.hashringsAlgorithm), conf.hashringsFileContent) - if err != nil { - close(updates) - return errors.Wrap(err, "failed to validate hashring configuration file") - } - level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.") - } else { - level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") - ring = receive.SingleNodeHashring(conf.endpoint) - } - - cancel := make(chan struct{}) - g.Add(func() error { - defer close(updates) - updates <- ring - <-cancel - return nil - }, func(error) { - close(cancel) - }) + level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.") + ring = receive.SingleNodeHashring(conf.endpoint) } cancel := make(chan struct{}) g.Add(func() error { + defer close(updates) + updates <- ring + <-cancel + return nil + }, func(error) { + close(cancel) + }) +} - if enableIngestion { - defer close(hashringChangedChan) - } +cancel := make(chan struct{}) +g.Add(func() error { - for { - select { - case h, ok := <-updates: - if !ok { - return nil - } - webHandler.Hashring(h) - msg := "hashring has changed; server is not ready to receive web requests" - statusProber.NotReady(errors.New(msg)) - level.Info(logger).Log("msg", msg) - - if enableIngestion { - // send a signal to tsdb to reload, and then restart the gRPC server. - hashringChangedChan <- struct{}{} - } else { - // we dont need tsdb to reload, so restart the gRPC server. - level.Info(logger).Log("msg", "server has reloaded, ready to start accepting requests") - statusProber.Ready() - reloadGRPCServer <- struct{}{} - } - case <-cancel: + if enableIngestion { + defer close(hashringChangedChan) + } + + for { + select { + case h, ok := <-updates: + if !ok { return nil } + webHandler.Hashring(h) + msg := "hashring has changed; server is not ready to receive web requests" + statusProber.NotReady(errors.New(msg)) + level.Info(logger).Log("msg", msg) + + if enableIngestion { + // send a signal to tsdb to reload, and then restart the gRPC server. + hashringChangedChan <- struct{}{} + } else { + // we dont need tsdb to reload, so restart the gRPC server. + level.Info(logger).Log("msg", "server has reloaded, ready to start accepting requests") + statusProber.Ready() + reloadGRPCServer <- struct{}{} + } + case <-cancel: + return nil } - }, func(err error) { - close(cancel) - }, - ) - return nil + } +}, func(err error) { + close(cancel) +}, +) +return nil +``` + } -// startTSDBAndUpload starts up the multi-tsdb and sets up the rungroup to flush the tsdb and reload on hashring change. -// It also uploads the tsdb to object store if upload is enabled. -func startTSDBAndUpload(g *run.Group, - logger log.Logger, - reg *prometheus.Registry, - dbs *receive.MultiTSDB, - reloadGRPCServer chan struct{}, - uploadC chan struct{}, - hashringChangedChan chan struct{}, - upload bool, - uploadDone chan struct{}, - statusProber prober.Probe, - bkt objstore.Bucket, +// startTSDBAndUpload starts up the multi-tsdb and sets up the rungroup to flush the tsdb and reload on hashring change. // It also uploads the tsdb to object store if upload is enabled. 
func startTSDBAndUpload(g *run.Group, logger log.Logger, reg *prometheus.Registry, dbs *receive.MultiTSDB, reloadGRPCServer chan struct{}, uploadC chan struct{}, hashringChangedChan chan struct{}, upload bool, uploadDone chan struct{}, statusProber prober.Probe, bkt objstore.Bucket, ) error {
-	log.With(logger, "component", "storage")
-	dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_receive_multi_db_updates_attempted_total",
-		Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
-	})
-	dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
-		Name: "thanos_receive_multi_db_updates_completed_total",
-		Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
-	})
-
-	level.Debug(logger).Log("msg", "removing storage lock files if any")
-	if err := dbs.RemoveLockFilesIfAny(); err != nil {
-		return errors.Wrap(err, "remove storage lock files")
-	}
+```
+log.With(logger, "component", "storage")
+dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+	Name: "thanos_receive_multi_db_updates_attempted_total",
+	Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
+})
+dbUpdatesCompleted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+	Name: "thanos_receive_multi_db_updates_completed_total",
+	Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
+})
+
+level.Debug(logger).Log("msg", "removing storage lock files if any")
+if err := dbs.RemoveLockFilesIfAny(); err != nil {
+	return errors.Wrap(err, "remove storage lock files")
+}
-	// TSDBs reload logic, listening on hashring changes.
-	cancel := make(chan struct{})
-	g.Add(func() error {
-		defer close(uploadC)
+// TSDBs reload logic, listening on hashring changes.
+cancel := make(chan struct{})
+g.Add(func() error {
+	defer close(uploadC)
-		// Before quitting, ensure the WAL is flushed and the DBs are closed.
+ defer func() { + level.Info(logger).Log("msg", "shutting down storage") + if err := dbs.Flush(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to flush storage") + } else { + level.Info(logger).Log("msg", "storage is flushed successfully") + } + if err := dbs.Close(); err != nil { + level.Error(logger).Log("err", err, "msg", "failed to close storage") + return + } + level.Info(logger).Log("msg", "storage is closed") + }() - for { - select { - case <-cancel: + for { + select { + case <-cancel: + return nil + case _, ok := <-hashringChangedChan: + if !ok { return nil - case _, ok := <-hashringChangedChan: - if !ok { - return nil - } - dbUpdatesStarted.Inc() - level.Info(logger).Log("msg", "updating storage") - - if err := dbs.Flush(); err != nil { - return errors.Wrap(err, "flushing storage") - } - if err := dbs.Open(); err != nil { - return errors.Wrap(err, "opening storage") - } - if upload { - uploadC <- struct{}{} - <-uploadDone - } - statusProber.Ready() - level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") - dbUpdatesCompleted.Inc() - reloadGRPCServer <- struct{}{} } - } - }, func(err error) { - close(cancel) - }) + dbUpdatesStarted.Inc() + level.Info(logger).Log("msg", "updating storage") - if upload { - logger := log.With(logger, "component", "uploader") - upload := func(ctx context.Context) error { - level.Debug(logger).Log("msg", "upload phase starting") - start := time.Now() - - uploaded, err := dbs.Sync(ctx) - if err != nil { - level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) - return err + if err := dbs.Flush(); err != nil { + return errors.Wrap(err, "flushing storage") } - level.Debug(logger).Log("msg", "upload phase done", "uploaded", uploaded, "elapsed", time.Since(start)) - return nil - } - { - level.Info(logger).Log("msg", "upload enabled, starting initial sync") - if err := upload(context.Background()); err != nil { - return errors.Wrap(err, "initial upload failed") + if err := dbs.Open(); err != nil { + return errors.Wrap(err, "opening storage") } - level.Info(logger).Log("msg", "initial sync done") - } - { - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - // Ensure we clean up everything properly. - defer func() { - runutil.CloseWithLogOnErr(logger, bkt, "bucket client") - }() - - // Before quitting, ensure all blocks are uploaded. - defer func() { - <-uploadC // Closed by storage routine when it's done. - level.Info(logger).Log("msg", "uploading the final cut block before exiting") - ctx, cancel := context.WithCancel(context.Background()) - uploaded, err := dbs.Sync(ctx) - if err != nil { - cancel() - level.Error(logger).Log("msg", "the final upload failed", "err", err) - return - } - cancel() - level.Info(logger).Log("msg", "the final cut block was uploaded", "uploaded", uploaded) - }() - - defer close(uploadDone) - - // Run the uploader in a loop. - tick := time.NewTicker(30 * time.Second) - defer tick.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-uploadC: - // Upload on demand. 
- if err := upload(ctx); err != nil { - level.Warn(logger).Log("msg", "on demand upload failed", "err", err) - } - uploadDone <- struct{}{} - case <-tick.C: - if err := upload(ctx); err != nil { - level.Warn(logger).Log("msg", "recurring upload failed", "err", err) - } - } - } - }, func(error) { - cancel() - }) + if upload { + uploadC <- struct{}{} + <-uploadDone + } + statusProber.Ready() + level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests") + dbUpdatesCompleted.Inc() + reloadGRPCServer <- struct{}{} } } +}, func(err error) { + close(cancel) +}) - return nil -} +if upload { + logger := log.With(logger, "component", "uploader") + upload := func(ctx context.Context) error { + level.Debug(logger).Log("msg", "upload phase starting") + start := time.Now() -func migrateLegacyStorage(logger log.Logger, dataDir, defaultTenantID string) error { - defaultTenantDataDir := path.Join(dataDir, defaultTenantID) - - if _, err := os.Stat(defaultTenantDataDir); !os.IsNotExist(err) { - level.Info(logger).Log("msg", "default tenant data dir already present, not attempting to migrate storage") + uploaded, err := dbs.Sync(ctx) + if err != nil { + level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err) + return err + } + level.Debug(logger).Log("msg", "upload phase done", "uploaded", uploaded, "elapsed", time.Since(start)) return nil } - - if _, err := os.Stat(dataDir); os.IsNotExist(err) { - level.Info(logger).Log("msg", "no existing storage found, no data migration attempted") - return nil + { + level.Info(logger).Log("msg", "upload enabled, starting initial sync") + if err := upload(context.Background()); err != nil { + return errors.Wrap(err, "initial upload failed") + } + level.Info(logger).Log("msg", "initial sync done") } + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + // Ensure we clean up everything properly. + defer func() { + runutil.CloseWithLogOnErr(logger, bkt, "bucket client") + }() + + // Before quitting, ensure all blocks are uploaded. + defer func() { + <-uploadC // Closed by storage routine when it's done. + level.Info(logger).Log("msg", "uploading the final cut block before exiting") + ctx, cancel := context.WithCancel(context.Background()) + uploaded, err := dbs.Sync(ctx) + if err != nil { + cancel() + level.Error(logger).Log("msg", "the final upload failed", "err", err) + return + } + cancel() + level.Info(logger).Log("msg", "the final cut block was uploaded", "uploaded", uploaded) + }() - level.Info(logger).Log("msg", "found legacy storage, migrating to multi-tsdb layout with default tenant", "defaultTenantID", defaultTenantID) - - files, err := ioutil.ReadDir(dataDir) - if err != nil { - return errors.Wrapf(err, "read legacy data dir: %v", dataDir) - } + defer close(uploadDone) - if err := os.MkdirAll(defaultTenantDataDir, 0750); err != nil { - return errors.Wrapf(err, "create default tenant data dir: %v", defaultTenantDataDir) - } + // Run the uploader in a loop. + tick := time.NewTicker(30 * time.Second) + defer tick.Stop() - for _, f := range files { - from := path.Join(dataDir, f.Name()) - to := path.Join(defaultTenantDataDir, f.Name()) - if err := os.Rename(from, to); err != nil { - return errors.Wrapf(err, "migrate file from %v to %v", from, to) - } + for { + select { + case <-ctx.Done(): + return nil + case <-uploadC: + // Upload on demand. 
+ if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "on demand upload failed", "err", err) + } + uploadDone <- struct{}{} + case <-tick.C: + if err := upload(ctx); err != nil { + level.Warn(logger).Log("msg", "recurring upload failed", "err", err) + } + } + } + }, func(error) { + cancel() + }) } - - return nil } -type receiveConfig struct { - httpBindAddr *string - httpGracePeriod *model.Duration - httpTLSConfig *string +return nil +``` - grpcBindAddr *string - grpcGracePeriod *model.Duration - grpcCert *string - grpcKey *string - grpcClientCA *string - grpcMaxConnAge *time.Duration +} - rwAddress string - rwServerCert string - rwServerKey string - rwServerClientCA string - rwClientCert string - rwClientKey string - rwClientServerCA string - rwClientServerName string +func migrateLegacyStorage(logger log.Logger, dataDir, defaultTenantID string) error { defaultTenantDataDir := path.Join(dataDir, defaultTenantID) - maxPerTenantLimit uint64 - metaMonitoringLimitQuery string - metaMonitoringUrl *url.URL - metaMonitoringHttpClient *extflag.PathOrContent +``` +if _, err := os.Stat(defaultTenantDataDir); !os.IsNotExist(err) { + level.Info(logger).Log("msg", "default tenant data dir already present, not attempting to migrate storage") + return nil +} - dataDir string - labelStrs []string +if _, err := os.Stat(dataDir); os.IsNotExist(err) { + level.Info(logger).Log("msg", "no existing storage found, no data migration attempted") + return nil +} - objStoreConfig *extflag.PathOrContent - retention *model.Duration +level.Info(logger).Log("msg", "found legacy storage, migrating to multi-tsdb layout with default tenant", "defaultTenantID", defaultTenantID) - hashringsFilePath string - hashringsFileContent string - hashringsAlgorithm string +files, err := ioutil.ReadDir(dataDir) +if err != nil { + return errors.Wrapf(err, "read legacy data dir: %v", dataDir) +} - refreshInterval *model.Duration - endpoint string - tenantHeader string - tenantField string - tenantLabelName string - defaultTenantID string - replicaHeader string - replicationFactor uint64 - forwardTimeout *model.Duration +if err := os.MkdirAll(defaultTenantDataDir, 0750); err != nil { + return errors.Wrapf(err, "create default tenant data dir: %v", defaultTenantDataDir) +} - tsdbMinBlockDuration *model.Duration - tsdbMaxBlockDuration *model.Duration - tsdbAllowOverlappingBlocks bool - tsdbMaxExemplars int64 +for _, f := range files { + from := path.Join(dataDir, f.Name()) + to := path.Join(defaultTenantDataDir, f.Name()) + if err := os.Rename(from, to); err != nil { + return errors.Wrapf(err, "migrate file from %v to %v", from, to) + } +} - walCompression bool - noLockFile bool +return nil +``` - hashFunc string +} - ignoreBlockSize bool - allowOutOfOrderUpload bool +type receiveConfig struct { httpBindAddr *string httpGracePeriod *model.Duration httpTLSConfig *string + +``` +grpcBindAddr *string +grpcGracePeriod *model.Duration +grpcCert *string +grpcKey *string +grpcClientCA *string +grpcMaxConnAge *time.Duration + +rwAddress string +rwServerCert string +rwServerKey string +rwServerClientCA string +rwClientCert string +rwClientKey string +rwClientServerCA string +rwClientServerName string + +maxPerTenantLimit uint64 +metaMonitoringLimitQuery string +metaMonitoringUrl *url.URL +metaMonitoringHttpClient *extflag.PathOrContent + +dataDir string +labelStrs []string + +objStoreConfig *extflag.PathOrContent +retention *model.Duration + +hashringsFilePath string +hashringsFileContent string +hashringsAlgorithm string + 
+refreshInterval *model.Duration +endpoint string +tenantHeader string +tenantField string +tenantLabelName string +defaultTenantID string +replicaHeader string +replicationFactor uint64 +forwardTimeout *model.Duration + +tsdbMinBlockDuration *model.Duration +tsdbMaxBlockDuration *model.Duration +tsdbAllowOverlappingBlocks bool +tsdbMaxExemplars int64 + +walCompression bool +noLockFile bool + +hashFunc string + +ignoreBlockSize bool +allowOutOfOrderUpload bool + +reqLogConfig *extflag.PathOrContent +relabelConfigPath *extflag.PathOrContent +``` - reqLogConfig *extflag.PathOrContent - relabelConfigPath *extflag.PathOrContent } -func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { - rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd) - rc.grpcBindAddr, rc.grpcGracePeriod, rc.grpcCert, rc.grpcKey, rc.grpcClientCA, rc.grpcMaxConnAge = extkingpin.RegisterGRPCFlags(cmd) +func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd) rc.grpcBindAddr, rc.grpcGracePeriod, rc.grpcCert, rc.grpcKey, rc.grpcClientCA, rc.grpcMaxConnAge = extkingpin.RegisterGRPCFlags(cmd) - cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). - Default("0.0.0.0:19291").StringVar(&rc.rwAddress) +``` +cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). + Default("0.0.0.0:19291").StringVar(&rc.rwAddress) - cmd.Flag("remote-write.server-tls-cert", "TLS Certificate for HTTP server, leave blank to disable TLS.").Default("").StringVar(&rc.rwServerCert) +cmd.Flag("remote-write.server-tls-cert", "TLS Certificate for HTTP server, leave blank to disable TLS.").Default("").StringVar(&rc.rwServerCert) - cmd.Flag("remote-write.server-tls-key", "TLS Key for the HTTP server, leave blank to disable TLS.").Default("").StringVar(&rc.rwServerKey) +cmd.Flag("remote-write.server-tls-key", "TLS Key for the HTTP server, leave blank to disable TLS.").Default("").StringVar(&rc.rwServerKey) - cmd.Flag("remote-write.server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").StringVar(&rc.rwServerClientCA) +cmd.Flag("remote-write.server-tls-client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side. (tls.NoClientCert)").Default("").StringVar(&rc.rwServerClientCA) - cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server.").Default("").StringVar(&rc.rwClientCert) +cmd.Flag("remote-write.client-tls-cert", "TLS Certificates to use to identify this client to the server.").Default("").StringVar(&rc.rwClientCert) - cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate.").Default("").StringVar(&rc.rwClientKey) +cmd.Flag("remote-write.client-tls-key", "TLS Key for the client's certificate.").Default("").StringVar(&rc.rwClientKey) - cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers.").Default("").StringVar(&rc.rwClientServerCA) +cmd.Flag("remote-write.client-tls-ca", "TLS CA Certificates to use to verify servers.").Default("").StringVar(&rc.rwClientServerCA) - cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned TLS certificates. 
See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&rc.rwClientServerName) +cmd.Flag("remote-write.client-server-name", "Server name to verify the hostname on the returned TLS certificates. See https://tools.ietf.org/html/rfc4366#section-3.1").Default("").StringVar(&rc.rwClientServerName) - cmd.Flag("tsdb.path", "Data directory of TSDB."). - Default("./data").StringVar(&rc.dataDir) +cmd.Flag("tsdb.path", "Data directory of TSDB."). + Default("./data").StringVar(&rc.dataDir) - cmd.Flag("label", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").StringsVar(&rc.labelStrs) +cmd.Flag("label", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").StringsVar(&rc.labelStrs) - rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) +rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) - rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention. For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d")) +rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables this retention. For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d")) - cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("").StringVar(&rc.hashringsFilePath) +cmd.Flag("receive.hashrings-file", "Path to file that contains the hashring configuration. A watcher is initialized to watch changes and update the hashring dynamically.").PlaceHolder("").StringVar(&rc.hashringsFilePath) - cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").StringVar(&rc.hashringsFileContent) +cmd.Flag("receive.hashrings", "Alternative to 'receive.hashrings-file' flag (lower priority). Content of file that contains the hashring configuration.").PlaceHolder("").StringVar(&rc.hashringsFileContent) - hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ") - cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext). - Default(string(receive.AlgorithmHashmod)). - EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)) +hashringAlgorithmsHelptext := strings.Join([]string{string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)}, ", ") +cmd.Flag("receive.hashrings-algorithm", "The algorithm used when distributing series in the hashrings. Must be one of "+hashringAlgorithmsHelptext). + Default(string(receive.AlgorithmHashmod)). 
+ EnumVar(&rc.hashringsAlgorithm, string(receive.AlgorithmHashmod), string(receive.AlgorithmKetama)) - rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). - Default("5m")) +rc.refreshInterval = extkingpin.ModelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)"). + Default("5m")) - cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. Used to identify the local node in the hashring configuration. If it's empty AND hashring configuration was provided, it means that receive will run in RoutingOnly mode.").StringVar(&rc.endpoint) +cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. Used to identify the local node in the hashring configuration. If it's empty AND hashring configuration was provided, it means that receive will run in RoutingOnly mode.").StringVar(&rc.endpoint) - cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).StringVar(&rc.tenantHeader) +cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).StringVar(&rc.tenantHeader) - cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName) +cmd.Flag("receive.tenant-certificate-field", "Use TLS client's certificate field to determine tenant for write requests. Must be one of "+receive.CertificateFieldOrganization+", "+receive.CertificateFieldOrganizationalUnit+" or "+receive.CertificateFieldCommonName+". 
This setting will cause the receive.tenant-header flag value to be ignored.").Default("").EnumVar(&rc.tenantField, "", receive.CertificateFieldOrganization, receive.CertificateFieldOrganizationalUnit, receive.CertificateFieldCommonName) - cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(receive.DefaultTenant).StringVar(&rc.defaultTenantID) +cmd.Flag("receive.default-tenant-id", "Default tenant ID to use when none is provided via a header.").Default(receive.DefaultTenant).StringVar(&rc.defaultTenantID) - cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).StringVar(&rc.tenantLabelName) +cmd.Flag("receive.tenant-label-name", "Label name through which the tenant will be announced.").Default(receive.DefaultTenantLabel).StringVar(&rc.tenantLabelName) - cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).StringVar(&rc.replicaHeader) +cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).StringVar(&rc.replicaHeader) - cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor) +cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor) - cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit) +cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. 
- cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit)
+cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. For more details refer: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit)

- cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl)
+cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl)

- cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery)
+cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL Query to execute against meta-monitoring, to get the current number of active series for each tenant, across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery)

- rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(
-   cmd,
-   "receive.tenant-limits.meta-monitoring-client",
-   "YAML file or string with http client configs for meta-monitoring.",
-   extflag.WithHidden(),
- )
+rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(
+  cmd,
+  "receive.tenant-limits.meta-monitoring-client",
+  "YAML file or string with http client configs for meta-monitoring.",
+  extflag.WithHidden(),
+)
+
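The three `receive.tenant-limits.meta-monitoring-*` flags work together: the receiver runs the configured PromQL query (by default `sum(prometheus_tsdb_head_series) by (tenant)`) against the meta-monitoring URL and keeps the per-tenant result around for limit checks. A simplified sketch of how such an instant-query vector maps to a tenant-to-series-count table follows; `buildTenantSeriesMap` is a hypothetical helper and assumes the result is labelled by `tenant`.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// buildTenantSeriesMap mirrors, in simplified form, what the limiter does with
// the meta-monitoring result: each sample of the instant-query vector carries
// a "tenant" label and the current number of head series for that tenant.
func buildTenantSeriesMap(vec model.Vector) map[string]float64 {
	out := make(map[string]float64, len(vec))
	for _, s := range vec {
		if tenant, ok := s.Metric["tenant"]; ok {
			out[string(tenant)] = float64(s.Value)
		}
	}
	return out
}

func main() {
	vec := model.Vector{
		&model.Sample{Metric: model.Metric{"tenant": "team-a"}, Value: 120000},
		&model.Sample{Metric: model.Metric{"tenant": "team-b"}, Value: 80000},
	}
	fmt.Println(buildTenantSeriesMap(vec))
}
```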
+rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())
- rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

+rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())
- rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution())

+rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
- rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())

+rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
- rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())

+cmd.Flag("tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge.").Default("false").BoolVar(&rc.tsdbAllowOverlappingBlocks)
- cmd.Flag("tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge.").Default("false").BoolVar(&rc.tsdbAllowOverlappingBlocks)

+cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").BoolVar(&rc.walCompression)
- cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").BoolVar(&rc.walCompression)

+cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").BoolVar(&rc.noLockFile)
- cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").BoolVar(&rc.noLockFile)

+cmd.Flag("tsdb.max-exemplars",
+  "Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant."+
+  " In case the exemplar storage becomes full (number of stored exemplars becomes equal to max-exemplars),"+
+  " ingesting a new exemplar will evict the oldest exemplar from storage. 0 (or less) value of this flag disables exemplars storage.").
+  Default("0").Int64Var(&rc.tsdbMaxExemplars)
- cmd.Flag("tsdb.max-exemplars",
-   "Enables support for ingesting exemplars and sets the maximum number of exemplars that will be stored per tenant."+
-   " In case the exemplar storage becomes full (number of stored exemplars becomes equal to max-exemplars),"+
-   " ingesting a new exemplar will evict the oldest exemplar from storage. 0 (or less) value of this flag disables exemplars storage.").
-   Default("0").Int64Var(&rc.tsdbMaxExemplars)

+cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
+  Default("").EnumVar(&rc.hashFunc, "SHA256", "")
- cmd.Flag("hash-func", "Specify which hash function to use when calculating the hashes of produced files. If no function has been specified, it does not happen. This permits avoiding downloading some files twice albeit at some performance cost. Possible values are: \"\", \"SHA256\".").
-   Default("").EnumVar(&rc.hashFunc, "SHA256", "")
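The `--hash-func=SHA256` option makes the shipper record a checksum for each produced block file so unchanged files need not be downloaded twice. The snippet below only illustrates the kind of checksum involved; the real implementation lives in `pkg/block/metadata`, and the file path shown is hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// sha256Of computes the kind of per-file checksum --hash-func=SHA256 asks the
// shipper to record (illustrative only, not the shipper's code).
func sha256Of(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	// "chunks/000001" is a hypothetical path to a block file.
	sum, err := sha256Of("chunks/000001")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(sum)
}
```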
+cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().BoolVar(&rc.ignoreBlockSize)
- cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().BoolVar(&rc.ignoreBlockSize)

+cmd.Flag("shipper.allow-out-of-order-uploads",
+  "If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+
+  "This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+
+  "about order.").
+  Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)
- cmd.Flag("shipper.allow-out-of-order-uploads",
-   "If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+
-   "This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+
-   "about order.").
-   Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload)

+rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
- rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
 }

-// determineMode returns the ReceiverMode that this receiver is configured to run in.
-// This is used to configure this Receiver's forwarding and ingesting behavior at runtime.
-func (rc *receiveConfig) determineMode() receive.ReceiverMode {
-   // Has the user provided some kind of hashring configuration?
-   hashringSpecified := rc.hashringsFileContent != "" || rc.hashringsFilePath != ""
-   // Has the user specified the --receive.local-endpoint flag?
-   localEndpointSpecified := rc.endpoint != ""
+// determineMode returns the ReceiverMode that this receiver is configured to run in.
+// This is used to configure this Receiver's forwarding and ingesting behavior at runtime.
+func (rc *receiveConfig) determineMode() receive.ReceiverMode {
+  // Has the user provided some kind of hashring configuration?
+  hashringSpecified := rc.hashringsFileContent != "" || rc.hashringsFilePath != ""
+  // Has the user specified the --receive.local-endpoint flag?
+  localEndpointSpecified := rc.endpoint != ""
+
+  switch {
+  case hashringSpecified && localEndpointSpecified:
+    return receive.RouterIngestor
+  case hashringSpecified && !localEndpointSpecified:
+    // Be careful - if the hashring contains an address that routes to itself and does not specify a local
+    // endpoint - you've just created an infinite loop / fork bomb :)
+    return receive.RouterOnly
+  default:
+    // hashring configuration has not been provided so we ingest all metrics locally.
+    return receive.IngestorOnly
+  }
-   switch {
-   case hashringSpecified && localEndpointSpecified:
-     return receive.RouterIngestor
-   case hashringSpecified && !localEndpointSpecified:
-     // Be careful - if the hashring contains an address that routes to itself and does not specify a local
-     // endpoint - you've just created an infinite loop / fork bomb :)
-     return receive.RouterOnly
-   default:
-     // hashring configuration has not been provided so we ingest all metrics locally.
-     return receive.IngestorOnly
-   }
 }
diff --git a/docs/components/receive.md b/docs/components/receive.md
index 89d2c112993..63b2704a363 100644
--- a/docs/components/receive.md
+++ b/docs/components/receive.md
@@ -77,7 +77,7 @@ The example content of `hashring.json`:
 
 With such configuration any receive listens for remote write on `10908/api/v1/receive` and will forward to correct one in hashring if needed for tenancy and replication.
 
-## Limiting
+## Limiting (Experimental feature)
 
 Thanos Receive, in Router or RouterIngestor mode, supports limiting tenant active (head) series to maintain the system's stability. It uses any Prometheus Query API compatible meta-monitoring solution that consumes the metrics exposed by all receivers in the Thanos system. Such query endpoint allows getting the scrape time seconds old number of all active series per tenant, which is then compared with a configured limit before ingesting any tenant's remote write request. In case a tenant has gone above the limit, their remote write requests fail fully.
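To make the limit check concrete: with `--receive.tenant-limits.max-head-series` set, an incoming remote-write request is admitted only while the tenant's most recently observed head-series count stays below the limit. The stand-alone sketch below mirrors that decision in simplified form (it is not the handler's code and ignores the best-effort and metrics details).

```go
package main

import "fmt"

// underLimit sketches the per-request decision when head-series limiting is
// enabled: compare the tenant's series count from the latest meta-monitoring
// scrape with the configured limit.
func underLimit(current map[string]float64, tenant string, limit uint64) bool {
	if limit == 0 {
		return true // limiting disabled
	}
	v, ok := current[tenant]
	if !ok {
		// Unknown tenant: be permissive, limiting is best-effort.
		return true
	}
	return v < float64(limit)
}

func main() {
	current := map[string]float64{"team-a": 120000, "team-b": 80000}
	fmt.Println(underLimit(current, "team-a", 100000)) // false: over the limit
	fmt.Println(underLimit(current, "team-b", 100000)) // true
	fmt.Println(underLimit(current, "team-c", 100000)) // true: no data yet
}
```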
diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index be7a2e21ea4..4afbaf65619 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -109,19 +109,10 @@ type Options struct {
 	MetaMonitoringLimitQuery string
 }
 
-// activeSeriesLimit implements active series limiting for web Handler.
-type activeSeriesLimit struct {
-	mtx                    sync.Mutex
-	limit                  uint64
-	tenantCurrentSeriesMap map[string]float64
-
-	metaMonitoringURL    *url.URL
-	metaMonitoringClient *http.Client
-	metaMonitoringQuery  string
-
-	configuredTenantLimit prometheus.Gauge
-	limitedRequests       *prometheus.CounterVec
-	metaMonitoringErr     prometheus.Counter
+// activeSeriesLimiter encompasses active series limiting logic.
+type activeSeriesLimiter interface {
+	QueryMetaMonitoring(context.Context, log.Logger) error
+	isUnderLimit(string, log.Logger) (bool, error)
 }
 
 // Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -138,7 +129,7 @@ type Handler struct {
 	expBackoff        backoff.Backoff
 	peerStates        map[string]*retryState
 	receiverMode      ReceiverMode
-	ActiveSeriesLimit *activeSeriesLimit
+	ActiveSeriesLimit activeSeriesLimiter
 
 	forwardRequests *prometheus.CounterVec
 	replications    *prometheus.CounterVec
@@ -189,29 +180,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 				Help: "The number of times to replicate incoming write requests.",
 			},
 		),
-		ActiveSeriesLimit: &activeSeriesLimit{
-			limit:               o.MaxPerTenantLimit,
-			metaMonitoringURL:   o.MetaMonitoringUrl,
-			metaMonitoringQuery: o.MetaMonitoringLimitQuery,
-			configuredTenantLimit: promauto.With(registerer).NewGauge(
-				prometheus.GaugeOpts{
-					Name: "thanos_receive_tenant_head_series_limit",
-					Help: "The configured limit for active (head) series of tenants.",
-				},
-			),
-			limitedRequests: promauto.With(registerer).NewCounterVec(
-				prometheus.CounterOpts{
-					Name: "thanos_receive_head_series_limited_requests_total",
-					Help: "The total number of remote write requests that have been dropped due to active series limiting.",
-				}, []string{"tenant"},
-			),
-			metaMonitoringErr: promauto.With(registerer).NewCounter(
-				prometheus.CounterOpts{
-					Name: "thanos_receive_metamonitoring_failed_queries_total",
-					Help: "The total number of meta-monitoring queries that failed while limiting.",
-				},
-			),
-		},
 		writeTimeseriesTotal: promauto.With(registerer).NewHistogramVec(
 			prometheus.HistogramOpts{
 				Namespace: "thanos",
@@ -243,25 +211,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 		h.replicationFactor.Set(1)
 	}
 
-	h.ActiveSeriesLimit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
-	h.ActiveSeriesLimit.tenantCurrentSeriesMap = map[string]float64{}
-
-	if (h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor) && h.options.MaxPerTenantLimit != 0 {
-		// Use specified HTTPConfig to make requests to meta-monitoring.
-		httpConfContentYaml, err := h.options.MetaMonitoringHttpClient.Content()
-		if err != nil {
-			level.Error(h.logger).Log("msg", "getting http client config", "err", err.Error())
-		}
-
-		httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
-		if err != nil {
-			level.Error(h.logger).Log("msg", "parsing http config YAML", "err", err.Error())
-		}
-
-		h.ActiveSeriesLimit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit")
-		if err != nil {
-			level.Error(h.logger).Log("msg", "improper http client config", "err", err.Error())
-		}
+	h.ActiveSeriesLimit = NewNopSeriesLimit()
+	if (h.receiverMode == RouterOnly || h.receiverMode == RouterIngestor) && o.MaxPerTenantLimit != 0 {
+		h.ActiveSeriesLimit = NewActiveSeriesLimit(h.options, registerer, h.receiverMode, logger)
 	}
 
 	ins := extpromhttp.NewNopInstrumentationMiddleware()
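Because `Handler.ActiveSeriesLimit` is now the `activeSeriesLimiter` interface rather than the concrete struct, any type with the same two methods can be swapped in, for example a static fake in tests. The sketch below is hypothetical; since `isUnderLimit` is unexported, such a fake would have to live in package `receive` itself (e.g. in a `_test.go` file there), and it is written as a stand-alone program only for brevity.

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
)

// fakeLimit is a hypothetical limiter with the same method set as the new
// activeSeriesLimiter interface, useful for exercising the handler in tests
// without a meta-monitoring endpoint.
type fakeLimit struct {
	blocked map[string]bool
}

func (f *fakeLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error {
	return nil // static fake: nothing to refresh
}

func (f *fakeLimit) isUnderLimit(tenant string, _ log.Logger) (bool, error) {
	return !f.blocked[tenant], nil
}

func main() {
	f := &fakeLimit{blocked: map[string]bool{"team-a": true}}
	ok, _ := f.isUnderLimit("team-a", log.NewNopLogger())
	fmt.Println("team-a under limit:", ok)
}
```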
@@ -564,12 +516,72 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
 	h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples))
 }
 
+// activeSeriesLimit implements activeSeriesLimiter interface.
+type activeSeriesLimit struct {
+	mtx                    sync.RWMutex
+	limit                  uint64
+	tenantCurrentSeriesMap map[string]float64
+
+	metaMonitoringURL    *url.URL
+	metaMonitoringClient *http.Client
+	metaMonitoringQuery  string
+
+	configuredTenantLimit prometheus.Gauge
+	limitedRequests       *prometheus.CounterVec
+	metaMonitoringErr     prometheus.Counter
+}
+
+func NewActiveSeriesLimit(o *Options, registerer prometheus.Registerer, r ReceiverMode, logger log.Logger) *activeSeriesLimit {
+	limit := &activeSeriesLimit{
+		limit:               o.MaxPerTenantLimit,
+		metaMonitoringURL:   o.MetaMonitoringUrl,
+		metaMonitoringQuery: o.MetaMonitoringLimitQuery,
+		configuredTenantLimit: promauto.With(registerer).NewGauge(
+			prometheus.GaugeOpts{
+				Name: "thanos_receive_tenant_head_series_limit",
+				Help: "The configured limit for active (head) series of tenants.",
+			},
+		),
+		limitedRequests: promauto.With(registerer).NewCounterVec(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_head_series_limited_requests_total",
+				Help: "The total number of remote write requests that have been dropped due to active series limiting.",
+			}, []string{"tenant"},
+		),
+		metaMonitoringErr: promauto.With(registerer).NewCounter(
+			prometheus.CounterOpts{
+				Name: "thanos_receive_metamonitoring_failed_queries_total",
+				Help: "The total number of meta-monitoring queries that failed while limiting.",
+			},
+		),
+	}
+
+	limit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit))
+	limit.tenantCurrentSeriesMap = map[string]float64{}
+
+	// Use specified HTTPConfig to make requests to meta-monitoring.
+	httpConfContentYaml, err := o.MetaMonitoringHttpClient.Content()
+	if err != nil {
+		level.Error(logger).Log("msg", "getting http client config", "err", err.Error())
+	}
+
+	httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml)
+	if err != nil {
+		level.Error(logger).Log("msg", "parsing http config YAML", "err", err.Error())
+	}
+
+	limit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit")
+	if err != nil {
+		level.Error(logger).Log("msg", "improper http client config", "err", err.Error())
+	}
+
+	return limit
+}
+
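`NewActiveSeriesLimit` builds its meta-monitoring HTTP client from the `--receive.tenant-limits.meta-monitoring-client` YAML through the two `httpconfig` calls above; note that configuration errors are only logged rather than returned. A minimal sketch of those calls in isolation is shown below; the import path and YAML keys are assumptions based on the Prometheus-style client config this option is expected to accept.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/httpconfig"
)

func main() {
	// Hypothetical client config; tls_config / insecure_skip_verify are assumed
	// to mirror the Prometheus-style keys accepted by this YAML.
	yamlCfg := []byte("tls_config:\n  insecure_skip_verify: true\n")

	cfg, err := httpconfig.NewClientConfigFromYAML(yamlCfg)
	if err != nil {
		panic(err)
	}
	client, err := httpconfig.NewHTTPClient(*cfg, "meta-mon-example")
	if err != nil {
		panic(err)
	}
	fmt.Printf("meta-monitoring client ready: %T\n", client)
}
```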
 // QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring
 // solution with the configured query for getting current active (head) series of all tenants.
 // It then populates tenantCurrentSeries map with result.
 func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error {
-	a.mtx.Lock()
-	defer a.mtx.Unlock()
 	c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent)
 
 	vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{})
@@ -580,6 +592,8 @@ func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.
 
 	level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes))
 
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
 	// Construct map of tenant name and current HEAD series.
 	for _, e := range vectorRes {
 		for k, v := range e.Metric {
@@ -597,8 +611,8 @@ func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.
 // It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits.
 // TODO(saswatamcode): Add capability to configure different limits for different tenants.
 func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) {
-	a.mtx.Lock()
-	defer a.mtx.Unlock()
+	a.mtx.RLock()
+	defer a.mtx.RUnlock()
 	if a.limit == 0 || a.metaMonitoringURL.Host == "" {
 		return true, nil
 	}
@@ -622,6 +636,21 @@ func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool
 	return true, nil
 }
 
+// nopSeriesLimit implements activeSeriesLimiter interface as no-op.
+type nopSeriesLimit struct{}
+
+func NewNopSeriesLimit() *nopSeriesLimit {
+	return &nopSeriesLimit{}
+}
+
+func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error {
+	return nil
+}
+
+func (a *nopSeriesLimit) isUnderLimit(_ string, _ log.Logger) (bool, error) {
+	return true, nil
+}
+
 // forward accepts a write request, batches its time series by
 // corresponding endpoint, and forwards them in parallel to the
 // correct endpoint. Requests destined for the local node are written