Skip to content

Commit

Permalink
pkg/querier: don't query all ingesters
Browse files Browse the repository at this point in the history
This commit uses replicationSet.Do for communicating with the ingesters
from the queriers. This mimicks the behavior of Cortex. An
extra_query_delay flag has been added to match the flag present in
Cortex's querier config.

Fixes grafana#1447.
  • Loading branch information
rfratto committed Jan 6, 2020
1 parent 4f2316f commit 94b1397
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 36 deletions.
4 changes: 4 additions & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ The `querier_config` block configures the Loki Querier.
# served.
[tail_max_duration: <duration> | default = 1h]
# Time to wait before sending more than the minimum successful query
# requests.
[extra_query_delay: <duration> | default = 0s]
# Configuration options for the LogQL engine.
engine:
# Timeout for query execution
Expand Down
63 changes: 27 additions & 36 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ var readinessProbeSuccess = []byte("Ready")
type Config struct {
QueryTimeout time.Duration `yaml:"query_timeout"`
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
Engine logql.EngineOpts `yaml:"engine,omitempty"`
}

// RegisterFlags register flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served")
f.DurationVar(&cfg.QueryTimeout, "querier.query_timeout", 1*time.Minute, "Timeout when querying backends (ingesters or storage) during the execution of a query request")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
}

// Querier handlers queries.
Expand Down Expand Up @@ -103,52 +105,41 @@ func (q *Querier) ReadinessHandler(w http.ResponseWriter, r *http.Request) {

// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forAllIngesters(f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
func (q *Querier) forAllIngesters(ctx context.Context, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
replicationSet, err := q.ring.GetAll()
if err != nil {
return nil, err
}

return q.forGivenIngesters(replicationSet, f)
return q.forGivenIngesters(ctx, replicationSet, f)
}

// forGivenIngesters runs f, in parallel, for given ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
resps, errs := make(chan responseFromIngesters), make(chan error)
for _, ingester := range replicationSet.Ingesters {
go func(ingester ring.IngesterDesc) {
client, err := q.pool.GetClientFor(ingester.Addr)
if err != nil {
errs <- err
return
}

resp, err := f(client.(logproto.QuerierClient))
if err != nil {
errs <- err
} else {
resps <- responseFromIngesters{ingester.Addr, resp}
}
}(ingester)
}
func (q *Querier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
results, err := replicationSet.Do(ctx, q.cfg.ExtraQueryDelay, func(ingester *ring.IngesterDesc) (interface{}, error) {
client, err := q.pool.GetClientFor(ingester.Addr)
if err != nil {
return nil, err
}

var lastErr error
result, numErrs := []responseFromIngesters{}, 0
for range replicationSet.Ingesters {
select {
case resp := <-resps:
result = append(result, resp)
case lastErr = <-errs:
numErrs++
resp, err := f(client.(logproto.QuerierClient))
if err != nil {
return nil, err
}

return responseFromIngesters{ingester.Addr, resp}, nil
})
if err != nil {
return nil, err
}

if numErrs > replicationSet.MaxErrors {
return nil, lastErr
responses := make([]responseFromIngesters, 0, len(results))
for _, result := range results {
responses = append(responses, result.(responseFromIngesters))
}

return result, nil
return responses, err
}

// Select Implements logql.Querier which select logs via matchers and regex filters.
Expand All @@ -171,7 +162,7 @@ func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.E
}

func (q *Querier) queryIngesters(ctx context.Context, params logql.SelectParams) ([]iter.EntryIterator, error) {
clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
clients, err := q.forAllIngesters(ctx, func(client logproto.QuerierClient) (interface{}, error) {
return client.Query(ctx, params.QueryRequest)
})
if err != nil {
Expand All @@ -191,7 +182,7 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

resps, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(client logproto.QuerierClient) (interface{}, error) {
return client.Label(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -295,7 +286,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
queryCtx, cancelQuery := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancelQuery()

clients, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
clients, err := q.forAllIngesters(ctx, func(client logproto.QuerierClient) (interface{}, error) {
return client.Tail(tailCtx, req)
})
if err != nil {
Expand Down Expand Up @@ -364,7 +355,7 @@ func (q *Querier) tailDisconnectedIngesters(ctx context.Context, req *logproto.T
}

// Instance a tail client for each ingester to re(connect)
reconnectClients, err := q.forGivenIngesters(ring.ReplicationSet{Ingesters: reconnectIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Ingesters: reconnectIngesters}, func(client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
Expand Down Expand Up @@ -408,7 +399,7 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest)

go func() {
// fetch series identifiers from ingesters
resps, err := q.forAllIngesters(func(client logproto.QuerierClient) (interface{}, error) {
resps, err := q.forAllIngesters(ctx, func(client logproto.QuerierClient) (interface{}, error) {
return client.Series(ctx, req)
})
if err != nil {
Expand Down

0 comments on commit 94b1397

Please sign in to comment.