Stub multi tenant querier. (#5490)
**What this PR does / why we need it**:
- Introduce a `Querier` interface and a `QuerierAPI` that owns the HTTP handlers.
- Configure a multi-tenant querier that just passes requests through (see the sketch below).
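
As a condensed sketch of the shape this introduces (restated from the diff below, not new code — the real declarations live in `pkg/querier`):

```go
// Querier is what the HTTP layer talks to; both implementations satisfy it.
type Querier interface {
	logql.Querier
	Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
	Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
	Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error)
}

// SingleTenantQuerier is the previous Querier struct, renamed.

// MultiTenantQuerier is a stub for now: it embeds the single-tenant
// implementation and passes every request through unchanged.
type MultiTenantQuerier struct {
	SingleTenantQuerier
}
```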

**Which issue(s) this PR fixes**:
Closes #231

**Special notes for your reviewer**:

**Checklist**
- [x] Documentation added
- [ ] Tests updated
- [ ] Add an entry in the `CHANGELOG.md` about the changes.
jeschkies authored Mar 1, 2022
1 parent afb94b2 commit ae39676
Showing 7 changed files with 107 additions and 57 deletions.
4 changes: 4 additions & 0 deletions docs/sources/configuration/_index.md
@@ -296,6 +296,10 @@ The `querier` block configures the Loki Querier.
# CLI flag: -querier.query-store-only
[query_store_only: <boolean> | default = false]
# Allow queries for multiple tenants.
# CLI flag: -querier.multi-tenant-queries-enabled
[multi_tenant_queries_enabled: <boolean> | default = false]
# Configuration options for the LogQL engine.
engine:
# Timeout for query execution
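
Not part of the diff: a quick, hypothetical way to sanity-check that the documented YAML key maps onto the new `Config` field (assuming `gopkg.in/yaml.v2`, which the struct's yaml tags target):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"

	"github.com/grafana/loki/pkg/querier"
)

func main() {
	var cfg querier.Config
	// The documented key from the snippet above.
	raw := []byte("multi_tenant_queries_enabled: true\n")
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MultiTenantQueriesEnabled) // true
}
```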
3 changes: 2 additions & 1 deletion pkg/loki/loki.go
@@ -234,7 +234,8 @@ type Loki struct {
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
Ingester ingester.Interface
Querier *querier.Querier
Querier querier.Querier
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
tableManager *chunk.TableManager
34 changes: 21 additions & 13 deletions pkg/loki/modules.go
@@ -221,12 +221,19 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent

logger := log.With(util_log.Logger, "component", "querier")
var err error
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
if err != nil {
return nil, err
}

if t.Cfg.Querier.MultiTenantQueriesEnabled {
t.Querier = querier.NewMultiTenantQuerier(*q, util_log.Logger)
} else {
t.Querier = q
}

querierWorkerServiceConfig := querier.WorkerServiceConfig{
AllEnabled: t.Cfg.isModuleEnabled(All),
ReadEnabled: t.Cfg.isModuleEnabled(Read),
@@ -242,18 +249,19 @@ func (t *Loki) initQuerier() (services.Service, error) {
httpreq.ExtractQueryMetricsMiddleware(),
)

t.querierAPI = querier.NewQuerierAPI(t.Cfg.Querier, t.Querier, t.overrides, logger)
queryHandlers := map[string]http.Handler{
"/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.RangeQueryHandler)),
"/loki/api/v1/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.InstantQueryHandler)),
"/loki/api/v1/label": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/loki/api/v1/series": http.HandlerFunc(t.Querier.SeriesHandler),
"/loki/api/v1/query_range": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.RangeQueryHandler)),
"/loki/api/v1/query": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.InstantQueryHandler)),
"/loki/api/v1/label": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/labels": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/label/{name}/values": http.HandlerFunc(t.querierAPI.LabelHandler),
"/loki/api/v1/series": http.HandlerFunc(t.querierAPI.SeriesHandler),

"/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.Querier.LogQueryHandler)),
"/api/prom/label": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.Querier.LabelHandler),
"/api/prom/series": http.HandlerFunc(t.Querier.SeriesHandler),
"/api/prom/query": httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.LogQueryHandler)),
"/api/prom/label": http.HandlerFunc(t.querierAPI.LabelHandler),
"/api/prom/label/{name}/values": http.HandlerFunc(t.querierAPI.LabelHandler),
"/api/prom/series": http.HandlerFunc(t.querierAPI.SeriesHandler),
}

// We always want to register tail routes externally, tail requests are different from normal queries, they
@@ -266,8 +274,8 @@ func (t *Loki) initQuerier() (services.Service, error) {
// we disable the proxying of the tail routes in initQueryFrontend() and we still want these routes registered
// on the external router.
alwaysExternalHandlers := map[string]http.Handler{
"/loki/api/v1/tail": http.HandlerFunc(t.Querier.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.Querier.TailHandler),
"/loki/api/v1/tail": http.HandlerFunc(t.querierAPI.TailHandler),
"/api/prom/tail": http.HandlerFunc(t.querierAPI.TailHandler),
}

return querier.InitWorkerService(
41 changes: 31 additions & 10 deletions pkg/querier/http.go
@@ -5,6 +5,7 @@ import (
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/websocket"
"github.com/prometheus/prometheus/model/labels"
@@ -20,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/util/marshal"
marshal_legacy "github.com/grafana/loki/pkg/util/marshal/legacy"
serverutil "github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/validation"
)

const (
@@ -31,8 +33,27 @@ type QueryResponse struct {
Result parser.Value `json:"result"`
}

//nolint // QuerierAPI defines HTTP handler functions for the querier.
type QuerierAPI struct {
querier Querier
cfg Config
limits *validation.Overrides
engine *logql.Engine
}

// NewQuerierAPI returns an instance of the QuerierAPI.
func NewQuerierAPI(cfg Config, querier Querier, limits *validation.Overrides, logger log.Logger) *QuerierAPI {
engine := logql.NewEngine(cfg.Engine, querier, limits, logger)
return &QuerierAPI{
cfg: cfg,
limits: limits,
querier: querier,
engine: engine,
}
}

// RangeQueryHandler is a http.HandlerFunc for range queries.
func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
func (q *QuerierAPI) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
@@ -71,7 +92,7 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
}

// InstantQueryHandler is a http.HandlerFunc for instant queries.
func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
func (q *QuerierAPI) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
@@ -111,7 +132,7 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
}

// LogQueryHandler is a http.HandlerFunc for log only queries.
func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
func (q *QuerierAPI) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
// Enforce the query timeout while querying backends
ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
@@ -169,14 +190,14 @@ }
}

// LabelHandler is a http.HandlerFunc for handling label queries.
func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) {
func (q *QuerierAPI) LabelHandler(w http.ResponseWriter, r *http.Request) {
req, err := loghttp.ParseLabelQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

resp, err := q.Label(r.Context(), req)
resp, err := q.querier.Label(r.Context(), req)
if err != nil {
serverutil.WriteError(err, w)
return
@@ -194,7 +215,7 @@ func (q *Querier) LabelHandler(w http.ResponseWriter, r *http.Request) {
}

// TailHandler is a http.HandlerFunc for handling tail queries.
func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
@@ -236,7 +257,7 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {
}
}()

tailer, err := q.Tail(r.Context(), req)
tailer, err := q.querier.Tail(r.Context(), req)
if err != nil {
if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil {
level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err)
@@ -318,14 +339,14 @@ func (q *Querier) TailHandler(w http.ResponseWriter, r *http.Request) {

// SeriesHandler returns the list of time series that match a certain label set.
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func (q *Querier) SeriesHandler(w http.ResponseWriter, r *http.Request) {
func (q *QuerierAPI) SeriesHandler(w http.ResponseWriter, r *http.Request) {
req, err := logql.ParseAndValidateSeriesQuery(r)
if err != nil {
serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

resp, err := q.Series(r.Context(), req)
resp, err := q.querier.Series(r.Context(), req)
if err != nil {
serverutil.WriteError(err, w)
return
@@ -357,7 +378,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) {
return query, nil
}

func (q *Querier) validateEntriesLimits(ctx context.Context, query string, limit uint32) error {
func (q *QuerierAPI) validateEntriesLimits(ctx context.Context, query string, limit uint32) error {
userID, err := tenant.TenantID(ctx)
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
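
Because `QuerierAPI` now takes the `Querier` interface rather than the concrete struct, the HTTP layer can be wired against a fake. A minimal sketch of that seam — the stub type, its canned responses, and the nil limits are assumptions for illustration only, not part of this change:

```go
package querier_test

import (
	"context"
	"net/http"

	"github.com/go-kit/log"

	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/querier"
)

// stubQuerier satisfies the new Querier interface with canned answers.
type stubQuerier struct{}

func (stubQuerier) SelectLogs(context.Context, logql.SelectLogParams) (iter.EntryIterator, error) {
	return nil, nil // a real fake would return a populated iterator
}

func (stubQuerier) SelectSamples(context.Context, logql.SelectSampleParams) (iter.SampleIterator, error) {
	return nil, nil
}

func (stubQuerier) Label(context.Context, *logproto.LabelRequest) (*logproto.LabelResponse, error) {
	return &logproto.LabelResponse{Values: []string{"app"}}, nil
}

func (stubQuerier) Series(context.Context, *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
	return &logproto.SeriesResponse{}, nil
}

func (stubQuerier) Tail(context.Context, *logproto.TailRequest) (*querier.Tailer, error) {
	return nil, nil
}

// registerStubAPI shows the wiring only; nil limits keep the sketch short,
// a real setup would pass *validation.Overrides as initQuerier does.
func registerStubAPI(mux *http.ServeMux) {
	api := querier.NewQuerierAPI(querier.Config{}, stubQuerier{}, nil, log.NewNopLogger())
	mux.Handle("/loki/api/v1/labels", http.HandlerFunc(api.LabelHandler))
}
```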
15 changes: 15 additions & 0 deletions pkg/querier/multi_tenant_querier.go
@@ -0,0 +1,15 @@
package querier

import (
"github.com/go-kit/log"
)

// MultiTenantQuerier is able to query across different tenants.
type MultiTenantQuerier struct {
SingleTenantQuerier
}

// NewMultiTenantQuerier returns a new querier able to query across different tenants.
func NewMultiTenantQuerier(querier SingleTenantQuerier, logger log.Logger) *MultiTenantQuerier {
return &MultiTenantQuerier{SingleTenantQuerier: querier}
}
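
A possible guard, e.g. in a `_test.go` file in this package (not included in this PR), to keep both implementations pinned to the interface. Because the methods use pointer receivers and `SingleTenantQuerier` is embedded by value, it is `*MultiTenantQuerier` that satisfies `Querier`:

```go
// Hypothetical compile-time assertions: the build breaks if either
// implementation drifts away from the Querier interface.
var (
	_ Querier = &SingleTenantQuerier{}
	_ Querier = &MultiTenantQuerier{}
)
```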
51 changes: 26 additions & 25 deletions pkg/querier/querier.go
@@ -6,7 +6,6 @@ import (
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
@@ -20,7 +19,6 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/tenant"
listutil "github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/validation"
@@ -50,6 +48,7 @@ type Config struct {
MaxConcurrent int `yaml:"max_concurrent"`
QueryStoreOnly bool `yaml:"query_store_only"`
QueryIngesterOnly bool `yaml:"query_ingester_only"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
}

// RegisterFlags register flags.
@@ -62,6 +61,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 10, "The maximum number of concurrent queries.")
f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Queriers should only query the store and not try to query any ingesters")
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "Queriers should only query the ingesters and not try to query any store")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "Enable queries across multiple tenants. (Experimental)")
}

// Validate validates the config.
@@ -72,35 +72,36 @@ func (cfg *Config) Validate() error {
return nil
}

// Querier handlers queries.
type Querier struct {
// Querier can select logs and samples and handle query requests.
type Querier interface {
logql.Querier
Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error)
Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error)
Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error)
}

// SingleTenantQuerier handles single tenant queries.
type SingleTenantQuerier struct {
cfg Config
store storage.Store
engine *logql.Engine
limits *validation.Overrides
ingesterQuerier *IngesterQuerier
}

// New makes a new Querier.
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides) (*Querier, error) {
querier := Querier{
func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides) (*SingleTenantQuerier, error) {
querier := SingleTenantQuerier{
cfg: cfg,
store: store,
ingesterQuerier: ingesterQuerier,
limits: limits,
}

querier.engine = logql.NewEngine(cfg.Engine, &querier, limits, log.With(util_log.Logger, "component", "querier"))

return &querier, nil
}

func (q *Querier) SetQueryable(queryable logql.Querier) {
q.engine = logql.NewEngine(q.cfg.Engine, queryable, q.limits, log.With(util_log.Logger, "component", "querier"))
}

// Select Implements logql.Querier which select logs via matchers and regex filters.
func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
var err error
params.Start, params.End, err = q.validateQueryRequest(ctx, params)
if err != nil {
@@ -149,7 +150,7 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
return iter.NewMergeEntryIterator(ctx, iters, params.Direction), nil
}

func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
func (q *SingleTenantQuerier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
var err error
params.Start, params.End, err = q.validateQueryRequest(ctx, params)
if err != nil {
@@ -191,7 +192,7 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa
return iter.NewMergeSampleIterator(ctx, iters), nil
}

func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := false
// ingesterMLB having -1 means query ingester for whole duration.
@@ -267,7 +268,7 @@ func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval
}

// Label does the heavy lifting for a Label query.
func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
@@ -312,12 +313,12 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
}

// Check implements the grpc healthcheck
func (*Querier) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
func (*SingleTenantQuerier) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

// Tail keeps getting matching logs from all ingesters for given query
func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {
err := q.checkTailRequestLimit(ctx)
if err != nil {
return nil, err
@@ -372,7 +373,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
}

// Series fetches any matching series for a list of matcher sets
func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
@@ -389,7 +390,7 @@ func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*log
return q.awaitSeries(ctx, req)
}

func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
func (q *SingleTenantQuerier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
// buffer the channels to the # of calls they're expecting su
series := make(chan [][]logproto.SeriesIdentifier, 2)
errs := make(chan error, 2)
@@ -454,7 +455,7 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest)

// seriesForMatchers fetches series from the store for each matcher set
// TODO: make efficient if/when the index supports labels so we don't have to read chunks
func (q *Querier) seriesForMatchers(
func (q *SingleTenantQuerier) seriesForMatchers(
ctx context.Context,
from, through time.Time,
groups []string,
@@ -483,7 +484,7 @@ func (q *Querier) seriesForMatchers(
}

// seriesForMatcher fetches series from the store for a given matcher
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
func (q *SingleTenantQuerier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string, shards []string) ([]logproto.SeriesIdentifier, error) {
ids, err := q.store.GetSeries(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: matcher,
Expand All @@ -500,7 +501,7 @@ func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time,
return ids, nil
}

func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
func (q *SingleTenantQuerier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return time.Time{}, time.Time{}, err
@@ -548,7 +549,7 @@ func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits tim
return from, through, nil
}

func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
func (q *SingleTenantQuerier) checkTailRequestLimit(ctx context.Context) error {
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
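
For completeness, a small hypothetical snippet (standalone, outside Loki's own startup) that registers the querier flags and flips the new experimental one, showing the flag name and its `false` default in action:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/pkg/querier"
)

func main() {
	fs := flag.NewFlagSet("querier", flag.ContinueOnError)

	var cfg querier.Config
	cfg.RegisterFlags(fs) // registers -querier.multi-tenant-queries-enabled, default false

	if err := fs.Parse([]string{"-querier.multi-tenant-queries-enabled=true"}); err != nil {
		panic(err)
	}
	fmt.Println(cfg.MultiTenantQueriesEnabled) // true
}
```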