feat(kafka): Enable querier to optionally query partition ingesters #14418

Merged: 7 commits, Oct 9, 2024
Changes from 3 commits
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ run:
- cgo
- promtail_journal_enabled
- integration

# output configuration options
output:
formats:
go.mod: 1 addition, 1 deletion
@@ -51,7 +51,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
go.sum: 2 additions, 2 deletions
@@ -1044,8 +1044,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf h1:ZafqZwIpdCCMifH9Ok6C98rYaCh5OZeyyHLbU0FPedg=
github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 h1:a4O59OU3FJZ+EJUVnlvvNTvdAc4uRN1P6EaGwqL9CnA=
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
pkg/loki/loki.go: 1 addition, 1 deletion
@@ -736,7 +736,7 @@ func (t *Loki) setupModuleManager() error {
PatternRingClient: {Server, MemberlistKV, Analytics},
PatternIngesterTee: {Server, MemberlistKV, Analytics, PatternRingClient},
PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee},
IngesterQuerier: {Ring},
IngesterQuerier: {Ring, PartitionRing, Overrides},
QuerySchedulerRing: {Overrides, MemberlistKV},
IndexGatewayRing: {Overrides, MemberlistKV},
PartitionRing: {MemberlistKV, Server, Ring},
pkg/loki/modules.go: 9 additions, 3 deletions
@@ -975,8 +975,14 @@ func (t *Loki) setupAsyncStore() error {
}

func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
logger := log.With(util_log.Logger, "component", "querier")
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger)
logger := log.With(util_log.Logger, "component", "ingester-querier")

if !t.Cfg.Ingester.KafkaIngestion.Enabled {
// Disable querying partition-ingesters if Kafka ingestion is disabled
t.Cfg.Querier.QueryPartitionIngesters = false
}

t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.Querier, t.Cfg.IngesterClient, t.ring, t.partitionRing, t.Overrides.IngestionPartitionsTenantShardSize, t.Cfg.MetricsNamespace, logger)
if err != nil {
return nil, err
}
@@ -1756,7 +1762,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {

// The Ingest Partition Ring is responsible for watching the available ingesters and assigning partitions to incoming requests.
func (t *Loki) initPartitionRing() (services.Service, error) {
if !t.Cfg.Ingester.KafkaIngestion.Enabled {
if !t.Cfg.Ingester.KafkaIngestion.Enabled && !t.Cfg.Querier.QueryPartitionIngesters {
return nil, nil
}

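As a reader's aid, here is a minimal sketch of the gating introduced above, using simplified stand-in config fields rather than the PR's literal structs: initIngesterQuerier force-disables partition-ingester querying when Kafka ingestion is off, and initPartitionRing now starts the partition ring when either the write path or the read path needs it.

package main

import "fmt"

// Simplified stand-ins for the relevant Loki config fields (names illustrative).
type cfg struct {
	kafkaIngestionEnabled   bool // ingester Kafka ingestion (write path)
	queryPartitionIngesters bool // querier.query_partition_ingesters (read path)
}

// effectiveQueryPartitionIngesters mirrors initIngesterQuerier: the querier-side
// option is force-disabled when Kafka ingestion is disabled.
func effectiveQueryPartitionIngesters(c cfg) bool {
	return c.kafkaIngestionEnabled && c.queryPartitionIngesters
}

// partitionRingNeeded mirrors initPartitionRing: the partition ring is started
// when either feature requires it.
func partitionRingNeeded(c cfg) bool {
	return c.kafkaIngestionEnabled || c.queryPartitionIngesters
}

func main() {
	c := cfg{kafkaIngestionEnabled: true, queryPartitionIngesters: true}
	fmt.Println(effectiveQueryPartitionIngesters(c), partitionRingNeeded(c)) // true true
}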
73 changes: 54 additions & 19 deletions pkg/querier/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/user"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
@@ -40,28 +42,32 @@

// IngesterQuerier helps with querying the ingesters.
type IngesterQuerier struct {
ring ring.ReadRing
pool *ring_client.Pool
extraQueryDelay time.Duration
logger log.Logger
querierConfig Config
ring ring.ReadRing
partitionRing *ring.PartitionInstanceRing
getShardCountForTenant func(string) int
pool *ring_client.Pool
logger log.Logger
}

func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
func NewIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
factory := func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}

return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
return newIngesterQuerier(querierConfig, clientCfg, ring, partitionRing, getShardCountForTenant, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
}

// newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory
// used for testing purposes
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
iq := IngesterQuerier{
ring: ring,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
extraQueryDelay: extraQueryDelay,
logger: logger,
querierConfig: querierConfig,
ring: ring,
partitionRing: partitionRing,
getShardCountForTenant: getShardCountForTenant, // limits?
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
logger: logger,
}

err := services.StartAndAwaitRunning(context.Background(), iq.pool)
@@ -73,22 +79,51 @@
}

// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
tenantShards := q.getShardCountForTenant(tenantID)
// TODO: Confirm the shuffle-shard lookback period == query-ingesters-within
Review thread on the TODO above:

Contributor (Author): I couldn't find an answer to this: is the lookback period the same as the query-ingesters-within parameter, or something else?

Contributor: Yes, it is query-ingesters-within. Can we get confirmation from @pracucci?

Contributor: The shuffle sharding lookback period in Mimir is configured here:
https://github.com/grafana/mimir/blob/ab3e627b1da134f33c06dd09ee4402a2eaad68f1/pkg/mimir/modules.go#L448-L450
It's set equal to TSDB retention, but that's a recent change. In the past it was set to the query-ingesters-within parameter. We changed it to TSDB retention because query-ingesters-within can now be customized per tenant (via the live-reloaded runtime config), so it's no longer a single setting for the entire cluster. The worst-case scenario we want to cover is the TSDB retention period in ingesters (which is also set to 13h). I think for your use case, setting it to the query-ingesters-within parameter is fine.

subring, err := q.partitionRing.ShuffleShardWithLookback(tenantID, tenantShards, q.querierConfig.QueryIngestersWithin, time.Now())
if err != nil {
return nil, err
}
replicationSets, err := subring.GetReplicationSetsForOperation(ring.Read)

CI annotation (golangci-lint, line 94 of pkg/querier/ingester_querier.go): ineffectual assignment to err (ineffassign). Thread marked resolved.
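// Likely follow-up for the resolved lint failure above (a suggestion only; this
// view shows the first 3 of 7 commits, so the fix presumably lands later):
// if err != nil {
//     return nil, err
// }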
return q.forGivenIngesterSets(ctx, replicationSets, f)
}

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}

return q.forGivenIngesters(ctx, replicationSet, f)
return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f)
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
cfg := ring.DoUntilQuorumConfig{
// forGivenIngesterSets runs f, in parallel, for given ingester sets
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition.
// Ingesters must supply zone information for this to have an effect.
config := ring.DoUntilQuorumConfig{
MinimizeRequests: true,
}
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
return q.forGivenIngesters(ctx, set, config, f)
})
}

func defaultQuorumConfig() ring.DoUntilQuorumConfig {
return ring.DoUntilQuorumConfig{
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
client, err := q.pool.GetClientFor(ingester.Addr)
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
@@ -212,7 +247,7 @@
}

// Instance a tail client for each ingester to re(connect)
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
@@ -260,7 +295,7 @@
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
}

responses, err := q.forGivenIngesters(ctx, replicationSet, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{})
if err != nil {
return nil, err
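To make the new read path above easier to follow, here is a self-contained conceptual sketch using simplified stand-in types instead of the dskit ring APIs (ShuffleShardWithLookback, GetReplicationSetsForOperation, DoUntilQuorum): the tenant's partitions are shuffle-sharded, each partition yields one replication set, and with MinimizeRequests enabled a single zone-aware ingester per partition is queried first.

package main

import (
	"fmt"
	"time"
)

// replicationSet stands in for ring.ReplicationSet: the ingesters owning one partition.
type replicationSet struct {
	partition int
	addrs     []string
}

// shuffleShardPartitions stands in for partitionRing.ShuffleShardWithLookback:
// it selects the tenant's subset of partitions within the lookback window.
func shuffleShardPartitions(all []replicationSet, tenantShards int, _ time.Duration) []replicationSet {
	if tenantShards <= 0 || tenantShards >= len(all) {
		return all // no shuffle sharding: query every partition
	}
	return all[:tenantShards] // simplified deterministic pick
}

// queryPartitions mirrors forGivenIngesterSets: one request per partition, starting
// with a single ingester per replication set because MinimizeRequests is enabled.
func queryPartitions(sets []replicationSet, query func(addr string) (string, error)) ([]string, error) {
	results := make([]string, 0, len(sets))
	for _, set := range sets {
		resp, err := query(set.addrs[0])
		if err != nil {
			return nil, err
		}
		results = append(results, resp)
	}
	return results, nil
}

func main() {
	partitions := []replicationSet{
		{0, []string{"ingester-zone-a-0", "ingester-zone-b-0"}},
		{1, []string{"ingester-zone-a-1", "ingester-zone-b-1"}},
		{2, []string{"ingester-zone-a-2", "ingester-zone-b-2"}},
	}
	// Shuffle-shard down to 2 partitions; the lookback mirrors query-ingesters-within.
	sets := shuffleShardPartitions(partitions, 2, 3*time.Hour)
	out, _ := queryPartitions(sets, func(addr string) (string, error) {
		return "ok from " + addr, nil
	})
	fmt.Println(out)
}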
143 changes: 142 additions & 1 deletion pkg/querier/ingester_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"go.uber.org/atomic"

"google.golang.org/grpc/codes"
@@ -224,6 +225,131 @@
}
}

func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "test-user")
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

ingesters := []ring.InstanceDesc{
mockInstanceDescWithZone("1.1.1.1", ring.ACTIVE, "A"),
mockInstanceDescWithZone("2.2.2.2", ring.ACTIVE, "B"),
mockInstanceDescWithZone("3.3.3.3", ring.ACTIVE, "A"),
mockInstanceDescWithZone("4.4.4.4", ring.ACTIVE, "B"),
mockInstanceDescWithZone("5.5.5.5", ring.ACTIVE, "A"),
mockInstanceDescWithZone("6.6.6.6", ring.ACTIVE, "B"),
}

tests := map[string]struct {
method string
testFn func(*IngesterQuerier) error
retVal interface{}
shards int
}{
"label": {
method: "Label",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.Label(ctx, nil)
return err
},
retVal: new(logproto.LabelResponse),
},
"series": {
method: "Series",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.Series(ctx, nil)
return err
},
retVal: new(logproto.SeriesResponse),
},
"get_chunk_ids": {
method: "GetChunkIDs",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.GetChunkIDs(ctx, model.Time(0), model.Time(0))
return err
},
retVal: new(logproto.GetChunkIDsResponse),
},
"select_logs": {
method: "Query",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: new(logproto.QueryRequest),
})
return err
},
retVal: newQueryClientMock(),
},
"select_sample": {
method: "QuerySample",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectSample(ctx, logql.SelectSampleParams{
SampleQueryRequest: new(logproto.SampleQueryRequest),
})
return err
},
retVal: newQuerySampleClientMock(),
},
"select_logs_shuffle_sharded": {
method: "Query",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: new(logproto.QueryRequest),
})
return err
},
retVal: newQueryClientMock(),
shards: 2, // Must be less than number of partitions
},
}

for testName, testData := range tests {
testName, testData := testName, testData

CI annotation (golangci-lint, line 307 of pkg/querier/ingester_querier_test.go): the copy of the 'for' variable "testName" can be deleted (Go 1.22+) (copyloopvar).

cnt := atomic.NewInt32(0)

t.Run(testName, func(t *testing.T) {
cnt.Store(0)
runFn := func(args mock.Arguments) {
ctx := args[0].(context.Context)

select {
case <-ctx.Done():
// should not be cancelled by the tracker
require.NoError(t, ctx.Err())
default:
cnt.Add(1)
}
}

instanceRing := newReadRingMock(ingesters, 0)
ingesterClient := newQuerierClientMock()
ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn)

partitions := 3
ingestersPerPartition := len(ingesters) / partitions
assert.Greaterf(t, ingestersPerPartition, 1, "must have more than one ingester per partition")

ingesterQuerier, err := newTestPartitionIngesterQuerier(ingesterClient, instanceRing, newPartitionInstanceRingMock(instanceRing, ingesters, partitions, ingestersPerPartition), testData.shards)
require.NoError(t, err)

ingesterQuerier.querierConfig.QueryPartitionIngesters = true

err = testData.testFn(ingesterQuerier)
require.NoError(t, err)

if testData.shards == 0 {
testData.shards = partitions
}
expectedCalls := min(testData.shards, partitions)
// Wait for responses: We expect one request per queried partition because we have request minimization enabled & ingesters are in multiple zones.
// If shuffle sharding is enabled, we expect one query per shard as we write to a subset of partitions.
require.Eventually(t, func() bool { return cnt.Load() >= int32(expectedCalls) }, time.Millisecond*100, time.Millisecond*1, "expected all ingesters to respond")
ingesterClient.AssertNumberOfCalls(t, testData.method, expectedCalls)
})
}
}

func TestQuerier_tailDisconnectedIngesters(t *testing.T) {
t.Parallel()

@@ -400,9 +526,24 @@

func newTestIngesterQuerier(readRingMock *readRingMock, ingesterClient *querierClientMock) (*IngesterQuerier, error) {
return newIngesterQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
readRingMock,
mockQuerierConfig().ExtraQueryDelay,
nil,
func(string) int { return 0 },
newIngesterClientMockFactory(ingesterClient),
constants.Loki,
log.NewNopLogger(),
)
}

func newTestPartitionIngesterQuerier(ingesterClient *querierClientMock, instanceRing *readRingMock, partitionRing *ring.PartitionInstanceRing, tenantShards int) (*IngesterQuerier, error) {
return newIngesterQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
instanceRing,
partitionRing,
func(string) int { return tenantShards },
newIngesterClientMockFactory(ingesterClient),
constants.Loki,
log.NewNopLogger(),
pkg/querier/querier.go: 2 additions, 0 deletions
@@ -72,6 +72,7 @@ type Config struct {
QueryIngesterOnly bool `yaml:"query_ingester_only"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"`
QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"`
}

// RegisterFlags register flags.
@@ -85,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.")
f.BoolVar(&cfg.PerRequestLimitsEnabled, "querier.per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.")
f.BoolVar(&cfg.QueryPartitionIngesters, "querier.query_partition_ingesters", false, "When true, querier directs ingester queries to the partition-ingesters instead of the normal ingesters.")
}

// Validate validates the config.
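For operators trying this out, a configuration sketch follows. The querier key matches the yaml tag added in this PR (also settable via -querier.query_partition_ingesters); the ingester block's key names are an assumption about the Kafka ingestion config and may differ in your Loki version.

# Sketch only: enable the experimental partition-ingester query path.
querier:
  query_partition_ingesters: true

# Assumed key names for the ingester's Kafka ingestion setting; initIngesterQuerier
# force-disables the querier option above when this is not enabled.
ingester:
  kafka_ingestion:
    enabled: true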