diff --git a/CHANGELOG.md b/CHANGELOG.md index 3bed7e54de60..b8fced886441 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ * [9252](https://github.com/grafana/loki/pull/9252) **jeschkies**: Use un-escaped regex literal for string matching. * [9176](https://github.com/grafana/loki/pull/9176) **DylanGuedes**: Fix incorrect association of per-stream rate limit when sharding is enabled. * [9463](https://github.com/grafana/loki/pull/9463) **Totalus**: Fix OpenStack Swift client object listing to fetch all the objects properly. +* [9471](https://github.com/grafana/loki/pull/9471) **sandeepsukhani**: query-scheduler: fix query distribution in SSD mode. * [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer. * [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams. diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go index 754df3eba8b4..56020f81be41 100644 --- a/integration/cluster/cluster.go +++ b/integration/cluster/cluster.go @@ -33,8 +33,6 @@ import ( ) var ( - wrapRegistryOnce sync.Once - configTemplate = template.Must(template.New("").Parse(` auth_enabled: true @@ -108,18 +106,18 @@ ruler: `)) ) -func wrapRegistry() { - wrapRegistryOnce.Do(func() { - prometheus.DefaultRegisterer = &wrappedRegisterer{Registerer: prometheus.DefaultRegisterer} - }) +func resetMetricRegistry() { + registry := &wrappedRegisterer{Registry: prometheus.NewRegistry()} + prometheus.DefaultRegisterer = registry + prometheus.DefaultGatherer = registry } type wrappedRegisterer struct { - prometheus.Registerer + *prometheus.Registry } func (w *wrappedRegisterer) Register(collector prometheus.Collector) error { - if err := w.Registerer.Register(collector); err != nil { + if err := w.Registry.Register(collector); err != nil { var aErr prometheus.AlreadyRegisteredError if errors.As(err, &aErr) { return nil @@ -151,7 +149,7 @@ func New(logLevel level.Value, opts ...func(*Cluster)) *Cluster { util_log.Logger = level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.Allow(logLevel)) } - wrapRegistry() + resetMetricRegistry() sharedPath, err := os.MkdirTemp("", "loki-shared-data") if err != nil { panic(err.Error()) diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 4dce910e285c..1ba0e3e2c20c 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -310,8 +310,13 @@ func checkUserLabelAndMetricValue(t *testing.T, metricName, metrics, tenantID st } func checkMetricValue(t *testing.T, metricName, metrics string, expectedValue float64) { + t.Helper() + require.Equal(t, expectedValue, getMetricValue(t, metricName, metrics)) +} + +func getMetricValue(t *testing.T, metricName, metrics string) float64 { t.Helper() val, _, err := extractMetric(metricName, metrics) require.NoError(t, err) - require.Equal(t, expectedValue, val) + return val } diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index dbc6364426fc..359e904b5b46 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -259,3 +259,121 @@ func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) { }) } } + +func TestSchedulerRing(t *testing.T) { + clu := cluster.New(nil) + defer func() { + assert.NoError(t, clu.Cleanup()) + }() + + // run initially the compactor, indexgateway, and distributor. 
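The resetMetricRegistry change in integration/cluster/cluster.go above swaps the global registerer and gatherer for a fresh, duplicate-tolerant registry on every test cluster. A minimal standalone sketch of that pattern, using only the Prometheus client library (the type and metric names here are illustrative, not part of this patch):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// tolerantRegistry mirrors the wrappedRegisterer idea: a fresh Registry that
// treats re-registration of an identical collector as a no-op, which matters
// when test clusters restart components that register the same metrics again.
type tolerantRegistry struct {
	*prometheus.Registry
}

func (t *tolerantRegistry) Register(c prometheus.Collector) error {
	if err := t.Registry.Register(c); err != nil {
		var already prometheus.AlreadyRegisteredError
		if errors.As(err, &already) {
			return nil
		}
		return err
	}
	return nil
}

func main() {
	reg := &tolerantRegistry{Registry: prometheus.NewRegistry()}
	prometheus.DefaultRegisterer = reg
	prometheus.DefaultGatherer = reg

	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_restarts_total"})
	fmt.Println(reg.Register(c)) // <nil>
	fmt.Println(reg.Register(c)) // <nil> — duplicate registration is tolerated
}
```

Resetting prometheus.DefaultGatherer alongside DefaultRegisterer means each cluster's metrics endpoint only exposes collectors registered by the current run, which is presumably why the previous once-only wrapping was dropped in favour of a per-cluster reset.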
+ var ( + tCompactor = clu.AddComponent( + "compactor", + "-target=compactor", + "-boltdb.shipper.compactor.compaction-interval=1s", + "-boltdb.shipper.compactor.retention-delete-delay=1s", + // By default, a minute is added to the delete request start time. This compensates for that. + "-boltdb.shipper.compactor.delete-request-cancel-period=-60s", + "-compactor.deletion-mode=filter-and-delete", + ) + tIndexGateway = clu.AddComponent( + "index-gateway", + "-target=index-gateway", + ) + tDistributor = clu.AddComponent( + "distributor", + "-target=distributor", + ) + ) + require.NoError(t, clu.Run()) + + // then, run only the ingester and query scheduler. + var ( + tIngester = clu.AddComponent( + "ingester", + "-target=ingester", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + ) + tQueryScheduler = clu.AddComponent( + "query-scheduler", + "-target=query-scheduler", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + "-query-scheduler.use-scheduler-ring=true", + ) + ) + require.NoError(t, clu.Run()) + + // finally, run the query-frontend and querier. + var ( + tQueryFrontend = clu.AddComponent( + "query-frontend", + "-target=query-frontend", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-querier.per-request-limits-enabled=true", + "-query-scheduler.use-scheduler-ring=true", + "-frontend.scheduler-worker-concurrency=5", + ) + _ = clu.AddComponent( + "querier", + "-target=querier", + "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), + "-common.compactor-address="+tCompactor.HTTPURL(), + "-query-scheduler.use-scheduler-ring=true", + "-querier.max-concurrent=4", + ) + ) + require.NoError(t, clu.Run()) + + tenantID := randStringRunes() + + now := time.Now() + cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL()) + cliDistributor.Now = now + cliIngester := client.New(tenantID, "", tIngester.HTTPURL()) + cliIngester.Now = now + cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL()) + cliQueryFrontend.Now = now + cliQueryScheduler := client.New(tenantID, "", tQueryScheduler.HTTPURL()) + cliQueryScheduler.Now = now + + t.Run("verify-scheduler-connections", func(t *testing.T) { + require.Eventually(t, func() bool { + // Check metrics to see if query scheduler is connected with query-frontend + metrics, err := cliQueryScheduler.Metrics() + require.NoError(t, err) + return getMetricValue(t, "cortex_query_scheduler_connected_frontend_clients", metrics) == 5 + }, 5*time.Second, 500*time.Millisecond) + + require.Eventually(t, func() bool { + // Check metrics to see if query scheduler is connected with query-frontend + metrics, err := cliQueryScheduler.Metrics() + require.NoError(t, err) + return getMetricValue(t, "cortex_query_scheduler_connected_querier_clients", metrics) == 4 + }, 5*time.Second, 500*time.Millisecond) + }) + + t.Run("ingest-logs", func(t *testing.T) { + // ingest some log lines + require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"})) + require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"})) + + require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"})) + require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"})) + }) + + t.Run("query", func(t *testing.T) { + resp, err 
:= cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`) + require.NoError(t, err) + assert.Equal(t, "streams", resp.Data.ResultType) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines) + }) +} diff --git a/integration/loki_rule_eval_test.go b/integration/loki_rule_eval_test.go index 01b711d0b4da..dc840f939f91 100644 --- a/integration/loki_rule_eval_test.go +++ b/integration/loki_rule_eval_test.go @@ -71,6 +71,7 @@ func testRuleEval(t *testing.T, mode string) { // and we have a circular dependency with the backend "-common.compactor-address=http://fake", "-legacy-read-mode=false", + "-query-scheduler.use-scheduler-ring=false", ) require.NoError(t, clu.Run()) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ccd8417e0ee1..d70034ac0d16 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -342,33 +342,34 @@ type Loki struct { deps map[string][]string SignalHandler *signals.Handler - Server *server.Server - InternalServer *server.Server - ring *ring.Ring - Overrides limiter.CombinedLimits - tenantConfigs *runtime.TenantConfigs - TenantLimits validation.TenantLimits - distributor *distributor.Distributor - Ingester ingester.Interface - Querier querier.Querier - cacheGenerationLoader queryrangebase.CacheGenNumberLoader - querierAPI *querier.QuerierAPI - ingesterQuerier *querier.IngesterQuerier - Store storage.Store - tableManager *index.TableManager - frontend Frontend - ruler *base_ruler.Ruler - ruleEvaluator ruler.Evaluator - RulerStorage rulestore.RuleStore - rulerAPI *base_ruler.API - stopper queryrange.Stopper - runtimeConfig *runtimeconfig.Manager - MemberlistKV *memberlist.KVInitService - compactor *compactor.Compactor - QueryFrontEndTripperware basetripper.Tripperware - queryScheduler *scheduler.Scheduler - usageReport *analytics.Reporter - indexGatewayRingManager *indexgateway.RingManager + Server *server.Server + InternalServer *server.Server + ring *ring.Ring + Overrides limiter.CombinedLimits + tenantConfigs *runtime.TenantConfigs + TenantLimits validation.TenantLimits + distributor *distributor.Distributor + Ingester ingester.Interface + Querier querier.Querier + cacheGenerationLoader queryrangebase.CacheGenNumberLoader + querierAPI *querier.QuerierAPI + ingesterQuerier *querier.IngesterQuerier + Store storage.Store + tableManager *index.TableManager + frontend Frontend + ruler *base_ruler.Ruler + ruleEvaluator ruler.Evaluator + RulerStorage rulestore.RuleStore + rulerAPI *base_ruler.API + stopper queryrange.Stopper + runtimeConfig *runtimeconfig.Manager + MemberlistKV *memberlist.KVInitService + compactor *compactor.Compactor + QueryFrontEndTripperware basetripper.Tripperware + queryScheduler *scheduler.Scheduler + querySchedulerRingManager *scheduler.RingManager + usageReport *analytics.Reporter + indexGatewayRingManager *indexgateway.RingManager clientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -634,8 +635,9 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(TableManager, t.initTableManager) mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(IndexGateway, t.initIndexGateway) - mm.RegisterModule(QueryScheduler, t.initQueryScheduler) mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule) + mm.RegisterModule(QueryScheduler, t.initQueryScheduler) + mm.RegisterModule(QuerySchedulerRing, 
t.initQuerySchedulerRing, modules.UserInvisibleModule) mm.RegisterModule(Analytics, t.initAnalytics) mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader) @@ -654,16 +656,17 @@ func (t *Loki) setupModuleManager() error { Distributor: {Ring, Server, Overrides, TenantConfigs, Analytics}, Store: {Overrides, IndexGatewayRing, IngesterQuerier}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics}, - Querier: {Store, Ring, Server, Overrides, Analytics, CacheGenerationLoader}, + Querier: {Store, Ring, Server, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, - QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader}, - QueryScheduler: {Server, Overrides, MemberlistKV, Analytics}, + QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing}, + QueryScheduler: {Server, Overrides, MemberlistKV, Analytics, QuerySchedulerRing}, Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, Analytics}, RuleEvaluator: {Ring, Server, Store, Overrides, TenantConfigs, Analytics}, TableManager: {Server, Analytics}, Compactor: {Server, Overrides, MemberlistKV, Analytics}, IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing}, IngesterQuerier: {Ring}, + QuerySchedulerRing: {RuntimeConfig, Server, MemberlistKV}, IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryFrontend, Querier}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 20dcc8d9c792..0c97aaf93b7a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -105,6 +105,7 @@ const ( IndexGateway string = "index-gateway" IndexGatewayRing string = "index-gateway-ring" QueryScheduler string = "query-scheduler" + QuerySchedulerRing string = "query-scheduler-ring" All string = "all" Read string = "read" Write string = "write" @@ -361,7 +362,7 @@ func (t *Loki) initQuerier() (services.Service, error) { QuerierWorkerConfig: &t.Cfg.Worker, QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend), QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler), - SchedulerRing: scheduler.SafeReadRing(t.queryScheduler), + SchedulerRing: scheduler.SafeReadRing(t.querySchedulerRingManager), } toMerge := []middleware.Interface{ @@ -781,7 +782,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { } roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend( combinedCfg, - scheduler.SafeReadRing(t.queryScheduler), + scheduler.SafeReadRing(t.querySchedulerRingManager), disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, @@ -1227,24 +1228,45 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { } func (t *Loki) initQueryScheduler() (services.Service, error) { - // Set some config sections from other config sections in the config struct - t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort - - s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer) if err != nil { return nil, err } schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s) schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s) - 
t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(s) + + t.queryScheduler = s + return s, nil +} + +func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) { + if !t.Cfg.QueryScheduler.UseSchedulerRing { + return + } + + // Set some config sections from other config sections in the config struct + t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort + + managerMode := scheduler.RingManagerModeReader + if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) || (t.Cfg.LegacyReadTarget && t.Cfg.isModuleEnabled(Read)) { + managerMode = scheduler.RingManagerModeMember + } + rm, err := scheduler.NewRingManager(managerMode, t.Cfg.QueryScheduler, util_log.Logger, prometheus.DefaultRegisterer) + + if err != nil { + return nil, gerrors.Wrap(err, "new scheduler ring manager") + } + + t.querySchedulerRingManager = rm + + t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(t.querySchedulerRingManager) if t.Cfg.InternalServer.Enable { - t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(s) + t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(t.querySchedulerRingManager) } - t.queryScheduler = s - return s, nil + return t.querySchedulerRingManager, nil } func (t *Loki) initQueryLimiter() (services.Service, error) { diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index b9d06bf83532..6ecb42ec6c17 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -93,9 +93,9 @@ func InitWorkerService( externalRouter.Path(route).Methods("GET", "POST").Handler(handlerMiddleware.Wrap(internalRouter)) } - //If no frontend or scheduler address has been configured, then there is no place for the + //If no scheduler ring or frontend or scheduler address has been configured, then there is no place for the //querier worker to request work from, so no need to start a worker service - if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { + if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" { return nil, nil } diff --git a/pkg/scheduler/lifecycle.go b/pkg/scheduler/lifecycle.go new file mode 100644 index 000000000000..b29b96788f2b --- /dev/null +++ b/pkg/scheduler/lifecycle.go @@ -0,0 +1,28 @@ +package scheduler + +import ( + "github.com/grafana/dskit/ring" +) + +func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { + // When we initialize the scheduler instance in the ring we want to start from + // a clean situation, so whatever is the state we set it JOINING, while we keep existing + // tokens (if any) or the ones loaded from file. + var tokens []uint32 + if instanceExists { + tokens = instanceDesc.GetTokens() + } + + takenTokens := ringDesc.GetTokens() + newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens) + + // Tokens sorting will be enforced by the parent caller. + tokens = append(tokens, newTokens...) 
+ + return ring.JOINING, tokens +} + +func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} +func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} +func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { +} diff --git a/pkg/scheduler/ringmanager.go b/pkg/scheduler/ringmanager.go new file mode 100644 index 000000000000..b7c827243941 --- /dev/null +++ b/pkg/scheduler/ringmanager.go @@ -0,0 +1,252 @@ +package scheduler + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance + // in the ring will be automatically removed. + ringAutoForgetUnhealthyPeriods = 10 + + // ringKey is the key under which we store the store gateways ring in the KVStore. + ringKey = "scheduler" + + // ringNameForServer is the name of the ring used by the compactor server. + ringNameForServer = "scheduler" + + // ringReplicationFactor should be 2 because we want 2 schedulers. + ringReplicationFactor = 2 + + // ringNumTokens sets our single token in the ring, + // we only need to insert 1 token to be used for leader election purposes. + ringNumTokens = 1 + + // ringCheckPeriod is how often we check the ring to see if this instance is still in + // the replicaset of instances to act as schedulers. + ringCheckPeriod = 3 * time.Second +) + +// RingManagerMode defines the different modes for the RingManager to execute. +// +// The RingManager and its modes are only relevant if the Scheduler discovery is done using ring. +type RingManagerMode int + +const ( + // RingManagerModeReader is the RingManager mode executed by Loki components that want to discover Scheduler instances. + // The RingManager in reader mode will have its own ring key-value store client, but it won't try to register itself in the ring. + RingManagerModeReader RingManagerMode = iota + + // RingManagerModeMember is the RingManager mode execute by the Schedulers to register themselves in the ring. + RingManagerModeMember +) + +// RingManager is a component instantiated before all the others and is responsible for the ring setup. +// +// All Loki components that are involved with the Schedulers (including the Schedulers itself) will +// require a RingManager. However, the components that are clients of the Schedulers will run it in reader +// mode while the Schedulers itself will run the manager in member mode. +type RingManager struct { + services.Service + + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + RingLifecycler *ring.BasicLifecycler + Ring *ring.Ring + managerMode RingManagerMode + + cfg Config + + log log.Logger +} + +// NewRingManager is the recommended way of instantiating a RingManager. +// +// The other functions will assume the RingManager was instantiated through this function. 
+func NewRingManager(managerMode RingManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) { + rm := &RingManager{ + cfg: cfg, log: log, managerMode: managerMode, + } + + if !cfg.UseSchedulerRing { + return nil, fmt.Errorf("ring manager shouldn't be invoked when ring is not used for discovering schedulers") + } + + // instantiate kv store for both modes. + ringStore, err := kv.NewClient( + rm.cfg.SchedulerRing.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"), + rm.log, + ) + if err != nil { + return nil, errors.Wrap(err, "scheduler ring manager create KV store client") + } + + // instantiate ring for both mode modes. + ringCfg := rm.cfg.SchedulerRing.ToRingConfig(ringReplicationFactor) + rm.Ring, err = ring.NewWithStoreClientAndStrategy( + ringCfg, + ringNameForServer, + ringKey, + ringStore, + ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), + prometheus.WrapRegistererWithPrefix("cortex_", registerer), + rm.log, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create ring client for scheduler ring manager") + } + + if managerMode == RingManagerModeMember { + if err := rm.startMemberMode(ringStore, registerer); err != nil { + return nil, err + } + return rm, nil + } + + if err := rm.startReaderMode(); err != nil { + return nil, err + } + return rm, nil +} + +func (rm *RingManager) startMemberMode(ringStore kv.Client, registerer prometheus.Registerer) error { + lifecyclerCfg, err := rm.cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens, rm.log) + if err != nil { + return errors.Wrap(err, "invalid ring lifecycler config") + } + + delegate := ring.BasicLifecyclerDelegate(rm) + delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.log) + delegate = ring.NewTokensPersistencyDelegate(rm.cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, rm.log) + delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.SchedulerRing.HeartbeatTimeout, delegate, rm.log) + + rm.RingLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, rm.log, registerer) + if err != nil { + return errors.Wrap(err, "failed to create ring lifecycler for scheduler ring manager") + } + + svcs := []services.Service{rm.RingLifecycler, rm.Ring} + rm.subservices, err = services.NewManager(svcs...) + if err != nil { + return errors.Wrap(err, "failed to create services manager for scheduler ring manager in member mode") + } + + rm.subservicesWatcher = services.NewFailureWatcher() + rm.subservicesWatcher.WatchManager(rm.subservices) + rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping) + + return nil +} + +func (rm *RingManager) startReaderMode() error { + var err error + + svcs := []services.Service{rm.Ring} + rm.subservices, err = services.NewManager(svcs...) + if err != nil { + return errors.Wrap(err, "failed to create services manager for scheduler ring manager in reader mode") + } + + rm.subservicesWatcher = services.NewFailureWatcher() + rm.subservicesWatcher.WatchManager(rm.subservices) + + rm.Service = services.NewIdleService(func(ctx context.Context) error { + return services.StartManagerAndAwaitHealthy(ctx, rm.subservices) + }, func(failureCase error) error { + return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices) + }) + + return nil +} + +// starting implements the Lifecycler interface and is one of the lifecycle hooks. 
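startReaderMode above only runs the ring client, so components that merely need to discover schedulers never register themselves in the ring. A hedged sketch of what such a reader could do with the ring handed out by scheduler.SafeReadRing — the helper name and the GetAllHealthy/ring.Read choices are assumptions for illustration, not the code path the frontend or querier workers actually take:

```go
package schedulerdiscovery

import "github.com/grafana/dskit/ring"

// lookupSchedulerAddrs is a hypothetical helper: given the ring returned by
// scheduler.SafeReadRing (nil when ring-based discovery is disabled), return
// the addresses of the currently healthy scheduler instances.
func lookupSchedulerAddrs(r ring.ReadRing) ([]string, error) {
	if r == nil {
		// Ring-based discovery disabled; callers fall back to statically
		// configured frontend/scheduler addresses.
		return nil, nil
	}
	rs, err := r.GetAllHealthy(ring.Read)
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(rs.Instances))
	for _, inst := range rs.Instances {
		addrs = append(addrs, inst.Addr)
	}
	return addrs, nil
}
```

When the ring is disabled, SafeReadRing returns nil and the querier worker falls back to the statically configured frontend or scheduler address, which is why worker_service.go now also checks cfg.SchedulerRing before deciding whether to start a worker at all.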
+func (rm *RingManager) starting(ctx context.Context) (err error) { + // In case this function will return error we want to unregister the instance + // from the ring. We do it ensuring dependencies are gracefully stopped if they + // were already started. + defer func() { + if err == nil || rm.subservices == nil { + return + } + + if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil { + level.Error(rm.log).Log("msg", "failed to gracefully stop scheduler ring manager dependencies", "err", stopErr) + } + }() + + if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil { + return errors.Wrap(err, "unable to start scheduler ring manager subservices") + } + + // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that + // someone wants to do can be done before becoming ACTIVE. For the schedulers we don't currently + // have any additional work so we can become ACTIVE right away. + // Wait until the ring client detected this instance in the JOINING + // state to make sure that when we'll run the initial sync we already + // know the tokens assigned to this instance. + level.Info(rm.log).Log("msg", "waiting until scheduler is JOINING in the ring") + if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil { + return err + } + level.Info(rm.log).Log("msg", "scheduler is JOINING in the ring") + + if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) + } + + // Wait until the ring client detected this instance in the ACTIVE state to + // make sure that when we'll run the loop it won't be detected as a ring + // topology change. + level.Info(rm.log).Log("msg", "waiting until scheduler is ACTIVE in the ring") + if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + return err + } + level.Info(rm.log).Log("msg", "scheduler is ACTIVE in the ring") + + return nil +} + +// running implements the Lifecycler interface and is one of the lifecycle hooks. +func (rm *RingManager) running(ctx context.Context) error { + t := time.NewTicker(ringCheckPeriod) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil + case err := <-rm.subservicesWatcher.Chan(): + return errors.Wrap(err, "running scheduler ring manager subservice failed") + case <-t.C: + continue + } + } +} + +// stopping implements the Lifecycler interface and is one of the lifecycle hooks. +func (rm *RingManager) stopping(_ error) error { + level.Debug(rm.log).Log("msg", "stopping scheduler ring manager") + return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices) +} + +// ServeHTTP serves the HTTP route /scheduler/ring. 
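The constants above (ringNumTokens = 1, ringReplicationFactor = 2) exist so that exactly two instances own the single leader token and act as the active schedulers. The Scheduler's run loop keeps performing that membership check against rm.Ring via util.IsInReplicationSet; a rough sketch of such a check — the key constant and the ring operation used here are placeholders, not the real values:

```go
package leadercheck

import "github.com/grafana/dskit/ring"

// ringKeyOfLeader stands in for util.RingKeyOfLeader; its real value lives in pkg/util.
const ringKeyOfLeader uint32 = 0

// isChosenScheduler reports whether this instance is one of the replication-set
// owners of the leader key and therefore should dequeue work. Sketch only.
func isChosenScheduler(r ring.ReadRing, instanceAddr string) (bool, error) {
	rs, err := r.Get(ringKeyOfLeader, ring.Read, nil, nil, nil)
	if err != nil {
		return false, err
	}
	return rs.Includes(instanceAddr), nil
}
```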
+func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if rm.cfg.UseSchedulerRing { + rm.Ring.ServeHTTP(w, req) + } else { + _, _ = w.Write([]byte("QueryScheduler running with '-query-scheduler.use-scheduler-ring' set to false.")) + } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f7eb444836a6..de5ab19599ea 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -14,7 +14,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/grpcclient" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" otgrpc "github.com/opentracing-contrib/go-grpc" @@ -36,35 +35,11 @@ import ( "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" - util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/validation" ) var errSchedulerIsNotRunning = errors.New("scheduler is not running") -const ( - // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance - // in the ring will be automatically removed. - ringAutoForgetUnhealthyPeriods = 10 - - // ringKey is the key under which we store the store gateways ring in the KVStore. - ringKey = "scheduler" - - // ringNameForServer is the name of the ring used by the compactor server. - ringNameForServer = "scheduler" - - // ringReplicationFactor should be 2 because we want 2 schedulers. - ringReplicationFactor = 2 - - // ringNumTokens sets our single token in the ring, - // we only need to insert 1 token to be used for leader election purposes. - ringNumTokens = 1 - - // ringCheckPeriod is how often we check the ring to see if this instance is still in - // the replicaset of instances to act as schedulers. - ringCheckPeriod = 3 * time.Second -) - // Scheduler is responsible for queueing and dispatching queries to Queriers. type Scheduler struct { services.Service @@ -98,8 +73,7 @@ type Scheduler struct { inflightRequests prometheus.Summary // Ring used for finding schedulers - ringLifecycler *ring.BasicLifecycler - ring *ring.Ring + ringManager *RingManager // Controls for this being a chosen scheduler shouldRun atomic.Bool @@ -140,7 +114,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } // NewScheduler creates a new Scheduler. 
-func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) { +func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *RingManager, registerer prometheus.Registerer) (*Scheduler, error) { + if cfg.UseSchedulerRing { + if ringManager == nil { + return nil, errors.New("ring manager can't be empty when use_scheduler_ring is true") + } else if ringManager.managerMode != RingManagerModeMember { + return nil, errors.New("ring manager must be initialized in RingManagerModeMember for query schedulers") + } + } + queueMetrics := queue.NewMetrics("query_scheduler", registerer) s := &Scheduler{ cfg: cfg, @@ -150,8 +132,8 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe pendingRequests: map[requestKey]*schedulerRequest{}, connectedFrontends: map[string]*connectedFrontend{}, queueMetrics: queueMetrics, - - requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics), + ringManager: ringManager, + requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics), } s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ @@ -185,39 +167,6 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe if cfg.UseSchedulerRing { s.shouldRun.Store(false) - ringStore, err := kv.NewClient( - cfg.SchedulerRing.KVStore, - ring.GetCodec(), - kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "scheduler"), - log, - ) - if err != nil { - return nil, errors.Wrap(err, "create KV store client") - } - lifecyclerCfg, err := cfg.SchedulerRing.ToLifecyclerConfig(ringNumTokens, log) - if err != nil { - return nil, errors.Wrap(err, "invalid ring lifecycler config") - } - - // Define lifecycler delegates in reverse order (last to be called defined first because they're - // chained via "next delegate"). - delegate := ring.BasicLifecyclerDelegate(s) - delegate = ring.NewLeaveOnStoppingDelegate(delegate, log) - delegate = ring.NewTokensPersistencyDelegate(cfg.SchedulerRing.TokensFilePath, ring.JOINING, delegate, log) - delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.SchedulerRing.HeartbeatTimeout, delegate, log) - - s.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, ringKey, ringStore, delegate, log, registerer) - if err != nil { - return nil, errors.Wrap(err, "create ring lifecycler") - } - - ringCfg := cfg.SchedulerRing.ToRingConfig(ringReplicationFactor) - s.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", registerer), util_log.Logger) - if err != nil { - return nil, errors.Wrap(err, "create ring client") - } - - svcs = append(svcs, s.ringLifecycler, s.ring) } else { // Always run if no scheduler ring is being used. s.shouldRun.Store(true) @@ -605,9 +554,6 @@ func (s *Scheduler) isRunningOrStopping() bool { } func (s *Scheduler) starting(ctx context.Context) (err error) { - // In case this function will return error we want to unregister the instance - // from the ring. We do it ensuring dependencies are gracefully stopped if they - // were already started. 
defer func() { if err == nil || s.subservices == nil { return @@ -622,35 +568,6 @@ func (s *Scheduler) starting(ctx context.Context) (err error) { return errors.Wrap(err, "unable to start scheduler subservices") } - if s.cfg.UseSchedulerRing { - // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that - // someone wants to do can be done before becoming ACTIVE. For the query scheduler we don't currently - // have any additional work so we can become ACTIVE right away. - - // Wait until the ring client detected this instance in the JOINING state to - // make sure that when we'll run the initial sync we already know the tokens - // assigned to this instance. - level.Info(s.log).Log("msg", "waiting until scheduler is JOINING in the ring") - if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { - return err - } - level.Info(s.log).Log("msg", "scheduler is JOINING in the ring") - - // Change ring state to ACTIVE - if err = s.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { - return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) - } - - // Wait until the ring client detected this instance in the ACTIVE state to - // make sure that when we'll run the loop it won't be detected as a ring - // topology change. - level.Info(s.log).Log("msg", "waiting until scheduler is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, s.ring, s.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { - return err - } - level.Info(s.log).Log("msg", "scheduler is ACTIVE in the ring") - } - return nil } @@ -675,7 +592,7 @@ func (s *Scheduler) running(ctx context.Context) error { if !s.cfg.UseSchedulerRing { continue } - isInSet, err := util.IsInReplicationSet(s.ring, util.RingKeyOfLeader, s.ringLifecycler.GetInstanceAddr()) + isInSet, err := util.IsInReplicationSet(s.ringManager.Ring, util.RingKeyOfLeader, s.ringManager.RingLifecycler.GetInstanceAddr()) if err != nil { level.Error(s.log).Log("msg", "failed to query the ring to see if scheduler instance is in ReplicatonSet, will try again", "err", err) continue @@ -745,41 +662,10 @@ func (s *Scheduler) getConnectedFrontendClientsMetric() float64 { // SafeReadRing does a nil check on the Scheduler before attempting to return it's ring // this is necessary as many callers of this function will only have a valid Scheduler // reference if the QueryScheduler target has been specified, which is not guaranteed -func SafeReadRing(s *Scheduler) ring.ReadRing { - if s == nil || s.ring == nil || !s.cfg.UseSchedulerRing { +func SafeReadRing(s *RingManager) ring.ReadRing { + if s == nil || s.Ring == nil || !s.cfg.UseSchedulerRing { return nil } - return s.ring -} - -func (s *Scheduler) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { - // When we initialize the scheduler instance in the ring we want to start from - // a clean situation, so whatever is the state we set it JOINING, while we keep existing - // tokens (if any) or the ones loaded from file. - var tokens []uint32 - if instanceExists { - tokens = instanceDesc.GetTokens() - } - - takenTokens := ringDesc.GetTokens() - newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens) - - // Tokens sorting will be enforced by the parent caller. - tokens = append(tokens, newTokens...) 
- - return ring.JOINING, tokens -} - -func (s *Scheduler) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} -func (s *Scheduler) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} -func (s *Scheduler) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { -} - -func (s *Scheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if s.cfg.UseSchedulerRing { - s.ring.ServeHTTP(w, req) - } else { - _, _ = w.Write([]byte("QueryScheduler running with '-query-scheduler.use-scheduler-ring' set to false.")) - } + return s.Ring } diff --git a/pkg/util/cfg/dynamic.go b/pkg/util/cfg/dynamic.go index 08f8aee896f5..59498a31e33b 100644 --- a/pkg/util/cfg/dynamic.go +++ b/pkg/util/cfg/dynamic.go @@ -23,6 +23,8 @@ func DynamicUnmarshal(dst DynamicCloneable, args []string, fs *flag.FlagSet) err // section of the config file by taking advantage of the code in ConfigFileLoader which will load // and process the config file. ConfigFileLoader(args, "config.file", true), + // Now load the flags again, this will supersede anything set from config file with flags from the command line. + Flags(args, fs), // Apply any dynamic logic to set other defaults in the config. This function is called after parsing the // config files so that values from a common, or shared, section can be used in // the dynamic evaluation
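Finally, the pkg/util/cfg/dynamic.go hunk re-applies the command-line flags after the config file has been loaded, so a flag passed on the command line wins over the same setting in YAML. A toy, self-contained illustration of that "last source applied wins" ordering (the flag name is real, the scenario is contrived):

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	args := []string{"-query-scheduler.use-scheduler-ring=true"}

	fs := flag.NewFlagSet("loki", flag.ContinueOnError)
	useRing := fs.Bool("query-scheduler.use-scheduler-ring", false, "")

	_ = fs.Parse(args) // 1) defaults, then CLI flags
	*useRing = false   // 2) pretend the YAML config set it back to false
	_ = fs.Parse(args) // 3) re-apply CLI flags, as DynamicUnmarshal now does

	fmt.Println(*useRing) // true — the command line wins
}
```

Without that second pass, a value from the config file would silently override what the operator passed on the command line.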