Feature/querysharding ii #1927

Merged 74 commits on May 26, 2020. Changes shown below are from 65 commits.
f00ec35
[wip] sharding evaluator/ast
owen-d Feb 6, 2020
406ce2b
[wip] continues experimenting with ast mapping
owen-d Feb 6, 2020
d58aad6
refactoring in preparation for binops
owen-d Feb 14, 2020
a2dca17
evaluators can pass state to other evaluators
owen-d Mar 19, 2020
161016c
compiler alignment
owen-d Mar 19, 2020
f6f47f7
Evaluator method renamed to StepEvaluator
owen-d Mar 19, 2020
3147f1e
chained evaluator impl
owen-d Mar 19, 2020
66efd8c
tidying up sharding code
owen-d Mar 20, 2020
738dd17
handling for ConcatSampleExpr
owen-d Mar 22, 2020
16d7016
downstream iterator
owen-d Mar 22, 2020
67385fc
structure for downstreaming asts
owen-d Mar 22, 2020
1271dfc
outlines sharding optimizations
owen-d Mar 22, 2020
84fe72c
work on sharding mapper
owen-d Mar 23, 2020
1c0d630
ast sharding optimizations
owen-d Mar 23, 2020
654618f
test for different logrange positions
owen-d Mar 23, 2020
0f67155
shard mapper tests
owen-d Mar 23, 2020
648cd4c
stronger ast sharding & tests
owen-d Mar 23, 2020
101ef57
shardmapper tests for string->string
owen-d Mar 24, 2020
55d41b9
removes sharding evaluator code
owen-d Mar 24, 2020
3bf5df1
removes unused ctx arg
owen-d Mar 24, 2020
951f035
Revert "removes sharding evaluator code"
owen-d Mar 24, 2020
47ac2a4
interfaces for downstreaming, type conversions
owen-d Mar 25, 2020
7498d6f
sharding plumbing on frontend
owen-d Mar 27, 2020
4a76119
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d Mar 27, 2020
170d2c7
type alignment in queryrange to downstream sharded queriers
owen-d Mar 28, 2020
0240587
downstreaming support for sharding incl storage code
owen-d Mar 28, 2020
b1c8faf
removes chainedevaluator
owen-d Apr 8, 2020
2af43ff
comment alignment
owen-d Apr 8, 2020
82a0d5c
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d Apr 8, 2020
f87553b
storage shard injection
owen-d Apr 8, 2020
66b217c
speccing out testware for sharding equivalence
owen-d Apr 9, 2020
7fffd35
[wip] shared engine refactor
owen-d Apr 9, 2020
1dcd308
sorting streams, sharding eval fixes
owen-d Apr 9, 2020
90dcc17
downstream evaluator embeds defaultevaluator
owen-d Apr 9, 2020
1793dce
other pkgs adopt logql changes
owen-d Apr 9, 2020
03ae942
metrics & logs use same middleware instantiation process
owen-d Apr 10, 2020
b43d995
wires up shardingware
owen-d Apr 10, 2020
78de015
middleware per metrics/logfilter
owen-d Apr 10, 2020
9b14115
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d Apr 15, 2020
16881d3
empty step populating StepEvaluator promql.Matrix adapter
owen-d Apr 15, 2020
6219b8a
sharding metrics
owen-d Apr 16, 2020
4996c3a
log/span injection into sharded engine
owen-d Apr 16, 2020
4188116
sharding metrics avoids multiple instantiation
owen-d Apr 16, 2020
77d09d1
downstreamhandler tracing
owen-d Apr 16, 2020
c0897f8
sharding parameterized libsonnet
owen-d Apr 17, 2020
6629775
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d Apr 20, 2020
430af96
removes querier replicas
owen-d Apr 20, 2020
539e432
default 32 concurrency for workers
owen-d Apr 21, 2020
54334d8
jsonnet correct level override
owen-d Apr 21, 2020
e07c2d8
unquote true in yaml
owen-d Apr 21, 2020
f0361c5
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d Apr 21, 2020
5ae035e
lowercase error + downstreamEvaluator defaults to embedded defaultEva…
owen-d Apr 23, 2020
1d6e513
makes shardRecorder private
owen-d Apr 23, 2020
42438e6
logs query on failed parse
owen-d Apr 23, 2020
2fa2900
refactors engine to be multi-use, minimizes logger injection, general…
owen-d Apr 23, 2020
9b82ebc
basic tests for querysharding mware
owen-d Apr 24, 2020
e8ff7c3
[wip] concurrent evaluator
owen-d May 2, 2020
192b018
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d May 12, 2020
2dc37ee
integrates stat propagation into sharding evaluator
owen-d May 14, 2020
5dc90a2
splitby histogram
owen-d May 14, 2020
76423b6
extends le bounds for bytes processed
owen-d May 14, 2020
34b045a
byte throughput histogram buckets to 40gb
owen-d May 18, 2020
e9b6f69
chunk duration mixin
owen-d May 18, 2020
cac5f22
Merge remote-tracking branch 'upstream/master' into feature/queryshar…
owen-d May 18, 2020
404487d
fixes merge w/ field rename
owen-d May 18, 2020
15a7461
derives logger in sharded engine via ctx & logs some downstream evalu…
owen-d May 22, 2020
fd33919
moves sharded engine to top, adds comments
owen-d May 22, 2020
e8c9e66
logs failed merge results in stats ctx
owen-d May 22, 2020
ab3e7aa
snapshotting stats merge logic is done more effectively
owen-d May 22, 2020
9547ec2
per query concurrency controlled via downstreamer
owen-d May 22, 2020
b13abf9
unexports decodereq
owen-d May 22, 2020
8f3471b
queryrange testware
owen-d May 22, 2020
df85eeb
downstreamer tests
owen-d May 22, 2020
388c547
pr requests
owen-d May 26, 2020
23 changes: 21 additions & 2 deletions pkg/logcli/query/query.go
@@ -119,10 +119,29 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string

eng := logql.NewEngine(conf.Querier.Engine, querier)
var query logql.Query

if q.isInstant() {
query = eng.NewInstantQuery(q.QueryString, q.Start, q.resultsDirection(), uint32(q.Limit))
query = eng.Query(logql.NewLiteralParams(
q.QueryString,
q.Start,
q.Start,
0,
0,
q.resultsDirection(),
uint32(q.Limit),
nil,
))
} else {
query = eng.NewRangeQuery(q.QueryString, q.Start, q.End, q.Step, q.Interval, q.resultsDirection(), uint32(q.Limit))
query = eng.Query(logql.NewLiteralParams(
q.QueryString,
q.Start,
q.End,
q.Step,
q.Interval,
q.resultsDirection(),
uint32(q.Limit),
nil,
))
}

// execute the query
4 changes: 4 additions & 0 deletions pkg/loghttp/params.go
@@ -44,6 +44,10 @@ func direction(r *http.Request) (logproto.Direction, error) {
return parseDirection(r.Form.Get("direction"), logproto.BACKWARD)
}

func shards(r *http.Request) []string {
return r.Form["shards"]
}

func bounds(r *http.Request) (time.Time, time.Time, error) {
now := time.Now()
start, err := parseTimestamp(r.Form.Get("start"), now.Add(-defaultSince))
3 changes: 3 additions & 0 deletions pkg/loghttp/query.go
@@ -244,6 +244,7 @@ type RangeQuery struct {
Query string
Direction logproto.Direction
Limit uint32
Shards []string
}

// ParseRangeQuery parses a RangeQuery request from an http request.
@@ -280,6 +281,8 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
return nil, errNegativeStep
}

result.Shards = shards(r)

// For safety, limit the number of returned points per timeseries.
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
if (result.End.Sub(result.Start) / result.Step) > 11000 {
7 changes: 7 additions & 0 deletions pkg/logproto/extensions.go
@@ -2,6 +2,7 @@ package logproto

import "github.com/prometheus/prometheus/pkg/labels"

// Note: this is not very efficient, and use should be minimized, as it requires label construction on each comparison
type SeriesIdentifiers []SeriesIdentifier

func (ids SeriesIdentifiers) Len() int { return len(ids) }
@@ -10,3 +11,9 @@ func (ids SeriesIdentifiers) Less(i, j int) bool {
a, b := labels.FromMap(ids[i].Labels), labels.FromMap(ids[j].Labels)
return labels.Compare(a, b) <= 0
}

type Streams []Stream

func (xs Streams) Len() int { return len(xs) }
func (xs Streams) Swap(i, j int) { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }
221 changes: 147 additions & 74 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/logproto/logproto.proto
@@ -37,6 +37,8 @@ message QueryRequest {
google.protobuf.Timestamp end = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
Direction direction = 5;
reserved 6;
repeated string shards = 7 [(gogoproto.jsontag) = "shards,omitempty"];

}

enum Direction {
26 changes: 0 additions & 26 deletions pkg/logql/astmapper.go

This file was deleted.

138 changes: 52 additions & 86 deletions pkg/logql/engine.go
@@ -2,6 +2,7 @@ package logql

import (
"context"
"errors"
"sort"
"time"

@@ -35,6 +36,12 @@ const ValueTypeStreams = "streams"
// Streams is promql.Value
type Streams []logproto.Stream

func (streams Streams) Len() int { return len(streams) }
func (streams Streams) Swap(i, j int) { streams[i], streams[j] = streams[j], streams[i] }
func (streams Streams) Less(i, j int) bool {
return streams[i].Labels <= streams[j].Labels
}

// Type implements `promql.Value`
func (Streams) Type() promql.ValueType { return ValueTypeStreams }

@@ -67,32 +74,28 @@ func (opts *EngineOpts) applyDefault() {
}
}

// Engine interface used to construct queries
type Engine interface {
NewRangeQuery(qs string, start, end time.Time, step, interval time.Duration, direction logproto.Direction, limit uint32) Query
NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query
}

// engine is the LogQL engine.
type engine struct {
// Engine is the LogQL engine.
type Engine struct {
timeout time.Duration
evaluator Evaluator
}

// NewEngine creates a new LogQL engine.
func NewEngine(opts EngineOpts, q Querier) Engine {
if q == nil {
panic("nil Querier")
}

// NewEngine creates a new LogQL Engine.
func NewEngine(opts EngineOpts, q Querier) *Engine {
opts.applyDefault()
return &Engine{
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
}
}

return &engine{
timeout: opts.Timeout,
evaluator: &defaultEvaluator{
querier: q,
maxLookBackPeriod: opts.MaxLookBackPeriod,
},
// Query creates a new LogQL query. Instant/Range type is derived from the parameters.
func (ng *Engine) Query(params Params) Query {
return &query{
timeout: ng.timeout,
params: params,
evaluator: ng.evaluator,
parse: ParseExpr,
}
}

@@ -103,17 +106,18 @@ type Query interface {
}

type query struct {
LiteralParams

ng *engine
timeout time.Duration
params Params
parse func(string) (Expr, error)
evaluator Evaluator
}

// Exec Implements `Query`
// Exec Implements `Query`. It handles instrumentation & defers to Eval.
func (q *query) Exec(ctx context.Context) (Result, error) {
log, ctx := spanlogger.New(ctx, "Engine.Exec")
defer log.Finish()

rangeType := GetRangeType(q)
rangeType := GetRangeType(q.params)
timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType)))
defer timer.ObserveDuration()

@@ -122,7 +126,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
start := time.Now()
ctx = stats.NewContext(ctx)

data, err := q.ng.exec(ctx, q)
data, err := q.Eval(ctx)

statResult = stats.Snapshot(ctx, time.Since(start))
statResult.Log(level.Debug(log))
@@ -134,88 +138,49 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status = "400"
}
}
RecordMetrics(ctx, q, status, statResult)
RecordMetrics(ctx, q.params, status, statResult)

return Result{
Data: data,
Statistics: statResult,
}, err
}

// NewRangeQuery creates a new LogQL range query.
func (ng *engine) NewRangeQuery(
qs string,
start, end time.Time, step time.Duration, interval time.Duration,
direction logproto.Direction, limit uint32) Query {
return &query{
LiteralParams: LiteralParams{
qs: qs,
start: start,
end: end,
step: step,
interval: interval,
direction: direction,
limit: limit,
},
ng: ng,
}
}

// NewInstantQuery creates a new LogQL instant query.
func (ng *engine) NewInstantQuery(
qs string,
ts time.Time,
direction logproto.Direction, limit uint32) Query {
return &query{
LiteralParams: LiteralParams{
qs: qs,
start: ts,
end: ts,
step: 0,
interval: 0,
direction: direction,
limit: limit,
},
ng: ng,
}
}

func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
func (q *query) Eval(ctx context.Context) (promql.Value, error) {
ctx, cancel := context.WithTimeout(ctx, q.timeout)
defer cancel()

qs := q.Query()

expr, err := ParseExpr(qs)
expr, err := q.parse(q.params.Query())
if err != nil {
return nil, err
}

switch e := expr.(type) {
case SampleExpr:
value, err := ng.evalSample(ctx, e, q)
value, err := q.evalSample(ctx, e)
return value, err

case LogSelectorExpr:
iter, err := ng.evaluator.Iterator(ctx, e, q)
iter, err := q.evaluator.Iterator(ctx, e, q.params)
if err != nil {
return nil, err
}

defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
streams, err := readStreams(iter, q.limit, q.direction, q.interval)
streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
return streams, err
default:
return nil, errors.New("Unexpected type (%T): cannot evaluate")
}

return nil, nil
}

// evalSample evaluate a sampleExpr
func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (promql.Value, error) {
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql.Value, error) {
if lit, ok := expr.(*literalExpr); ok {
return ng.evalLiteral(ctx, lit, q)
return q.evalLiteral(ctx, lit)
}

stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q)
stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
if err != nil {
return nil, err
}
@@ -224,7 +189,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
seriesIndex := map[uint64]*promql.Series{}

next, ts, vec := stepEvaluator.Next()
if GetRangeType(q) == InstantType {
if GetRangeType(q.params) == InstantType {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
}
@@ -262,21 +227,21 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
return result, nil
}

func (ng *engine) evalLiteral(_ context.Context, expr *literalExpr, q *query) (promql.Value, error) {
func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql.Value, error) {
s := promql.Scalar{
T: q.Start().UnixNano() / int64(time.Millisecond),
T: q.params.Start().UnixNano() / int64(time.Millisecond),
V: expr.value,
}

if GetRangeType(q) == InstantType {
if GetRangeType(q.params) == InstantType {
[Review comment, Contributor] I see this everywhere; should we cache it into the query struct? Now that we do have a query struct we could add a RangeType property to it?

[Review comment, Contributor] @owen-d WDYT?
return s, nil
}

return PopulateMatrixFromScalar(s, q.LiteralParams), nil
return PopulateMatrixFromScalar(s, q.params), nil

}

func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.Matrix {
func PopulateMatrixFromScalar(data promql.Scalar, params Params) promql.Matrix {
var (
start = params.Start()
end = params.End()
@@ -286,7 +251,7 @@ func PopulateMatrixFromScalar(data promql.Scalar, params LiteralParams) promql.M
[]promql.Point,
0,
// allocate enough space for all needed entries
int(params.End().Sub(params.Start())/params.Step())+1,
int(end.Sub(start)/step)+1,
),
}
)
@@ -329,10 +294,11 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte
}
}

result := make([]logproto.Stream, 0, len(streams))
result := make(Streams, 0, len(streams))
for _, stream := range streams {
result = append(result, *stream)
}
sort.Sort(result)
return result, i.Error()
}
