Skip to content

Commit

Permalink
feat(kafka): Enable querier to use partition ingesters
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive committed Oct 8, 2024
1 parent 39b57ec commit f316cdb
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20241002104024-b69ac1b95024
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20241002104024-b69ac1b95024 h1:RflXv1xMlnqP0zr1apM3lJ0BK5SwvEOGLv980V+oGJ0=
github.com/grafana/dskit v0.0.0-20241002104024-b69ac1b95024/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 h1:a4O59OU3FJZ+EJUVnlvvNTvdAc4uRN1P6EaGwqL9CnA=
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (t *Loki) setupModuleManager() error {
PatternRingClient: {Server, MemberlistKV, Analytics},
PatternIngesterTee: {Server, MemberlistKV, Analytics, PatternRingClient},
PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee},
IngesterQuerier: {Ring},
IngesterQuerier: {Ring, PartitionRing, Overrides},
QuerySchedulerRing: {Overrides, MemberlistKV},
IndexGatewayRing: {Overrides, MemberlistKV},
PartitionRing: {MemberlistKV, Server, Ring},
Expand Down
12 changes: 9 additions & 3 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,14 @@ func (t *Loki) setupAsyncStore() error {
}

func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
logger := log.With(util_log.Logger, "component", "querier")
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger)
logger := log.With(util_log.Logger, "component", "ingester-querier")

if !t.Cfg.Ingester.KafkaIngestion.Enabled {
// Disable querying partition-ingesters if Kafka ingestion is disabled
t.Cfg.Querier.QueryPartitionIngesters = false
}

t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.Querier, t.Cfg.IngesterClient, t.ring, t.partitionRing, t.Overrides.IngestionPartitionsTenantShardSize, t.Cfg.MetricsNamespace, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1754,7 +1760,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {

// The Ingest Partition Ring is responsible for watching the available ingesters and assigning partitions to incoming requests.
func (t *Loki) initPartitionRing() (services.Service, error) {
if !t.Cfg.Ingester.KafkaIngestion.Enabled {
if !t.Cfg.Ingester.KafkaIngestion.Enabled && !t.Cfg.Querier.QueryPartitionIngesters {
return nil, nil
}

Expand Down
73 changes: 54 additions & 19 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/user"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
Expand Down Expand Up @@ -40,28 +42,32 @@ type responseFromIngesters struct {

// IngesterQuerier helps with querying the ingesters.
type IngesterQuerier struct {
ring ring.ReadRing
pool *ring_client.Pool
extraQueryDelay time.Duration
logger log.Logger
querierConfig Config
ring ring.ReadRing
partitionRing *ring.PartitionInstanceRing
getShardCountForTenant func(string) int
pool *ring_client.Pool
logger log.Logger
}

func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
func NewIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
factory := func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}

return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
return newIngesterQuerier(querierConfig, clientCfg, ring, partitionRing, getShardCountForTenant, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
}

// newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory
// used for testing purposes
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
iq := IngesterQuerier{
ring: ring,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
extraQueryDelay: extraQueryDelay,
logger: logger,
querierConfig: querierConfig,
ring: ring,
partitionRing: partitionRing,
getShardCountForTenant: getShardCountForTenant, // limits?
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
logger: logger,
}

err := services.StartAndAwaitRunning(context.Background(), iq.pool)
Expand All @@ -73,22 +79,51 @@ func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryD
}

// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
tenantShards := q.getShardCountForTenant(tenantID)
// TODO: Confirm the shuffle-shard lookback period == query-ingesters-within
subring, err := q.partitionRing.ShuffleShardWithLookback(tenantID, tenantShards, q.querierConfig.QueryIngestersWithin, time.Now())
if err != nil {
return nil, err
}
replicationSets, err := subring.GetReplicationSetsForOperation(ring.Read)
return q.forGivenIngesterSets(ctx, replicationSets, f)
}

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}

return q.forGivenIngesters(ctx, replicationSet, f)
return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f)
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
cfg := ring.DoUntilQuorumConfig{
// forGivenIngesterSets runs f, in parallel, for given ingester sets
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition.
// Ingesters must supply zone information for this to have an effect.
config := ring.DoUntilQuorumConfig{
MinimizeRequests: true,
}
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
return q.forGivenIngesters(ctx, set, config, f)
})
}

func defaultQuorumConfig() ring.DoUntilQuorumConfig {
return ring.DoUntilQuorumConfig{
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
client, err := q.pool.GetClientFor(ingester.Addr)
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
Expand Down Expand Up @@ -212,7 +247,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

// Instance a tail client for each ingester to re(connect)
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -260,7 +295,7 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
}

responses, err := q.forGivenIngesters(ctx, replicationSet, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{})
if err != nil {
return nil, err
Expand Down
143 changes: 142 additions & 1 deletion pkg/querier/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"go.uber.org/atomic"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -226,6 +227,131 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) {
}
}

func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "test-user")
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

ingesters := []ring.InstanceDesc{
mockInstanceDescWithZone("1.1.1.1", ring.ACTIVE, "A"),
mockInstanceDescWithZone("2.2.2.2", ring.ACTIVE, "B"),
mockInstanceDescWithZone("3.3.3.3", ring.ACTIVE, "A"),
mockInstanceDescWithZone("4.4.4.4", ring.ACTIVE, "B"),
mockInstanceDescWithZone("5.5.5.5", ring.ACTIVE, "A"),
mockInstanceDescWithZone("6.6.6.6", ring.ACTIVE, "B"),
}

tests := map[string]struct {
method string
testFn func(*IngesterQuerier) error
retVal interface{}
shards int
}{
"label": {
method: "Label",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.Label(ctx, nil)
return err
},
retVal: new(logproto.LabelResponse),
},
"series": {
method: "Series",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.Series(ctx, nil)
return err
},
retVal: new(logproto.SeriesResponse),
},
"get_chunk_ids": {
method: "GetChunkIDs",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.GetChunkIDs(ctx, model.Time(0), model.Time(0))
return err
},
retVal: new(logproto.GetChunkIDsResponse),
},
"select_logs": {
method: "Query",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: new(logproto.QueryRequest),
})
return err
},
retVal: newQueryClientMock(),
},
"select_sample": {
method: "QuerySample",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectSample(ctx, logql.SelectSampleParams{
SampleQueryRequest: new(logproto.SampleQueryRequest),
})
return err
},
retVal: newQuerySampleClientMock(),
},
"select_logs_shuffle_sharded": {
method: "Query",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: new(logproto.QueryRequest),
})
return err
},
retVal: newQueryClientMock(),
shards: 2, // Must be less than number of partitions
},
}

for testName, testData := range tests {
testName, testData := testName, testData

cnt := atomic.NewInt32(0)

t.Run(testName, func(t *testing.T) {
cnt.Store(0)
runFn := func(args mock.Arguments) {
ctx := args[0].(context.Context)

select {
case <-ctx.Done():
// should not be cancelled by the tracker
require.NoError(t, ctx.Err())
default:
cnt.Add(1)
}
}

instanceRing := newReadRingMock(ingesters, 0)
ingesterClient := newQuerierClientMock()
ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn)

partitions := 3
ingestersPerPartition := len(ingesters) / partitions
assert.Greaterf(t, ingestersPerPartition, 1, "must have more than one ingester per partition")

ingesterQuerier, err := newTestPartitionIngesterQuerier(ingesterClient, instanceRing, newPartitionInstanceRingMock(instanceRing, ingesters, partitions, ingestersPerPartition), testData.shards)
require.NoError(t, err)

ingesterQuerier.querierConfig.QueryPartitionIngesters = true

err = testData.testFn(ingesterQuerier)
require.NoError(t, err)

if testData.shards == 0 {
testData.shards = partitions
}
expectedCalls := min(testData.shards, partitions)
// Wait for responses: We expect one request per queried partition because we have request minimization enabled & ingesters are in multiple zones.
// If shuffle sharding is enabled, we expect one query per shard as we write to a subset of partitions.
require.Eventually(t, func() bool { return cnt.Load() >= int32(expectedCalls) }, time.Millisecond*100, time.Millisecond*1, "expected all ingesters to respond")
ingesterClient.AssertNumberOfCalls(t, testData.method, expectedCalls)
})
}
}

func TestQuerier_tailDisconnectedIngesters(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -404,9 +530,24 @@ func TestIngesterQuerier_DetectedLabels(t *testing.T) {

func newTestIngesterQuerier(readRingMock *readRingMock, ingesterClient *querierClientMock) (*IngesterQuerier, error) {
return newIngesterQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
readRingMock,
mockQuerierConfig().ExtraQueryDelay,
nil,
func(string) int { return 0 },
newIngesterClientMockFactory(ingesterClient),
constants.Loki,
log.NewNopLogger(),
)
}

func newTestPartitionIngesterQuerier(ingesterClient *querierClientMock, instanceRing *readRingMock, partitionRing *ring.PartitionInstanceRing, tenantShards int) (*IngesterQuerier, error) {
return newIngesterQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
instanceRing,
partitionRing,
func(string) int { return tenantShards },
newIngesterClientMockFactory(ingesterClient),
constants.Loki,
log.NewNopLogger(),
Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Config struct {
QueryIngesterOnly bool `yaml:"query_ingester_only"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"`
QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"`
}

// RegisterFlags register flags.
Expand All @@ -85,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.")
f.BoolVar(&cfg.PerRequestLimitsEnabled, "querier.per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.")
f.BoolVar(&cfg.QueryPartitionIngesters, "querier.query_partition_ingesters", false, "When true, querier directs ingester queries to the partition-ingesters instead of the normal ingesters.")
}

// Validate validates the config.
Expand Down
Loading

0 comments on commit f316cdb

Please sign in to comment.