Skip to content

Commit

Permalink
fix issue in distributing queries when running query schedulers in ri…
Browse files Browse the repository at this point in the history
…ng mode
  • Loading branch information
sandeepsukhani committed May 17, 2023
1 parent 01f0ded commit 4148dd2
Show file tree
Hide file tree
Showing 9 changed files with 481 additions and 174 deletions.
7 changes: 6 additions & 1 deletion integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,13 @@ func checkUserLabelAndMetricValue(t *testing.T, metricName, metrics, tenantID st
}

func checkMetricValue(t *testing.T, metricName, metrics string, expectedValue float64) {
t.Helper()
require.Equal(t, expectedValue, getMetricValue(t, metricName, metrics))
}

func getMetricValue(t *testing.T, metricName, metrics string) float64 {
t.Helper()
val, _, err := extractMetric(metricName, metrics)
require.NoError(t, err)
require.Equal(t, expectedValue, val)
return val
}
118 changes: 118 additions & 0 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,121 @@ func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) {
})
}
}

func TestSchedulerRing(t *testing.T) {
clu := cluster.New(nil)
defer func() {
assert.NoError(t, clu.Cleanup())
}()

// run initially the compactor, indexgateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-boltdb.shipper.compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())

// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-query-scheduler.use-scheduler-ring=true",
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend and querier.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-query-scheduler.use-scheduler-ring=true",
"-frontend.scheduler-worker-concurrency=5",
)
_ = clu.AddComponent(
"querier",
"-target=querier",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-query-scheduler.use-scheduler-ring=true",
"-querier.max-concurrent=4",
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now
cliQueryScheduler := client.New(tenantID, "", tQueryScheduler.HTTPURL())
cliQueryScheduler.Now = now

t.Run("verify-scheduler-connections", func(t *testing.T) {
require.Eventually(t, func() bool {
// Check metrics to see if query scheduler is connected with query-frontend
metrics, err := cliQueryScheduler.Metrics()
require.NoError(t, err)
return getMetricValue(t, "cortex_query_scheduler_connected_frontend_clients", metrics) == 5
}, 5*time.Second, 500*time.Millisecond)

require.Eventually(t, func() bool {
// Check metrics to see if query scheduler is connected with query-frontend
metrics, err := cliQueryScheduler.Metrics()
require.NoError(t, err)
return getMetricValue(t, "cortex_query_scheduler_connected_querier_clients", metrics) == 4
}, 5*time.Second, 500*time.Millisecond)
})

t.Run("ingest-logs", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))

require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
})

t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
}
1 change: 1 addition & 0 deletions integration/loki_rule_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func testRuleEval(t *testing.T, mode string) {
// and we have a circular dependency with the backend
"-common.compactor-address=http://fake",
"-legacy-read-mode=false",
"-query-scheduler.use-scheduler-ring=false",
)

require.NoError(t, clu.Run())
Expand Down
65 changes: 34 additions & 31 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,33 +342,34 @@ type Loki struct {
deps map[string][]string
SignalHandler *signals.Handler

Server *server.Server
InternalServer *server.Server
ring *ring.Ring
Overrides limiter.CombinedLimits
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
Ingester ingester.Interface
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
ruleEvaluator ruler.Evaluator
RulerStorage rulestore.RuleStore
rulerAPI *base_ruler.API
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
usageReport *analytics.Reporter
indexGatewayRingManager *indexgateway.RingManager
Server *server.Server
InternalServer *server.Server
ring *ring.Ring
Overrides limiter.CombinedLimits
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
Ingester ingester.Interface
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
ruleEvaluator ruler.Evaluator
RulerStorage rulestore.RuleStore
rulerAPI *base_ruler.API
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
MemberlistKV *memberlist.KVInitService
compactor *compactor.Compactor
QueryFrontEndTripperware basetripper.Tripperware
queryScheduler *scheduler.Scheduler
querySchedulerRingManager *scheduler.RingManager
usageReport *analytics.Reporter
indexGatewayRingManager *indexgateway.RingManager

clientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
Expand Down Expand Up @@ -632,8 +633,9 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(QueryScheduler, t.initQueryScheduler)
mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule)
mm.RegisterModule(Analytics, t.initAnalytics)
mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader)

Expand All @@ -652,16 +654,17 @@ func (t *Loki) setupModuleManager() error {
Distributor: {Ring, Server, Overrides, TenantConfigs, Analytics},
Store: {Overrides, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics},
Querier: {Store, Ring, Server, IngesterQuerier, Overrides, Analytics, CacheGenerationLoader},
Querier: {Store, Ring, Server, IngesterQuerier, Overrides, Analytics, CacheGenerationLoader, QuerySchedulerRing},
QueryFrontendTripperware: {Server, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader},
QueryScheduler: {Server, Overrides, MemberlistKV, Analytics},
QueryFrontend: {QueryFrontendTripperware, Analytics, CacheGenerationLoader, QuerySchedulerRing},
QueryScheduler: {Server, Overrides, MemberlistKV, Analytics, QuerySchedulerRing},
Ruler: {Ring, Server, RulerStorage, RuleEvaluator, Overrides, TenantConfigs, Analytics},
RuleEvaluator: {Ring, Server, Store, IngesterQuerier, Overrides, TenantConfigs, Analytics},
TableManager: {Server, Analytics},
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
QuerySchedulerRing: {RuntimeConfig, Server, MemberlistKV},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryFrontend, Querier},
Expand Down
42 changes: 32 additions & 10 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ const (
IndexGateway string = "index-gateway"
IndexGatewayRing string = "index-gateway-ring"
QueryScheduler string = "query-scheduler"
QuerySchedulerRing string = "query-scheduler-ring"
All string = "all"
Read string = "read"
Write string = "write"
Expand Down Expand Up @@ -356,7 +357,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
QuerierWorkerConfig: &t.Cfg.Worker,
QueryFrontendEnabled: t.Cfg.isModuleEnabled(QueryFrontend),
QuerySchedulerEnabled: t.Cfg.isModuleEnabled(QueryScheduler),
SchedulerRing: scheduler.SafeReadRing(t.queryScheduler),
SchedulerRing: scheduler.SafeReadRing(t.querySchedulerRingManager),
}

toMerge := []middleware.Interface{
Expand Down Expand Up @@ -776,7 +777,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
}
roundTripper, frontendV1, frontendV2, err := frontend.InitFrontend(
combinedCfg,
scheduler.SafeReadRing(t.queryScheduler),
scheduler.SafeReadRing(t.querySchedulerRingManager),
disabledShuffleShardingLimits{},
t.Cfg.Server.GRPCListenPort,
util_log.Logger,
Expand Down Expand Up @@ -1222,24 +1223,45 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
}

func (t *Loki) initQueryScheduler() (services.Service, error) {
// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort

s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}

schedulerpb.RegisterSchedulerForFrontendServer(t.Server.GRPC, s)
schedulerpb.RegisterSchedulerForQuerierServer(t.Server.GRPC, s)
t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(s)

t.queryScheduler = s
return s, nil
}

func (t *Loki) initQuerySchedulerRing() (_ services.Service, err error) {
if !t.Cfg.QueryScheduler.UseSchedulerRing {
return
}

// Set some config sections from other config sections in the config struct
t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort

managerMode := scheduler.RingManagerModeReader
if t.Cfg.isModuleEnabled(QueryScheduler) || t.Cfg.isModuleEnabled(Backend) || t.Cfg.isModuleEnabled(All) {
managerMode = scheduler.RingManagerModeMember
}
rm, err := scheduler.NewRingManager(managerMode, t.Cfg.QueryScheduler, util_log.Logger, prometheus.DefaultRegisterer)

if err != nil {
return nil, gerrors.Wrap(err, "new scheduler ring manager")
}

t.querySchedulerRingManager = rm

t.Server.HTTP.Path("/scheduler/ring").Methods("GET", "POST").Handler(t.querySchedulerRingManager)

if t.Cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(s)
t.InternalServer.HTTP.Path("/scheduler/ring").Methods("GET").Handler(t.querySchedulerRingManager)
}

t.queryScheduler = s
return s, nil
return t.querySchedulerRingManager, nil
}

func (t *Loki) initQueryLimiter() (services.Service, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func InitWorkerService(
externalRouter.Path(route).Methods("GET", "POST").Handler(handlerMiddleware.Wrap(internalRouter))
}

//If no frontend or scheduler address has been configured, then there is no place for the
//If no scheduler ring or frontend or scheduler address has been configured, then there is no place for the
//querier worker to request work from, so no need to start a worker service
if (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
if cfg.SchedulerRing == nil && (*cfg.QuerierWorkerConfig).FrontendAddress == "" && (*cfg.QuerierWorkerConfig).SchedulerAddress == "" {
return nil, nil
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/scheduler/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package scheduler

import (
"github.com/grafana/dskit/ring"
)

func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) {
// When we initialize the index gateway instance in the ring we want to start from
// a clean situation, so whatever is the state we set it JOINING, while we keep existing
// tokens (if any) or the ones loaded from file.
var tokens []uint32
if instanceExists {
tokens = instanceDesc.GetTokens()
}

takenTokens := ringDesc.GetTokens()
newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens)

// Tokens sorting will be enforced by the parent caller.
tokens = append(tokens, newTokens...)

return ring.JOINING, tokens
}

func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {}
func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {}
func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) {
}
Loading

0 comments on commit 4148dd2

Please sign in to comment.