Improve logql query statistics collection. (#1573)
* Improve logql query statistics collection.
This also adds information about ingester queries.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes bad copy/paste in the result log.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes ingester tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve headchunk efficiency.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix bad commit on master.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes new interface of ingester grpc server.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve documentation of fields.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
cyriltovena authored Jan 24, 2020
1 parent 887f0cc commit f1b8d4d
Showing 16 changed files with 893 additions and 112 deletions.
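
Before the per-file diffs, a note on the overall pattern: the commit deletes the old pkg/chunkenc/decompression helpers and replaces them with a logql/stats package that stores a mutable stats object in the request context, so iterators can increment counters in place while they decode data. The package internals are not shown on this page; the sketch below is one plausible shape, inferred from the stats.NewContext and stats.GetChunkData calls in the diffs, with the context key and the nil-safe fallback being assumptions.

// Sketch only: a context-scoped stats accumulator with the shape implied by
// the diffs below. Not the actual pkg/logql/stats implementation.
package statssketch

import "context"

// ChunkData mirrors the counters mutated in memchunk.go and iterator.go.
type ChunkData struct {
	BytesUncompressed int64 // bytes of head-block entries scanned
	LinesUncompressed int64 // head-block lines scanned
	BytesDecompressed int64 // bytes decoded from compressed blocks
	LinesDecompressed int64 // lines decoded from compressed blocks
	BytesCompressed   int64 // raw compressed bytes handed to iterators
	TotalDuplicates   int64 // duplicate entries dropped during merge
}

type ctxKey int

const chunkKey ctxKey = iota

// NewContext seeds a request context with an empty accumulator.
func NewContext(ctx context.Context) context.Context {
	return context.WithValue(ctx, chunkKey, &ChunkData{})
}

// GetChunkData returns the accumulator so iterators can increment it in
// place; a throwaway value is returned when the context was never seeded,
// so counting never panics (assumed fallback behavior).
func GetChunkData(ctx context.Context) *ChunkData {
	if d, ok := ctx.Value(chunkKey).(*ChunkData); ok {
		return d
	}
	return &ChunkData{}
}

The same pattern presumably backs stats.GetIngesterData in the ingester diffs further down.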
42 changes: 0 additions & 42 deletions pkg/chunkenc/decompression/context.go

This file was deleted.

27 changes: 14 additions & 13 deletions pkg/chunkenc/memchunk.go
@@ -13,10 +13,10 @@ import (
 
 	"github.com/pkg/errors"
 
-	"github.com/grafana/loki/pkg/chunkenc/decompression"
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/logql/stats"
 )
 
 const (
@@ -477,7 +477,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
 	}
 
 	if !c.head.isEmpty() {
-		its = append(its, c.head.iterator(mint, maxt, filter))
+		its = append(its, c.head.iterator(ctx, mint, maxt, filter))
 	}
 
 	iterForward := iter.NewTimeRangedIterator(
@@ -500,18 +500,21 @@ func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filte
 	return newBufferedIterator(ctx, pool, b.b, filter)
 }
 
-func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
+func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.Filter) iter.EntryIterator {
 	if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
 		return emptyIterator
 	}
 
+	chunkStats := stats.GetChunkData(ctx)
+
 	// We are doing a copy everytime, this is because b.entries could change completely,
 	// the alternate would be that we allocate a new b.entries everytime we cut a block,
 	// but the tradeoff is that queries to near-realtime data would be much lower than
 	// cutting of blocks.
-
+	chunkStats.LinesUncompressed += int64(len(hb.entries))
 	entries := make([]entry, 0, len(hb.entries))
 	for _, e := range hb.entries {
+		chunkStats.BytesUncompressed += int64(len(e.s))
 		if filter == nil || filter([]byte(e.s)) {
 			entries = append(entries, e)
 		}
@@ -558,9 +561,8 @@ func (li *listIterator) Close() error { return nil }
 func (li *listIterator) Labels() string { return "" }
 
 type bufferedIterator struct {
-	origBytes         []byte
-	rootCtx           context.Context
-	bytesDecompressed int64
+	origBytes []byte
+	stats     *stats.ChunkData
 
 	bufReader *bufio.Reader
 	reader    io.Reader
@@ -579,8 +581,10 @@ type bufferedIterator struct {
 }
 
 func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator {
+	chunkStats := stats.GetChunkData(ctx)
+	chunkStats.BytesCompressed += int64(len(b))
 	return &bufferedIterator{
-		rootCtx:   ctx,
+		stats:     chunkStats,
 		origBytes: b,
 		reader:    nil, // will be initialized later
 		bufReader: nil, // will be initialized later
@@ -604,7 +608,8 @@ func (si *bufferedIterator) Next() bool {
 			return false
 		}
 		// we decode always the line length and ts as varint
-		si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
+		si.stats.BytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64
+		si.stats.LinesDecompressed++
 		if si.filter != nil && !si.filter(line) {
 			continue
 		}
@@ -682,10 +687,6 @@ func (si *bufferedIterator) Close() error {
 }
 
 func (si *bufferedIterator) close() {
-	decompression.Mutate(si.rootCtx, func(current *decompression.Stats) {
-		current.BytesDecompressed += si.bytesDecompressed
-		current.BytesCompressed += int64(len(si.origBytes))
-	})
 	if si.reader != nil {
 		si.pool.PutReader(si.reader)
 		si.reader = nil
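
One way to read the counters this file now maintains: BytesCompressed grows by the raw block size whenever a bufferedIterator is created, while BytesDecompressed grows as lines are decoded, so their ratio approximates the compression factor of the data a query actually touched. A small helper along those lines, written against the ChunkData sketch above rather than any real Loki API:

// Sketch only: effective compression ratio for the data a query touched,
// derived from the counters maintained in this diff.
func compressionRatio(s *ChunkData) float64 {
	if s.BytesCompressed == 0 {
		return 0 // nothing was read from compressed blocks
	}
	return float64(s.BytesDecompressed) / float64(s.BytesCompressed)
}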
2 changes: 1 addition & 1 deletion pkg/chunkenc/memchunk_test.go
@@ -539,7 +539,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
 	b.ResetTimer()
 
 	for n := 0; n < b.N; n++ {
-		iter := h.iterator(0, math.MaxInt64, nil)
+		iter := h.iterator(context.Background(), 0, math.MaxInt64, nil)
 
 		for iter.Next() {
 			_ = iter.Entry()
3 changes: 3 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
 	"golang.org/x/net/context"
+	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
@@ -252,6 +253,8 @@ type mockQuerierServer struct {
 	grpc.ServerStream
 }
 
+func (*mockQuerierServer) SetTrailer(metadata.MD) {}
+
 func (m *mockQuerierServer) Send(resp *logproto.QueryResponse) error {
 	m.resps = append(m.resps, resp)
 	return nil
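
The SetTrailer stub is required because instance.Query (next file) now defers stats.SendAsTrailer(ctx, queryServer), and the mock stream must satisfy that call. gRPC trailers are the one piece of server-to-client metadata that can still be written after the final Send, which makes them a natural channel for per-query stats. A sketch of what sending stats as a trailer could look like; the "x-loki-stats" key, the JSON encoding, and the queryStats type are assumptions, not Loki's wire format:

// Sketch only: attaching query stats as a gRPC trailer.
package trailersketch

import (
	"encoding/json"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// queryStats stands in for the aggregate the real stats package sends; the
// field names appear in this commit's instance.go diff.
type queryStats struct {
	TotalChunksMatched int64 `json:"totalChunksMatched"`
	TotalBatches       int64 `json:"totalBatches"`
	TotalLinesSent     int64 `json:"totalLinesSent"`
}

// sendAsTrailer mirrors the role of stats.SendAsTrailer: serialize whatever
// was collected and attach it to the stream before it closes.
func sendAsTrailer(stream grpc.ServerStream, s queryStats) {
	b, err := json.Marshal(s)
	if err != nil {
		return // stats are best-effort; never fail the query over them
	}
	stream.SetTrailer(metadata.New(map[string]string{"x-loki-stats": string(b)}))
}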
55 changes: 32 additions & 23 deletions pkg/ingester/instance.go
@@ -23,6 +23,7 @@ import (
 	"github.com/grafana/loki/pkg/loghttp"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/logql/stats"
 	"github.com/grafana/loki/pkg/util"
 )
 
@@ -186,6 +187,10 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels
 }
 
 func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
+	// initialize stats collection for ingester queries and set grpc trailer with stats.
+	ctx := stats.NewContext(queryServer.Context())
+	defer stats.SendAsTrailer(ctx, queryServer)
+
 	expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
 	if err != nil {
 		return err
@@ -195,12 +200,13 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		return err
 	}
-
+	ingStats := stats.GetIngesterData(ctx)
 	var iters []iter.EntryIterator
 
 	err = i.forMatchingStreams(
 		expr.Matchers(),
 		func(stream *stream) error {
-			iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter)
+			ingStats.TotalChunksMatched += int64(len(stream.chunks))
+			iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter)
 			if err != nil {
 				return err
 			}
@@ -212,10 +218,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		return err
 	}
 
-	iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction)
+	iter := iter.NewHeapIterator(ctx, iters, req.Direction)
 	defer helpers.LogError("closing iterator", iter.Close)
 
-	return sendBatches(iter, queryServer, req.Limit)
+	return sendBatches(ctx, iter, queryServer, req.Limit)
 }
 
 func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
@@ -381,10 +387,28 @@ func isDone(ctx context.Context) bool {
 	}
 }
 
-func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
+func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error {
+	ingStats := stats.GetIngesterData(ctx)
 	if limit == 0 {
-		return sendAllBatches(i, queryServer)
+		// send all batches.
+		for !isDone(ctx) {
+			batch, size, err := iter.ReadBatch(i, queryBatchSize)
+			if err != nil {
+				return err
+			}
+			if len(batch.Streams) == 0 {
+				return nil
+			}
+
+			if err := queryServer.Send(batch); err != nil {
+				return err
+			}
+			ingStats.TotalLinesSent += int64(size)
+			ingStats.TotalBatches++
+		}
+		return nil
 	}
+	// send until the limit is reached.
 	sent := uint32(0)
 	for sent < limit && !isDone(queryServer.Context()) {
 		batch, batchSize, err := iter.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent))
@@ -400,23 +424,8 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer,
 		if err := queryServer.Send(batch); err != nil {
 			return err
 		}
-	}
-	return nil
-}
-
-func sendAllBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer) error {
-	for !isDone(queryServer.Context()) {
-		batch, _, err := iter.ReadBatch(i, queryBatchSize)
-		if err != nil {
-			return err
-		}
-		if len(batch.Streams) == 0 {
-			return nil
-		}
-
-		if err := queryServer.Send(batch); err != nil {
-			return err
-		}
+		ingStats.TotalLinesSent += int64(batchSize)
+		ingStats.TotalBatches++
 	}
 	return nil
 }
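
On the querier side, a client can read that trailer only once the stream is drained. A sketch of the receiving end, assuming the hypothetical x-loki-stats trailer from the previous sketch (imports of context, fmt, io, and the generated logproto client are implied):

// Sketch only: draining a Query stream, then reading the stats trailer.
func queryWithStats(ctx context.Context, client logproto.QuerierClient, req *logproto.QueryRequest) error {
	stream, err := client.Query(ctx, req)
	if err != nil {
		return err
	}
	for {
		if _, err := stream.Recv(); err != nil {
			if err == io.EOF {
				break // trailer becomes available once the stream ends
			}
			return err
		}
	}
	if vals := stream.Trailer().Get("x-loki-stats"); len(vals) > 0 {
		fmt.Println("ingester stats:", vals[0])
	}
	return nil
}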
20 changes: 8 additions & 12 deletions pkg/iter/iterator.go
@@ -7,9 +7,9 @@ import (
 	"io"
 	"time"
 
-	"github.com/grafana/loki/pkg/chunkenc/decompression"
 	"github.com/grafana/loki/pkg/helpers"
 	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql/stats"
 )
 
 // EntryIterator iterates over entries in time-order.
@@ -132,19 +132,18 @@ type heapIterator struct {
 	}
 	is         []EntryIterator
 	prefetched bool
-	ctx        context.Context
+	stats      *stats.ChunkData
 
-	tuples         []tuple
-	currEntry      logproto.Entry
-	currLabels     string
-	errs           []error
-	linesDuplicate int64
+	tuples     []tuple
+	currEntry  logproto.Entry
+	currLabels string
+	errs       []error
 }
 
 // NewHeapIterator returns a new iterator which uses a heap to merge together
 // entries for multiple interators.
 func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator {
-	result := &heapIterator{is: is, ctx: ctx}
+	result := &heapIterator{is: is, stats: stats.GetChunkData(ctx)}
 	switch direction {
 	case logproto.BACKWARD:
 		result.heap = &iteratorMaxHeap{}
@@ -241,7 +240,7 @@ func (i *heapIterator) Next() bool {
 			i.requeue(i.tuples[j].EntryIterator, true)
 			continue
 		}
-		i.linesDuplicate++
+		i.stats.TotalDuplicates++
 		i.requeue(i.tuples[j].EntryIterator, false)
 	}
 	i.tuples = i.tuples[:0]
@@ -311,9 +310,6 @@ func (i *heapIterator) Error() error {
 }
 
 func (i *heapIterator) Close() error {
-	decompression.Mutate(i.ctx, func(m *decompression.Stats) {
-		m.TotalDuplicates += i.linesDuplicate
-	})
 	for i.heap.Len() > 0 {
 		if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
 			return err
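
The TotalDuplicates counter replaces the per-iterator linesDuplicate field. Duplicates arise because Loki normally queries several replicas of the same streams, so the heap merge sees the same (timestamp, line) entry more than once, emits it once, and counts the rest. A self-contained sketch of that counting, simplified to sorted slices instead of a heap (illustrative only, not the heapIterator code):

// Sketch only: counting duplicates during a k-way merge of sorted streams.
package main

import "fmt"

type entry struct {
	ts   int64
	line string
}

func less(a, b entry) bool {
	if a.ts != b.ts {
		return a.ts < b.ts
	}
	return a.line < b.line
}

// mergeDedup merges pre-sorted streams, returning unique entries plus the
// number of duplicates dropped (e.g., the same line stored on replicas).
func mergeDedup(streams ...[]entry) (out []entry, duplicates int64) {
	idx := make([]int, len(streams))
	for {
		// pick the smallest head across all streams; this linear scan
		// stands in for the heap.
		best := -1
		for s := range streams {
			if idx[s] >= len(streams[s]) {
				continue
			}
			if best == -1 || less(streams[s][idx[s]], streams[best][idx[best]]) {
				best = s
			}
		}
		if best == -1 {
			return out, duplicates
		}
		cur := streams[best][idx[best]]
		idx[best]++
		out = append(out, cur)
		// consume every remaining head equal to cur, counting each one.
		for s := range streams {
			for idx[s] < len(streams[s]) && streams[s][idx[s]] == cur {
				duplicates++
				idx[s]++
			}
		}
	}
}

func main() {
	a := []entry{{1, "x"}, {2, "y"}}
	b := []entry{{1, "x"}, {3, "z"}}
	out, dups := mergeDedup(a, b)
	fmt.Println(len(out), dups) // 3 1
}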
17 changes: 5 additions & 12 deletions pkg/logql/engine.go
@@ -8,11 +8,10 @@ import (
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/util/spanlogger"
-	"github.com/go-kit/kit/log/level"
-	"github.com/grafana/loki/pkg/chunkenc/decompression"
 	"github.com/grafana/loki/pkg/helpers"
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql/stats"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -164,19 +163,13 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
 		return nil, err
 	}
 
-	ctx = decompression.NewContext(ctx)
+	ctx = stats.NewContext(ctx)
 	start := time.Now()
 	defer func() {
-		stats := decompression.GetStats(ctx)
-		level.Debug(log).Log(
-			"Time Fetching chunk (ms)", stats.TimeFetching.Nanoseconds()/int64(time.Millisecond),
-			"Total Duplicates", stats.TotalDuplicates,
-			"Fetched chunks", stats.FetchedChunks,
-			"Total bytes compressed (MB)", stats.BytesCompressed/1024/1024,
-			"Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024,
-			"Total exec time (ms)", time.Since(start).Nanoseconds()/int64(time.Millisecond),
-		)
+		resultStats := stats.Snapshot(ctx, time.Since(start))
+		stats.Log(log, resultStats)
 	}()
 
 	switch e := expr.(type) {
 	case SampleExpr:
 		if err := ng.setupIterators(ctx, e, q); err != nil {
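
The engine change is the consumer side of all this: instead of a hand-rolled debug log of decompression counters, exec snapshots a structured stats object and logs it in one call. A sketch of what a Log helper in that spirit might reduce to; the counter names come from this commit, while the Result grouping and key names are assumptions:

// Sketch only: logging a stats snapshot as one structured debug line.
package logsketch

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

// Result groups the per-source counters; the grouping is assumed.
type Result struct {
	BytesCompressed   int64
	BytesDecompressed int64
	TotalDuplicates   int64
	TotalLinesSent    int64
	ExecTime          time.Duration
}

// Log replaces the removed hand-rolled level.Debug call above.
func Log(logger log.Logger, r Result) {
	level.Debug(logger).Log(
		"bytes_compressed", r.BytesCompressed,
		"bytes_decompressed", r.BytesDecompressed,
		"total_duplicates", r.TotalDuplicates,
		"total_lines_sent", r.TotalLinesSent,
		"exec_time_ms", r.ExecTime.Milliseconds(),
	)
}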