From 51a8e66bb29e128b92dfdda1f57f77706fd104c9 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 8 Oct 2024 11:08:00 +0100 Subject: [PATCH 1/7] feat(kafka): Enable querier to use partition ingesters --- go.mod | 2 +- go.sum | 4 +- pkg/loki/loki.go | 2 +- pkg/loki/modules.go | 12 +- pkg/querier/ingester_querier.go | 73 ++++++--- pkg/querier/ingester_querier_test.go | 143 +++++++++++++++++- pkg/querier/querier.go | 2 + pkg/querier/querier_mock_test.go | 59 +++++++- pkg/querier/querier_test.go | 2 +- .../dskit/ring/partition_instance_ring.go | 16 +- vendor/modules.txt | 2 +- 11 files changed, 283 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index b1ff86617ebe1..0c577c967130a 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.3 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 - github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf + github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc diff --git a/go.sum b/go.sum index c1129b0ba29a0..13a0bae65536a 100644 --- a/go.sum +++ b/go.sum @@ -1044,8 +1044,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= -github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf h1:ZafqZwIpdCCMifH9Ok6C98rYaCh5OZeyyHLbU0FPedg= -github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 
h1:a4O59OU3FJZ+EJUVnlvvNTvdAc4uRN1P6EaGwqL9CnA= +github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 84af0a73504f8..f59218307e7d7 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -736,7 +736,7 @@ func (t *Loki) setupModuleManager() error { PatternRingClient: {Server, MemberlistKV, Analytics}, PatternIngesterTee: {Server, MemberlistKV, Analytics, PatternRingClient}, PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee}, - IngesterQuerier: {Ring}, + IngesterQuerier: {Ring, PartitionRing, Overrides}, QuerySchedulerRing: {Overrides, MemberlistKV}, IndexGatewayRing: {Overrides, MemberlistKV}, PartitionRing: {MemberlistKV, Server, Ring}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 54c6d574f6e51..75de7a961b402 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -975,8 +975,14 @@ func (t *Loki) setupAsyncStore() error { } func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { - logger := log.With(util_log.Logger, "component", "querier") - t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger) + logger := log.With(util_log.Logger, "component", "ingester-querier") + + if !t.Cfg.Ingester.KafkaIngestion.Enabled { + // Disable querying partition-ingesters if Kafka ingestion is disabled + t.Cfg.Querier.QueryPartitionIngesters = false + } + + t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.Querier, t.Cfg.IngesterClient, t.ring, t.partitionRing, t.Overrides.IngestionPartitionsTenantShardSize, 
t.Cfg.MetricsNamespace, logger) if err != nil { return nil, err } @@ -1756,7 +1762,7 @@ func (t *Loki) initAnalytics() (services.Service, error) { // The Ingest Partition Ring is responsible for watching the available ingesters and assigning partitions to incoming requests. func (t *Loki) initPartitionRing() (services.Service, error) { - if !t.Cfg.Ingester.KafkaIngestion.Enabled { + if !t.Cfg.Ingester.KafkaIngestion.Enabled && !t.Cfg.Querier.QueryPartitionIngesters { return nil, nil } diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index c18ca77930667..a87bfaa2dae24 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -8,6 +8,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/user" "golang.org/x/exp/slices" "github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume" @@ -40,28 +42,32 @@ type responseFromIngesters struct { // IngesterQuerier helps with querying the ingesters. 
type IngesterQuerier struct { - ring ring.ReadRing - pool *ring_client.Pool - extraQueryDelay time.Duration - logger log.Logger + querierConfig Config + ring ring.ReadRing + partitionRing *ring.PartitionInstanceRing + getShardCountForTenant func(string) int + pool *ring_client.Pool + logger log.Logger } -func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { +func NewIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { factory := func(addr string) (ring_client.PoolClient, error) { return client.New(clientCfg, addr) } - return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger) + return newIngesterQuerier(querierConfig, clientCfg, ring, partitionRing, getShardCountForTenant, ring_client.PoolAddrFunc(factory), metricsNamespace, logger) } // newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory // used for testing purposes -func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { +func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { iq := IngesterQuerier{ - ring: ring, - pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace), - extraQueryDelay: extraQueryDelay, - logger: logger, + querierConfig: querierConfig, + ring: ring, + 
partitionRing: partitionRing, + getShardCountForTenant: getShardCountForTenant, // limits? + pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace), + logger: logger, } err := services.StartAndAwaitRunning(context.Background(), iq.pool) @@ -73,22 +79,51 @@ func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryD } // forAllIngesters runs f, in parallel, for all ingesters -// TODO taken from Cortex, see if we can refactor out an usable interface. func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { + if q.querierConfig.QueryPartitionIngesters { + tenantID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + tenantShards := q.getShardCountForTenant(tenantID) + // TODO: Confirm the shuffle-shard lookback period == query-ingesters-within + subring, err := q.partitionRing.ShuffleShardWithLookback(tenantID, tenantShards, q.querierConfig.QueryIngestersWithin, time.Now()) + if err != nil { + return nil, err + } + replicationSets, err := subring.GetReplicationSetsForOperation(ring.Read) + return q.forGivenIngesterSets(ctx, replicationSets, f) + } + replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read) if err != nil { return nil, err } - return q.forGivenIngesters(ctx, replicationSet, f) + return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f) } -// forGivenIngesters runs f, in parallel, for given ingesters -func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { - cfg := ring.DoUntilQuorumConfig{ +// forGivenIngesterSets runs f, in parallel, for given ingester sets +func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f 
func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { + // Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition. + // Ingesters must supply zone information for this to have an effect. + config := ring.DoUntilQuorumConfig{ + MinimizeRequests: true, + } + return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) { + return q.forGivenIngesters(ctx, set, config, f) + }) +} + +func defaultQuorumConfig() ring.DoUntilQuorumConfig { + return ring.DoUntilQuorumConfig{ // Nothing here } - results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) { +} + +// forGivenIngesters runs f, in parallel, for given ingesters +func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) { + results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) { client, err := q.pool.GetClientFor(ingester.Addr) if err != nil { return responseFromIngesters{addr: ingester.Addr}, err @@ -212,7 +247,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo } // Instance a tail client for each ingester to re(connect) - reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) { + reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client 
logproto.QuerierClient) (interface{}, error) { return client.Tail(ctx, req) }) if err != nil { @@ -260,7 +295,7 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found") } - responses, err := q.forGivenIngesters(ctx, replicationSet, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) { + responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) { resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{}) if err != nil { return nil, err diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index dbc37350f038a..5735a44a2f73f 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/user" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -224,6 +225,131 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { } } +func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) { + t.Parallel() + ctx := user.InjectOrgID(context.Background(), "test-user") + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + ingesters := []ring.InstanceDesc{ + mockInstanceDescWithZone("1.1.1.1", ring.ACTIVE, "A"), + mockInstanceDescWithZone("2.2.2.2", ring.ACTIVE, "B"), + mockInstanceDescWithZone("3.3.3.3", ring.ACTIVE, "A"), + mockInstanceDescWithZone("4.4.4.4", ring.ACTIVE, "B"), + mockInstanceDescWithZone("5.5.5.5", ring.ACTIVE, "A"), + mockInstanceDescWithZone("6.6.6.6", ring.ACTIVE, "B"), + } + + tests := map[string]struct { + method string + testFn func(*IngesterQuerier) error + retVal interface{} + shards int + }{ + "label": { + method: "Label", + testFn: func(ingesterQuerier *IngesterQuerier) error { + _, err := 
ingesterQuerier.Label(ctx, nil) + return err + }, + retVal: new(logproto.LabelResponse), + }, + "series": { + method: "Series", + testFn: func(ingesterQuerier *IngesterQuerier) error { + _, err := ingesterQuerier.Series(ctx, nil) + return err + }, + retVal: new(logproto.SeriesResponse), + }, + "get_chunk_ids": { + method: "GetChunkIDs", + testFn: func(ingesterQuerier *IngesterQuerier) error { + _, err := ingesterQuerier.GetChunkIDs(ctx, model.Time(0), model.Time(0)) + return err + }, + retVal: new(logproto.GetChunkIDsResponse), + }, + "select_logs": { + method: "Query", + testFn: func(ingesterQuerier *IngesterQuerier) error { + _, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{ + QueryRequest: new(logproto.QueryRequest), + }) + return err + }, + retVal: newQueryClientMock(), + }, + "select_sample": { + method: "QuerySample", + testFn: func(ingesterQuerier *IngesterQuerier) error { + _, err := ingesterQuerier.SelectSample(ctx, logql.SelectSampleParams{ + SampleQueryRequest: new(logproto.SampleQueryRequest), + }) + return err + }, + retVal: newQuerySampleClientMock(), + }, + "select_logs_shuffle_sharded": { + method: "Query", + testFn: func(ingesterQuerier *IngesterQuerier) error { + _, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{ + QueryRequest: new(logproto.QueryRequest), + }) + return err + }, + retVal: newQueryClientMock(), + shards: 2, // Must be less than number of partitions + }, + } + + for testName, testData := range tests { + testName, testData := testName, testData + + cnt := atomic.NewInt32(0) + + t.Run(testName, func(t *testing.T) { + cnt.Store(0) + runFn := func(args mock.Arguments) { + ctx := args[0].(context.Context) + + select { + case <-ctx.Done(): + // should not be cancelled by the tracker + require.NoError(t, ctx.Err()) + default: + cnt.Add(1) + } + } + + instanceRing := newReadRingMock(ingesters, 0) + ingesterClient := newQuerierClientMock() + ingesterClient.On(testData.method, mock.Anything, mock.Anything, 
mock.Anything).Return(testData.retVal, nil).Run(runFn) + + partitions := 3 + ingestersPerPartition := len(ingesters) / partitions + assert.Greaterf(t, ingestersPerPartition, 1, "must have more than one ingester per partition") + + ingesterQuerier, err := newTestPartitionIngesterQuerier(ingesterClient, instanceRing, newPartitionInstanceRingMock(instanceRing, ingesters, partitions, ingestersPerPartition), testData.shards) + require.NoError(t, err) + + ingesterQuerier.querierConfig.QueryPartitionIngesters = true + + err = testData.testFn(ingesterQuerier) + require.NoError(t, err) + + if testData.shards == 0 { + testData.shards = partitions + } + expectedCalls := min(testData.shards, partitions) + // Wait for responses: We expect one request per queried partition because we have request minimization enabled & ingesters are in multiple zones. + // If shuffle sharding is enabled, we expect one query per shard as we write to a subset of partitions. + require.Eventually(t, func() bool { return cnt.Load() >= int32(expectedCalls) }, time.Millisecond*100, time.Millisecond*1, "expected all ingesters to respond") + ingesterClient.AssertNumberOfCalls(t, testData.method, expectedCalls) + }) + } +} + func TestQuerier_tailDisconnectedIngesters(t *testing.T) { t.Parallel() @@ -400,9 +526,24 @@ func TestIngesterQuerier_DetectedLabels(t *testing.T) { func newTestIngesterQuerier(readRingMock *readRingMock, ingesterClient *querierClientMock) (*IngesterQuerier, error) { return newIngesterQuerier( + mockQuerierConfig(), mockIngesterClientConfig(), readRingMock, - mockQuerierConfig().ExtraQueryDelay, + nil, + func(string) int { return 0 }, + newIngesterClientMockFactory(ingesterClient), + constants.Loki, + log.NewNopLogger(), + ) +} + +func newTestPartitionIngesterQuerier(ingesterClient *querierClientMock, instanceRing *readRingMock, partitionRing *ring.PartitionInstanceRing, tenantShards int) (*IngesterQuerier, error) { + return newIngesterQuerier( + mockQuerierConfig(), + 
mockIngesterClientConfig(), + instanceRing, + partitionRing, + func(string) int { return tenantShards }, newIngesterClientMockFactory(ingesterClient), constants.Loki, log.NewNopLogger(), diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7c7f973b8c90c..a3a8309829f40 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -72,6 +72,7 @@ type Config struct { QueryIngesterOnly bool `yaml:"query_ingester_only"` MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"` PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"` + QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"` } // RegisterFlags register flags. @@ -85,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.") f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.") f.BoolVar(&cfg.PerRequestLimitsEnabled, "querier.per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.") + f.BoolVar(&cfg.QueryPartitionIngesters, "querier.query_partition_ingesters", false, "When true, querier directs ingester queries to the partition-ingesters instead of the normal ingesters.") } // Validate validates the config. 
diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index df89d6b695611..980be3d06dfce 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -17,7 +17,6 @@ import ( "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" - logql_log "github.com/grafana/loki/v3/pkg/logql/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -25,6 +24,8 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" grpc_metadata "google.golang.org/grpc/metadata" + logql_log "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/distributor/clientpool" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/iter" @@ -153,7 +154,7 @@ func newIngesterClientMockFactory(c *querierClientMock) ring_client.PoolFactory func mockIngesterClientConfig() client.Config { return client.Config{ PoolConfig: clientpool.PoolConfig{ - ClientCleanupPeriod: 1 * time.Minute, + ClientCleanupPeriod: 1 * time.Hour, HealthCheckIngesters: false, RemoteTimeout: 1 * time.Second, }, @@ -419,6 +420,54 @@ func newReadRingMock(ingesters []ring.InstanceDesc, maxErrors int) *readRingMock } } +func (r *readRingMock) GetInstance(addr string) (ring.InstanceDesc, error) { + for _, ing := range r.replicationSet.Instances { + if ing.Addr == addr { + return ing, nil + } + } + return ring.InstanceDesc{}, errors.New("instance not found") +} + +// partitionRingMock is a mocked version of a ReadRing, used in querier unit tests +// to control the pool of ingesters available +type partitionRingMock struct { + ring *ring.PartitionRing +} + +func (p partitionRingMock) PartitionRing() *ring.PartitionRing { + return p.ring +} + +func newPartitionInstanceRingMock(ingesterRing ring.InstanceRingReader, ingesters []ring.InstanceDesc, numPartitions int, ingestersPerPartition int) 
*ring.PartitionInstanceRing { + partitions := make(map[int32]ring.PartitionDesc) + owners := make(map[string]ring.OwnerDesc) + for i := 0; i < numPartitions; i++ { + partitions[int32(i)] = ring.PartitionDesc{ + Id: int32(i), + State: ring.PartitionActive, + Tokens: []uint32{uint32(i)}, + } + + for j := 0; j < ingestersPerPartition; j++ { + ingesterIdx := i*ingestersPerPartition + j + if ingesterIdx < len(ingesters) { + owners[ingesters[ingesterIdx].Id] = ring.OwnerDesc{ + OwnedPartition: int32(i), + State: ring.OwnerActive, + } + } + } + } + partitionRing := partitionRingMock{ + ring: ring.NewPartitionRing(ring.PartitionRingDesc{ + Partitions: partitions, + Owners: owners, + }), + } + return ring.NewPartitionInstanceRing(partitionRing, ingesterRing, time.Hour) +} + func (r *readRingMock) Describe(_ chan<- *prometheus.Desc) { } @@ -518,11 +567,17 @@ func mockReadRingWithOneActiveIngester() *readRingMock { } func mockInstanceDesc(addr string, state ring.InstanceState) ring.InstanceDesc { + return mockInstanceDescWithZone(addr, state, "") +} + +func mockInstanceDescWithZone(addr string, state ring.InstanceState, zone string) ring.InstanceDesc { return ring.InstanceDesc{ + Id: addr, Addr: addr, Timestamp: time.Now().UnixNano(), State: state, Tokens: []uint32{1, 2, 3}, + Zone: zone, } } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index b24491b87f6ba..41265e00df59f 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1360,7 +1360,7 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) { } func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, dg *mockDeleteGettter, store storage.Store, limits *validation.Overrides) (*SingleTenantQuerier, error) { - iq, err := newIngesterQuerier(clientCfg, ring, cfg.ExtraQueryDelay, clientFactory, constants.Loki, util_log.Logger) + iq, err := newIngesterQuerier(cfg, clientCfg, ring, nil, nil, clientFactory, constants.Loki, 
util_log.Logger) if err != nil { return nil, err } diff --git a/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go b/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go index 2fb15d8af98d7..cffa4b2fcc5d7 100644 --- a/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go +++ b/vendor/github.com/grafana/dskit/ring/partition_instance_ring.go @@ -13,15 +13,25 @@ type PartitionRingReader interface { PartitionRing() *PartitionRing } +type InstanceRingReader interface { + // GetInstance return the InstanceDesc for the given instanceID or an error + // if the instance doesn't exist in the ring. The returned InstanceDesc is NOT a + // deep copy, so the caller should never modify it. + GetInstance(string) (InstanceDesc, error) + + // InstancesCount returns the number of instances in the ring. + InstancesCount() int +} + // PartitionInstanceRing holds a partitions ring and a instances ring, and provide functions // to look up the intersection of the two (e.g. healthy instances by partition). 
type PartitionInstanceRing struct { partitionsRingReader PartitionRingReader - instancesRing *Ring + instancesRing InstanceRingReader heartbeatTimeout time.Duration } -func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing *Ring, heartbeatTimeout time.Duration) *PartitionInstanceRing { +func NewPartitionInstanceRing(partitionsRingWatcher PartitionRingReader, instancesRing InstanceRingReader, heartbeatTimeout time.Duration) *PartitionInstanceRing { return &PartitionInstanceRing{ partitionsRingReader: partitionsRingWatcher, instancesRing: instancesRing, @@ -33,7 +43,7 @@ func (r *PartitionInstanceRing) PartitionRing() *PartitionRing { return r.partitionsRingReader.PartitionRing() } -func (r *PartitionInstanceRing) InstanceRing() *Ring { +func (r *PartitionInstanceRing) InstanceRing() InstanceRingReader { return r.instancesRing } diff --git a/vendor/modules.txt b/vendor/modules.txt index 73779f49f47d7..81d67b023fca7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -986,7 +986,7 @@ github.com/gorilla/websocket # github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 ## explicit; go 1.17 github.com/grafana/cloudflare-go -# github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf +# github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 ## explicit; go 1.21 github.com/grafana/dskit/aws github.com/grafana/dskit/backoff From 6d0678443f125169fda9167780d46375a459f35c Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 8 Oct 2024 11:13:03 +0100 Subject: [PATCH 2/7] revert cleanup time --- pkg/querier/querier_mock_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 980be3d06dfce..ab70de4baacea 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -154,7 +154,7 @@ func newIngesterClientMockFactory(c *querierClientMock) ring_client.PoolFactory func mockIngesterClientConfig() client.Config 
{ return client.Config{ PoolConfig: clientpool.PoolConfig{ - ClientCleanupPeriod: 1 * time.Hour, + ClientCleanupPeriod: 1 * time.Minute, HealthCheckIngesters: false, RemoteTimeout: 1 * time.Second, }, From 67b98b054bcd6eda6dbd73d356dd64d18f66b969 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 8 Oct 2024 11:45:53 +0100 Subject: [PATCH 3/7] line endings --- .golangci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.golangci.yml b/.golangci.yml index bc607cd1bbc1a..9a3e34b7754bf 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -24,7 +24,7 @@ run: - cgo - promtail_journal_enabled - integration - + # output configuration options output: formats: From 035218d348feaea8e9cc40c5816485d5d3b67fe7 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Tue, 8 Oct 2024 14:57:56 +0100 Subject: [PATCH 4/7] docs --- docs/sources/shared/configuration.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9ee288b493706..b7a10bd142f98 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -4228,6 +4228,11 @@ engine: # When true, querier limits sent via a header are enforced. # CLI flag: -querier.per-request-limits-enabled [per_request_limits_enabled: <boolean> | default = false] + +# When true, querier directs ingester queries to the partition-ingesters instead +# of the normal ingesters.
+# CLI flag: -querier.query_partition_ingesters +[query_partition_ingesters: <boolean> | default = false] ``` ### query_range From 9ba5f7c074cdf8be6fe3900cb5d5699f26cfee7b Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Wed, 9 Oct 2024 10:46:16 +0100 Subject: [PATCH 5/7] lint --- pkg/querier/ingester_querier.go | 3 +++ pkg/querier/ingester_querier_test.go | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index a87bfaa2dae24..06c446c99cc70 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -92,6 +92,9 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Co return nil, err } replicationSets, err := subring.GetReplicationSetsForOperation(ring.Read) + if err != nil { + return nil, err + } return q.forGivenIngesterSets(ctx, replicationSets, f) } diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index 5735a44a2f73f..788902c2624ed 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -304,8 +304,6 @@ func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) { } for testName, testData := range tests { - testName, testData := testName, testData - cnt := atomic.NewInt32(0) t.Run(testName, func(t *testing.T) { From 715f5bf7f2fa307021578f8e56eadf0b11d1c07d Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Wed, 9 Oct 2024 10:50:17 +0100 Subject: [PATCH 6/7] PR feedback --- pkg/loki/modules.go | 4 ---- pkg/querier/ingester_querier.go | 1 - 2 files changed, 5 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 75de7a961b402..5949bacff960e 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -322,10 +322,6 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { func (t *Loki) initDistributor() (services.Service, error) { t.Cfg.Distributor.KafkaConfig = t.Cfg.KafkaConfig - if t.Cfg.Distributor.KafkaEnabled
&& !t.Cfg.Ingester.KafkaIngestion.Enabled { - return nil, errors.New("kafka is enabled in distributor but not in ingester") - } - var err error logger := log.With(util_log.Logger, "component", "distributor") t.distributor, err = distributor.New( diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index 06c446c99cc70..cc076be1faefd 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -86,7 +86,6 @@ func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Co return nil, err } tenantShards := q.getShardCountForTenant(tenantID) - // TODO: Confirm the shuffle-shard lookback period == query-ingesters-within subring, err := q.partitionRing.ShuffleShardWithLookback(tenantID, tenantShards, q.querierConfig.QueryIngestersWithin, time.Now()) if err != nil { return nil, err From d836a661e56ba0662eef52e2b17e23bcf9d614a2 Mon Sep 17 00:00:00 2001 From: Ben Clive Date: Wed, 9 Oct 2024 10:53:19 +0100 Subject: [PATCH 7/7] Correct PR feedback --- pkg/loki/modules.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5949bacff960e..24e19d7c58e05 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -322,6 +322,10 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { func (t *Loki) initDistributor() (services.Service, error) { t.Cfg.Distributor.KafkaConfig = t.Cfg.KafkaConfig + if t.Cfg.Distributor.KafkaEnabled && !t.Cfg.Ingester.KafkaIngestion.Enabled { + return nil, errors.New("kafka is enabled in distributor but not in ingester") + } + var err error logger := log.With(util_log.Logger, "component", "distributor") t.distributor, err = distributor.New( @@ -973,11 +977,6 @@ func (t *Loki) setupAsyncStore() error { func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { logger := log.With(util_log.Logger, "component", "ingester-querier") - if !t.Cfg.Ingester.KafkaIngestion.Enabled { - // Disable 
querying partition-ingesters if Kafka ingestion is disabled - t.Cfg.Querier.QueryPartitionIngesters = false - } - t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.Querier, t.Cfg.IngesterClient, t.ring, t.partitionRing, t.Overrides.IngestionPartitionsTenantShardSize, t.Cfg.MetricsNamespace, logger) if err != nil { return nil, err