From f1b8d4d8ad05fce640e373fa0527b56d23609bd2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 24 Jan 2020 15:25:12 -0500 Subject: [PATCH] Improve logql query statistics collection. (#1573) * Improve logql query statistics collection. This also add information about ingester queries. Signed-off-by: Cyril Tovena * Improve documentation. Signed-off-by: Cyril Tovena * Fixes bad copy/past in the result log. Signed-off-by: Cyril Tovena * Fixes ingester tests. Signed-off-by: Cyril Tovena * Improve headchunk efficiency. Signed-off-by: Cyril Tovena * Fix bad commit on master. Signed-off-by: Cyril Tovena * Fixes new interface of ingester grpc server. Signed-off-by: Cyril Tovena * Improve documentations of fields. Signed-off-by: Cyril Tovena --- pkg/chunkenc/decompression/context.go | 42 --- pkg/chunkenc/memchunk.go | 27 +- pkg/chunkenc/memchunk_test.go | 2 +- pkg/ingester/ingester_test.go | 3 + pkg/ingester/instance.go | 55 ++-- pkg/iter/iterator.go | 20 +- pkg/logql/engine.go | 17 +- pkg/logql/stats/context.go | 193 +++++++++++ pkg/logql/stats/context_test.go | 92 ++++++ pkg/logql/stats/grpc.go | 113 +++++++ pkg/logql/stats/grpc_test.go | 110 +++++++ pkg/querier/querier.go | 3 +- pkg/storage/iterator.go | 15 +- pkg/storage/store.go | 4 + .../grpc/test/bufconn/bufconn.go | 308 ++++++++++++++++++ vendor/modules.txt | 1 + 16 files changed, 893 insertions(+), 112 deletions(-) delete mode 100644 pkg/chunkenc/decompression/context.go create mode 100644 pkg/logql/stats/context.go create mode 100644 pkg/logql/stats/context_test.go create mode 100644 pkg/logql/stats/grpc.go create mode 100644 pkg/logql/stats/grpc_test.go create mode 100644 vendor/google.golang.org/grpc/test/bufconn/bufconn.go diff --git a/pkg/chunkenc/decompression/context.go b/pkg/chunkenc/decompression/context.go deleted file mode 100644 index 3137bc1add24..000000000000 --- a/pkg/chunkenc/decompression/context.go +++ /dev/null @@ -1,42 +0,0 @@ -package decompression - -import ( - "context" - "time" -) - -type ctxKeyType string - -const ctxKey ctxKeyType = "decompression" - -// Stats is decompression statistic -type Stats struct { - BytesDecompressed int64 // Total bytes decompressed data size - BytesCompressed int64 // Total bytes compressed read - FetchedChunks int64 // Total number of chunks fetched. - TotalDuplicates int64 // Total number of line duplicates from replication. - TimeFetching time.Duration // Time spent fetching chunks. -} - -// NewContext creates a new decompression context -func NewContext(ctx context.Context) context.Context { - return context.WithValue(ctx, ctxKey, &Stats{}) -} - -// GetStats returns decompression statistics from a context. 
-func GetStats(ctx context.Context) Stats { - d, ok := ctx.Value(ctxKey).(*Stats) - if !ok { - return Stats{} - } - return *d -} - -// Mutate mutates the current context statistic using a mutator function -func Mutate(ctx context.Context, mutator func(m *Stats)) { - d, ok := ctx.Value(ctxKey).(*Stats) - if !ok { - return - } - mutator(d) -} diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 1e6d29461bf4..e89d47021cd7 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -13,10 +13,10 @@ import ( "github.com/pkg/errors" - "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/stats" ) const ( @@ -477,7 +477,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } if !c.head.isEmpty() { - its = append(its, c.head.iterator(mint, maxt, filter)) + its = append(its, c.head.iterator(ctx, mint, maxt, filter)) } iterForward := iter.NewTimeRangedIterator( @@ -500,18 +500,21 @@ func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.Filte return newBufferedIterator(ctx, pool, b.b, filter) } -func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator { +func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.Filter) iter.EntryIterator { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return emptyIterator } + chunkStats := stats.GetChunkData(ctx) + // We are doing a copy everytime, this is because b.entries could change completely, // the alternate would be that we allocate a new b.entries everytime we cut a block, // but the tradeoff is that queries to near-realtime data would be much lower than // cutting of blocks. 
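+	// Entries in the head block are never compressed, so they are accounted
+	// for as uncompressed bytes and lines rather than decompressed ones.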
- + chunkStats.LinesUncompressed += int64(len(hb.entries)) entries := make([]entry, 0, len(hb.entries)) for _, e := range hb.entries { + chunkStats.BytesUncompressed += int64(len(e.s)) if filter == nil || filter([]byte(e.s)) { entries = append(entries, e) } @@ -558,9 +561,8 @@ func (li *listIterator) Close() error { return nil } func (li *listIterator) Labels() string { return "" } type bufferedIterator struct { - origBytes []byte - rootCtx context.Context - bytesDecompressed int64 + origBytes []byte + stats *stats.ChunkData bufReader *bufio.Reader reader io.Reader @@ -579,8 +581,10 @@ type bufferedIterator struct { } func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, filter logql.Filter) *bufferedIterator { + chunkStats := stats.GetChunkData(ctx) + chunkStats.BytesCompressed += int64(len(b)) return &bufferedIterator{ - rootCtx: ctx, + stats: chunkStats, origBytes: b, reader: nil, // will be initialized later bufReader: nil, // will be initialized later @@ -604,7 +608,8 @@ func (si *bufferedIterator) Next() bool { return false } // we decode always the line length and ts as varint - si.bytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64 + si.stats.BytesDecompressed += int64(len(line)) + 2*binary.MaxVarintLen64 + si.stats.LinesDecompressed++ if si.filter != nil && !si.filter(line) { continue } @@ -682,10 +687,6 @@ func (si *bufferedIterator) Close() error { } func (si *bufferedIterator) close() { - decompression.Mutate(si.rootCtx, func(current *decompression.Stats) { - current.BytesDecompressed += si.bytesDecompressed - current.BytesCompressed += int64(len(si.origBytes)) - }) if si.reader != nil { si.pool.PutReader(si.reader) si.reader = nil diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index af1fc8eafc1b..5c283578a19e 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -539,7 +539,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - iter := h.iterator(0, math.MaxInt64, nil) + iter := h.iterator(context.Background(), 0, math.MaxInt64, nil) for iter.Next() { _ = iter.Entry() diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1dadd5da63b7..5617fb0876f3 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -11,6 +11,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "golang.org/x/net/context" + "google.golang.org/grpc/metadata" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/chunk" @@ -252,6 +253,8 @@ type mockQuerierServer struct { grpc.ServerStream } +func (*mockQuerierServer) SetTrailer(metadata.MD){} + func (m *mockQuerierServer) Send(resp *logproto.QueryResponse) error { m.resps = append(m.resps, resp) return nil diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index ce896ba378dc..5724effb5756 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/util" ) @@ -186,6 +187,10 @@ func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels } func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error { + // initialize stats collection for ingester queries and set grpc trailer with stats. 
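+	// The statistics gathered while serving this query are encoded into the
+	// gRPC trailer once the stream finishes, so the querier can merge them
+	// into its own query statistics (see stats.CollectTrailer).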
+ ctx := stats.NewContext(queryServer.Context()) + defer stats.SendAsTrailer(ctx, queryServer) + expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector() if err != nil { return err @@ -195,12 +200,13 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie return err } + ingStats := stats.GetIngesterData(ctx) var iters []iter.EntryIterator - err = i.forMatchingStreams( expr.Matchers(), func(stream *stream) error { - iter, err := stream.Iterator(queryServer.Context(), req.Start, req.End, req.Direction, filter) + ingStats.TotalChunksMatched += int64(len(stream.chunks)) + iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, filter) if err != nil { return err } @@ -212,10 +218,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie return err } - iter := iter.NewHeapIterator(queryServer.Context(), iters, req.Direction) + iter := iter.NewHeapIterator(ctx, iters, req.Direction) defer helpers.LogError("closing iterator", iter.Close) - return sendBatches(iter, queryServer, req.Limit) + return sendBatches(ctx, iter, queryServer, req.Limit) } func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) { @@ -381,10 +387,28 @@ func isDone(ctx context.Context) bool { } } -func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error { +func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer logproto.Querier_QueryServer, limit uint32) error { + ingStats := stats.GetIngesterData(ctx) if limit == 0 { - return sendAllBatches(i, queryServer) + // send all batches. + for !isDone(ctx) { + batch, size, err := iter.ReadBatch(i, queryBatchSize) + if err != nil { + return err + } + if len(batch.Streams) == 0 { + return nil + } + + if err := queryServer.Send(batch); err != nil { + return err + } + ingStats.TotalLinesSent += int64(size) + ingStats.TotalBatches++ + } + return nil } + // send until the limit is reached. sent := uint32(0) for sent < limit && !isDone(queryServer.Context()) { batch, batchSize, err := iter.ReadBatch(i, helpers.MinUint32(queryBatchSize, limit-sent)) @@ -400,23 +424,8 @@ func sendBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer, if err := queryServer.Send(batch); err != nil { return err } - } - return nil -} - -func sendAllBatches(i iter.EntryIterator, queryServer logproto.Querier_QueryServer) error { - for !isDone(queryServer.Context()) { - batch, _, err := iter.ReadBatch(i, queryBatchSize) - if err != nil { - return err - } - if len(batch.Streams) == 0 { - return nil - } - - if err := queryServer.Send(batch); err != nil { - return err - } + ingStats.TotalLinesSent += int64(batchSize) + ingStats.TotalBatches++ } return nil } diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 3b030b33431b..2cc81ae99c91 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -7,9 +7,9 @@ import ( "io" "time" - "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/stats" ) // EntryIterator iterates over entries in time-order. 
@@ -132,19 +132,18 @@ type heapIterator struct { } is []EntryIterator prefetched bool - ctx context.Context + stats *stats.ChunkData - tuples []tuple - currEntry logproto.Entry - currLabels string - errs []error - linesDuplicate int64 + tuples []tuple + currEntry logproto.Entry + currLabels string + errs []error } // NewHeapIterator returns a new iterator which uses a heap to merge together // entries for multiple interators. func NewHeapIterator(ctx context.Context, is []EntryIterator, direction logproto.Direction) HeapIterator { - result := &heapIterator{is: is, ctx: ctx} + result := &heapIterator{is: is, stats: stats.GetChunkData(ctx)} switch direction { case logproto.BACKWARD: result.heap = &iteratorMaxHeap{} @@ -241,7 +240,7 @@ func (i *heapIterator) Next() bool { i.requeue(i.tuples[j].EntryIterator, true) continue } - i.linesDuplicate++ + i.stats.TotalDuplicates++ i.requeue(i.tuples[j].EntryIterator, false) } i.tuples = i.tuples[:0] @@ -311,9 +310,6 @@ func (i *heapIterator) Error() error { } func (i *heapIterator) Close() error { - decompression.Mutate(i.ctx, func(m *decompression.Stats) { - m.TotalDuplicates += i.linesDuplicate - }) for i.heap.Len() > 0 { if err := i.heap.Pop().(EntryIterator).Close(); err != nil { return err diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 22afada71949..113ef5111f1d 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -8,11 +8,10 @@ import ( "time" "github.com/cortexproject/cortex/pkg/util/spanlogger" - "github.com/go-kit/kit/log/level" - "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/stats" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -164,19 +163,13 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) { return nil, err } - ctx = decompression.NewContext(ctx) + ctx = stats.NewContext(ctx) start := time.Now() defer func() { - stats := decompression.GetStats(ctx) - level.Debug(log).Log( - "Time Fetching chunk (ms)", stats.TimeFetching.Nanoseconds()/int64(time.Millisecond), - "Total Duplicates", stats.TotalDuplicates, - "Fetched chunks", stats.FetchedChunks, - "Total bytes compressed (MB)", stats.BytesCompressed/1024/1024, - "Total bytes uncompressed (MB)", stats.BytesDecompressed/1024/1024, - "Total exec time (ms)", time.Since(start).Nanoseconds()/int64(time.Millisecond), - ) + resultStats := stats.Snapshot(ctx, time.Since(start)) + stats.Log(log, resultStats) }() + switch e := expr.(type) { case SampleExpr: if err := ng.setupIterators(ctx, e, q); err != nil { diff --git a/pkg/logql/stats/context.go b/pkg/logql/stats/context.go new file mode 100644 index 000000000000..02041744289f --- /dev/null +++ b/pkg/logql/stats/context.go @@ -0,0 +1,193 @@ +/* +Package stats provides primitives for recording metrics across the query path. +Statistics are passed through the query context. 
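+Each part of the read path (ingesters, store and chunk iterators) mutates its
+own typed struct in place and the engine folds them into a single Result when
+the query finishes.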
+To start a new query statistics context use: + + ctx := stats.NewContext(ctx) + +Then you can update statistics by mutating data by using: + + stats.GetChunkData(ctx) + stats.GetIngesterData(ctx) + stats.GetStoreData + +Finally to get a snapshot of the current query statistic use + + stats.Snapshot(ctx,time.Since(start)) + +Ingester statistics are sent across the GRPC stream using Trailers +see https://github.com/grpc/grpc-go/blob/master/Documentation/grpc-metadata.md +*/ +package stats + +import ( + "context" + "time" + + "github.com/dustin/go-humanize" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" +) + +type ctxKeyType string + +const ( + trailersKey ctxKeyType = "trailers" + chunksKey ctxKeyType = "chunks" + ingesterKey ctxKeyType = "ingester" + storeKey ctxKeyType = "store" +) + +// Result contains LogQL query statistics. +type Result struct { + Ingester Ingester + Store Store + Summary Summary +} + +// Log logs a query statistics result. +func Log(log log.Logger, r Result) { + level.Debug(log).Log( + "Ingester.TotalReached", r.Ingester.TotalReached, + "Ingester.TotalChunksMatched", r.Ingester.TotalChunksMatched, + "Ingester.TotalBatches", r.Ingester.TotalBatches, + "Ingester.TotalLinesSent", r.Ingester.TotalLinesSent, + + "Ingester.BytesUncompressed", humanize.Bytes(uint64(r.Ingester.BytesUncompressed)), + "Ingester.LinesUncompressed", r.Ingester.LinesUncompressed, + "Ingester.BytesDecompressed", humanize.Bytes(uint64(r.Ingester.BytesDecompressed)), + "Ingester.LinesDecompressed", r.Ingester.LinesDecompressed, + "Ingester.BytesCompressed", humanize.Bytes(uint64(r.Ingester.BytesCompressed)), + "Ingester.TotalDuplicates", r.Ingester.TotalDuplicates, + + "Store.TotalChunksRef", r.Store.TotalChunksRef, + "Store.TotalDownloadedChunks", r.Store.TotalDownloadedChunks, + "Store.TimeDownloadingChunks", r.Store.TimeDownloadingChunks, + + "Store.BytesUncompressed", humanize.Bytes(uint64(r.Store.BytesUncompressed)), + "Store.LinesUncompressed", r.Store.LinesUncompressed, + "Store.BytesDecompressed", humanize.Bytes(uint64(r.Store.BytesDecompressed)), + "Store.LinesDecompressed", r.Store.LinesDecompressed, + "Store.BytesCompressed", humanize.Bytes(uint64(r.Store.BytesCompressed)), + "Store.TotalDuplicates", r.Store.TotalDuplicates, + + "Summary.BytesProcessedPerSeconds", humanize.Bytes(uint64(r.Summary.BytesProcessedPerSeconds)), + "Summary.LinesProcessedPerSeconds", r.Summary.LinesProcessedPerSeconds, + "Summary.TotalBytesProcessed", humanize.Bytes(uint64(r.Summary.TotalBytesProcessed)), + "Summary.TotalLinesProcessed", r.Summary.TotalLinesProcessed, + "Summary.ExecTime", r.Summary.ExecTime, + ) +} + +// Summary is the summary of a query statistics. +type Summary struct { + BytesProcessedPerSeconds int64 // Total bytes processed per seconds. + LinesProcessedPerSeconds int64 // Total lines processed per seconds. + TotalBytesProcessed int64 // Total bytes processed. + TotalLinesProcessed int64 // Total lines processed. + ExecTime time.Duration // Execution time. +} + +// Ingester is the statistics result for ingesters queries. +type Ingester struct { + IngesterData + ChunkData + TotalReached int +} + +// Store is the statistics result of the store. 
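+// It combines store-level fetch statistics with the chunk statistics gathered
+// while iterating over the chunks downloaded from the store.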
+type Store struct { + StoreData + ChunkData +} + +// NewContext creates a new statistics context +func NewContext(ctx context.Context) context.Context { + ctx = injectTrailerCollector(ctx) + ctx = context.WithValue(ctx, storeKey, &StoreData{}) + ctx = context.WithValue(ctx, chunksKey, &ChunkData{}) + ctx = context.WithValue(ctx, ingesterKey, &IngesterData{}) + return ctx +} + +// ChunkData contains chunks specific statistics. +type ChunkData struct { + BytesUncompressed int64 // Total bytes processed but was already in memory. (found in the headchunk) + LinesUncompressed int64 // Total lines processed but was already in memory. (found in the headchunk) + BytesDecompressed int64 // Total bytes decompressed and processed from chunks. + LinesDecompressed int64 // Total lines decompressed and processed from chunks. + BytesCompressed int64 // Total bytes of compressed chunks (blocks) processed. + TotalDuplicates int64 // Total duplicates found while processing. +} + +// GetChunkData returns the chunks statistics data from the current context. +func GetChunkData(ctx context.Context) *ChunkData { + res, ok := ctx.Value(chunksKey).(*ChunkData) + if !ok { + return &ChunkData{} + } + return res +} + +// IngesterData contains ingester specific statistics. +type IngesterData struct { + TotalChunksMatched int64 // Total of chunks matched by the query from ingesters + TotalBatches int64 // Total of batches sent from ingesters. + TotalLinesSent int64 // Total lines sent by ingesters. +} + +// GetIngesterData returns the ingester statistics data from the current context. +func GetIngesterData(ctx context.Context) *IngesterData { + res, ok := ctx.Value(ingesterKey).(*IngesterData) + if !ok { + return &IngesterData{} + } + return res +} + +// StoreData contains store specific statistics. +type StoreData struct { + TotalChunksRef int64 // The total of chunk reference fetched from index. + TotalDownloadedChunks int64 // Total number of chunks fetched. + TimeDownloadingChunks time.Duration // Time spent fetching chunks. +} + +// GetStoreData returns the store statistics data from the current context. +func GetStoreData(ctx context.Context) *StoreData { + res, ok := ctx.Value(storeKey).(*StoreData) + if !ok { + return &StoreData{} + } + return res +} + +// Snapshot compute query statistics from a context using the total exec time. +func Snapshot(ctx context.Context, execTime time.Duration) Result { + var res Result + // ingester data is decoded from grpc trailers. + res.Ingester = decodeTrailers(ctx) + // collect data from store. + s, ok := ctx.Value(storeKey).(*StoreData) + if ok { + res.Store.StoreData = *s + } + // collect data from chunks iteration. 
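+	// This ChunkData covers the data iterated on the querier side (chunks
+	// fetched from the store and the merging heap iterator); ingesters report
+	// their own chunk statistics through the trailers above.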
+ c, ok := ctx.Value(chunksKey).(*ChunkData) + if ok { + res.Store.ChunkData = *c + } + + // calculate the summary + res.Summary.TotalBytesProcessed = res.Store.BytesDecompressed + res.Store.BytesUncompressed + + res.Ingester.BytesDecompressed + res.Ingester.BytesUncompressed + res.Summary.BytesProcessedPerSeconds = + int64(float64(res.Summary.TotalBytesProcessed) / + execTime.Seconds()) + res.Summary.TotalLinesProcessed = res.Store.LinesDecompressed + res.Store.LinesUncompressed + + res.Ingester.LinesDecompressed + res.Ingester.LinesUncompressed + res.Summary.LinesProcessedPerSeconds = + int64(float64(res.Summary.TotalLinesProcessed) / + execTime.Seconds()) + res.Summary.ExecTime = execTime + return res +} diff --git a/pkg/logql/stats/context_test.go b/pkg/logql/stats/context_test.go new file mode 100644 index 000000000000..c77585ab5fe8 --- /dev/null +++ b/pkg/logql/stats/context_test.go @@ -0,0 +1,92 @@ +package stats + +import ( + "context" + "testing" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/stretchr/testify/require" +) + +func TestSnapshot(t *testing.T) { + ctx := NewContext(context.Background()) + + GetChunkData(ctx).BytesUncompressed += 10 + GetChunkData(ctx).LinesUncompressed += 20 + GetChunkData(ctx).BytesDecompressed += 40 + GetChunkData(ctx).LinesDecompressed += 20 + GetChunkData(ctx).BytesCompressed += 30 + GetChunkData(ctx).TotalDuplicates += 10 + + GetStoreData(ctx).TotalChunksRef += 50 + GetStoreData(ctx).TotalDownloadedChunks += 60 + GetStoreData(ctx).TimeDownloadingChunks += time.Second + + fakeIngesterQuery(ctx) + fakeIngesterQuery(ctx) + + res := Snapshot(ctx, 2*time.Second) + expected := Result{ + Ingester: Ingester{ + IngesterData: IngesterData{ + TotalChunksMatched: 200, + TotalBatches: 50, + TotalLinesSent: 60, + }, + ChunkData: ChunkData{ + BytesUncompressed: 10, + LinesUncompressed: 20, + BytesDecompressed: 24, + LinesDecompressed: 40, + BytesCompressed: 60, + TotalDuplicates: 2, + }, + TotalReached: 2, + }, + Store: Store{ + StoreData: StoreData{ + TotalChunksRef: 50, + TotalDownloadedChunks: 60, + TimeDownloadingChunks: time.Second, + }, + ChunkData: ChunkData{ + BytesUncompressed: 10, + LinesUncompressed: 20, + BytesDecompressed: 40, + LinesDecompressed: 20, + BytesCompressed: 30, + TotalDuplicates: 10, + }, + }, + Summary: Summary{ + ExecTime: 2 * time.Second, + BytesProcessedPerSeconds: int64(42), + LinesProcessedPerSeconds: int64(50), + TotalBytesProcessed: int64(84), + TotalLinesProcessed: int64(100), + }, + } + require.Equal(t, expected, res) +} + +func fakeIngesterQuery(ctx context.Context) { + d, _ := ctx.Value(trailersKey).(*trailerCollector) + meta := d.addTrailer() + + c, _ := jsoniter.MarshalToString(ChunkData{ + BytesUncompressed: 5, + LinesUncompressed: 10, + BytesDecompressed: 12, + LinesDecompressed: 20, + BytesCompressed: 30, + TotalDuplicates: 1, + }) + meta.Set(chunkDataKey, c) + i, _ := jsoniter.MarshalToString(IngesterData{ + TotalChunksMatched: 100, + TotalBatches: 25, + TotalLinesSent: 30, + }) + meta.Set(ingesterDataKey, i) +} diff --git a/pkg/logql/stats/grpc.go b/pkg/logql/stats/grpc.go new file mode 100644 index 000000000000..11b3b8a22463 --- /dev/null +++ b/pkg/logql/stats/grpc.go @@ -0,0 +1,113 @@ +package stats + +import ( + "context" + "sync" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + jsoniter "github.com/json-iterator/go" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + ingesterDataKey = "ingester_data" + chunkDataKey = "chunk_data" +) 
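+
+// Ingesters serialize their IngesterData and ChunkData as JSON into the gRPC
+// trailer under the keys above; the querier registers one trailer per ingester
+// stream (CollectTrailer) and merges them all when the query statistics are
+// snapshotted (decodeTrailers).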
+ +type trailerCollector struct { + trailers []*metadata.MD + sync.Mutex +} + +func (c *trailerCollector) addTrailer() *metadata.MD { + c.Lock() + defer c.Unlock() + meta := metadata.MD{} + c.trailers = append(c.trailers, &meta) + return &meta +} + +func injectTrailerCollector(ctx context.Context) context.Context { + return context.WithValue(ctx, trailersKey, &trailerCollector{}) +} + +// CollectTrailer register a new trailer that can be collected by the engine. +func CollectTrailer(ctx context.Context) grpc.CallOption { + d, ok := ctx.Value(trailersKey).(*trailerCollector) + if !ok { + return grpc.EmptyCallOption{} + + } + return grpc.Trailer(d.addTrailer()) +} + +func SendAsTrailer(ctx context.Context, stream grpc.ServerStream) { + trailer, err := encodeTrailer(ctx) + if err != nil { + level.Warn(util.Logger).Log("msg", "failed to encode trailer", "err", err) + return + } + stream.SetTrailer(trailer) +} + +func encodeTrailer(ctx context.Context) (metadata.MD, error) { + meta := metadata.MD{} + ingData, ok := ctx.Value(ingesterKey).(*IngesterData) + if ok { + data, err := jsoniter.MarshalToString(ingData) + if err != nil { + return meta, err + } + meta.Set(ingesterDataKey, data) + } + chunkData, ok := ctx.Value(chunksKey).(*ChunkData) + if ok { + data, err := jsoniter.MarshalToString(chunkData) + if err != nil { + return meta, err + } + meta.Set(chunkDataKey, data) + } + return meta, nil +} + +func decodeTrailers(ctx context.Context) Ingester { + var res Ingester + collector, ok := ctx.Value(trailersKey).(*trailerCollector) + if !ok { + return res + } + res.TotalReached = len(collector.trailers) + for _, meta := range collector.trailers { + ing := decodeTrailer(meta) + res.TotalChunksMatched += ing.TotalChunksMatched + res.TotalBatches += ing.TotalBatches + res.TotalLinesSent += ing.TotalLinesSent + res.BytesUncompressed += ing.BytesUncompressed + res.LinesUncompressed += ing.LinesUncompressed + res.BytesDecompressed += ing.BytesDecompressed + res.LinesDecompressed += ing.LinesDecompressed + res.BytesCompressed += ing.BytesCompressed + res.TotalDuplicates += ing.TotalDuplicates + } + return res +} + +func decodeTrailer(meta *metadata.MD) Ingester { + var res Ingester + values := meta.Get(ingesterDataKey) + if len(values) == 1 { + if err := jsoniter.UnmarshalFromString(values[0], &res.IngesterData); err != nil { + level.Warn(util.Logger).Log("msg", "could not unmarshal ingester data", "err", err) + } + } + values = meta.Get(chunkDataKey) + if len(values) == 1 { + if err := jsoniter.UnmarshalFromString(values[0], &res.ChunkData); err != nil { + level.Warn(util.Logger).Log("msg", "could not unmarshal chunk data", "err", err) + } + } + return res +} diff --git a/pkg/logql/stats/grpc_test.go b/pkg/logql/stats/grpc_test.go new file mode 100644 index 000000000000..02cf69b38542 --- /dev/null +++ b/pkg/logql/stats/grpc_test.go @@ -0,0 +1,110 @@ +package stats + +import ( + "context" + "io" + "log" + "net" + "testing" + + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" +) + +const bufSize = 1024 * 1024 + +var lis *bufconn.Listener +var server *grpc.Server + +func init() { + lis = bufconn.Listen(bufSize) + server = grpc.NewServer() +} + +func bufDialer(context.Context, string) (net.Conn, error) { + return lis.Dial() +} + +func TestCollectTrailer(t *testing.T) { + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) + if err != 
nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + ing := ingesterFn(func(req *logproto.QueryRequest, s logproto.Querier_QueryServer) error { + ingCtx := NewContext(s.Context()) + defer SendAsTrailer(ingCtx, s) + GetIngesterData(ingCtx).TotalChunksMatched++ + GetIngesterData(ingCtx).TotalBatches = +2 + GetIngesterData(ingCtx).TotalLinesSent = +3 + GetChunkData(ingCtx).BytesUncompressed++ + GetChunkData(ingCtx).LinesUncompressed++ + GetChunkData(ingCtx).BytesDecompressed++ + GetChunkData(ingCtx).LinesDecompressed++ + GetChunkData(ingCtx).BytesCompressed++ + GetChunkData(ingCtx).TotalDuplicates++ + return nil + }) + logproto.RegisterQuerierServer(server, ing) + go func() { + if err := server.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + + ingClient := logproto.NewQuerierClient(conn) + + ctx = NewContext(ctx) + + // query the ingester twice. + clientStream, err := ingClient.Query(ctx, &logproto.QueryRequest{}, CollectTrailer(ctx)) + if err != nil { + t.Fatal(err) + } + _, err = clientStream.Recv() + if err != nil && err != io.EOF { + t.Fatal(err) + } + clientStream, err = ingClient.Query(ctx, &logproto.QueryRequest{}, CollectTrailer(ctx)) + if err != nil { + t.Fatal(err) + } + _, err = clientStream.Recv() + if err != nil && err != io.EOF { + t.Fatal(err) + } + err = clientStream.CloseSend() + if err != nil { + t.Fatal(err) + } + res := decodeTrailers(ctx) + require.Equal(t, 2, res.TotalReached) + require.Equal(t, int64(2), res.TotalChunksMatched) + require.Equal(t, int64(4), res.TotalBatches) + require.Equal(t, int64(6), res.TotalLinesSent) + require.Equal(t, int64(2), res.BytesUncompressed) + require.Equal(t, int64(2), res.LinesUncompressed) + require.Equal(t, int64(2), res.BytesDecompressed) + require.Equal(t, int64(2), res.LinesDecompressed) + require.Equal(t, int64(2), res.BytesCompressed) + require.Equal(t, int64(2), res.TotalDuplicates) +} + +type ingesterFn func(*logproto.QueryRequest, logproto.Querier_QueryServer) error + +func (i ingesterFn) Query(req *logproto.QueryRequest, s logproto.Querier_QueryServer) error { + return i(req, s) +} +func (ingesterFn) Label(context.Context, *logproto.LabelRequest) (*logproto.LabelResponse, error) { + return nil, nil +} +func (ingesterFn) Tail(*logproto.TailRequest, logproto.Querier_TailServer) error { return nil } +func (ingesterFn) Series(context.Context, *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { + return nil, nil +} +func (ingesterFn) TailersCount(context.Context, *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { + return nil, nil +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index bc8c610d21b8..8358b8be41b0 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/marshal" + "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/util/validation" ) @@ -172,7 +173,7 @@ func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.E func (q *Querier) queryIngesters(ctx context.Context, params logql.SelectParams) ([]iter.EntryIterator, error) { clients, err := q.forAllIngesters(ctx, func(client logproto.QuerierClient) (interface{}, error) { - return client.Query(ctx, params.QueryRequest) + return client.Query(ctx, params.QueryRequest, stats.CollectTrailer(ctx)) }) if err != nil { return nil, err diff --git 
a/pkg/storage/iterator.go b/pkg/storage/iterator.go index 808fece43eeb..01a134790e72 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/iterator.go @@ -13,10 +13,10 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/chunkenc" - "github.com/grafana/loki/pkg/chunkenc/decompression" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/stats" ) // lazyChunks is a slice of lazy chunks that can ordered by chunk boundaries @@ -366,11 +366,13 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks") defer log.Finish() start := time.Now() - defer decompression.Mutate(ctx, func(m *decompression.Stats) { - m.TimeFetching += time.Since(start) - }) + storeStats := stats.GetStoreData(ctx) + var totalChunks int64 + defer func(){ + storeStats.TimeDownloadingChunks += time.Since(start) + storeStats.TotalDownloadedChunks += totalChunks + }() - var totalChunks int chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{} for _, c := range chunks { if c.Chunk.Data == nil { @@ -419,9 +421,6 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { lastErr = err } } - decompression.Mutate(ctx, func(m *decompression.Stats) { - m.FetchedChunks += int64(totalChunks) - }) return lastErr } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 9ac5faa3689a..aea3cc0b2a3d 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/stats" "github.com/grafana/loki/pkg/util" ) @@ -55,6 +56,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf // LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront // for that request. func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.EntryIterator, error) { + storeStats := stats.GetStoreData(ctx) + expr, err := req.LogSelector() if err != nil { return nil, err @@ -85,6 +88,7 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent var totalChunks int for i := range chks { + storeStats.TotalChunksRef += int64(len(chks[i])) chks[i] = filterChunksByTime(from, through, chks[i]) totalChunks += len(chks[i]) } diff --git a/vendor/google.golang.org/grpc/test/bufconn/bufconn.go b/vendor/google.golang.org/grpc/test/bufconn/bufconn.go new file mode 100644 index 000000000000..168cdb8578dd --- /dev/null +++ b/vendor/google.golang.org/grpc/test/bufconn/bufconn.go @@ -0,0 +1,308 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package bufconn provides a net.Conn implemented by a buffer and related +// dialing and listening functionality. 
+package bufconn + +import ( + "fmt" + "io" + "net" + "sync" + "time" +) + +// Listener implements a net.Listener that creates local, buffered net.Conns +// via its Accept and Dial method. +type Listener struct { + mu sync.Mutex + sz int + ch chan net.Conn + done chan struct{} +} + +// Implementation of net.Error providing timeout +type netErrorTimeout struct { + error +} + +func (e netErrorTimeout) Timeout() bool { return true } +func (e netErrorTimeout) Temporary() bool { return false } + +var errClosed = fmt.Errorf("closed") +var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")} + +// Listen returns a Listener that can only be contacted by its own Dialers and +// creates buffered connections between the two. +func Listen(sz int) *Listener { + return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})} +} + +// Accept blocks until Dial is called, then returns a net.Conn for the server +// half of the connection. +func (l *Listener) Accept() (net.Conn, error) { + select { + case <-l.done: + return nil, errClosed + case c := <-l.ch: + return c, nil + } +} + +// Close stops the listener. +func (l *Listener) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + select { + case <-l.done: + // Already closed. + break + default: + close(l.done) + } + return nil +} + +// Addr reports the address of the listener. +func (l *Listener) Addr() net.Addr { return addr{} } + +// Dial creates an in-memory full-duplex network connection, unblocks Accept by +// providing it the server half of the connection, and returns the client half +// of the connection. +func (l *Listener) Dial() (net.Conn, error) { + p1, p2 := newPipe(l.sz), newPipe(l.sz) + select { + case <-l.done: + return nil, errClosed + case l.ch <- &conn{p1, p2}: + return &conn{p2, p1}, nil + } +} + +type pipe struct { + mu sync.Mutex + + // buf contains the data in the pipe. It is a ring buffer of fixed capacity, + // with r and w pointing to the offset to read and write, respsectively. + // + // Data is read between [r, w) and written to [w, r), wrapping around the end + // of the slice if necessary. + // + // The buffer is empty if r == len(buf), otherwise if r == w, it is full. + // + // w and r are always in the range [0, cap(buf)) and [0, len(buf)]. + buf []byte + w, r int + + wwait sync.Cond + rwait sync.Cond + + // Indicate that a write/read timeout has occurred + wtimedout bool + rtimedout bool + + wtimer *time.Timer + rtimer *time.Timer + + closed bool + writeClosed bool +} + +func newPipe(sz int) *pipe { + p := &pipe{buf: make([]byte, 0, sz)} + p.wwait.L = &p.mu + p.rwait.L = &p.mu + + p.wtimer = time.AfterFunc(0, func() {}) + p.rtimer = time.AfterFunc(0, func() {}) + return p +} + +func (p *pipe) empty() bool { + return p.r == len(p.buf) +} + +func (p *pipe) full() bool { + return p.r < len(p.buf) && p.r == p.w +} + +func (p *pipe) Read(b []byte) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + // Block until p has data. 
+ for { + if p.closed { + return 0, io.ErrClosedPipe + } + if !p.empty() { + break + } + if p.writeClosed { + return 0, io.EOF + } + if p.rtimedout { + return 0, errTimeout + } + + p.rwait.Wait() + } + wasFull := p.full() + + n = copy(b, p.buf[p.r:len(p.buf)]) + p.r += n + if p.r == cap(p.buf) { + p.r = 0 + p.buf = p.buf[:p.w] + } + + // Signal a blocked writer, if any + if wasFull { + p.wwait.Signal() + } + + return n, nil +} + +func (p *pipe) Write(b []byte) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return 0, io.ErrClosedPipe + } + for len(b) > 0 { + // Block until p is not full. + for { + if p.closed || p.writeClosed { + return 0, io.ErrClosedPipe + } + if !p.full() { + break + } + if p.wtimedout { + return 0, errTimeout + } + + p.wwait.Wait() + } + wasEmpty := p.empty() + + end := cap(p.buf) + if p.w < p.r { + end = p.r + } + x := copy(p.buf[p.w:end], b) + b = b[x:] + n += x + p.w += x + if p.w > len(p.buf) { + p.buf = p.buf[:p.w] + } + if p.w == cap(p.buf) { + p.w = 0 + } + + // Signal a blocked reader, if any. + if wasEmpty { + p.rwait.Signal() + } + } + return n, nil +} + +func (p *pipe) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + p.closed = true + // Signal all blocked readers and writers to return an error. + p.rwait.Broadcast() + p.wwait.Broadcast() + return nil +} + +func (p *pipe) closeWrite() error { + p.mu.Lock() + defer p.mu.Unlock() + p.writeClosed = true + // Signal all blocked readers and writers to return an error. + p.rwait.Broadcast() + p.wwait.Broadcast() + return nil +} + +type conn struct { + io.Reader + io.Writer +} + +func (c *conn) Close() error { + err1 := c.Reader.(*pipe).Close() + err2 := c.Writer.(*pipe).closeWrite() + if err1 != nil { + return err1 + } + return err2 +} + +func (c *conn) SetDeadline(t time.Time) error { + c.SetReadDeadline(t) + c.SetWriteDeadline(t) + return nil +} + +func (c *conn) SetReadDeadline(t time.Time) error { + p := c.Reader.(*pipe) + p.mu.Lock() + defer p.mu.Unlock() + p.rtimer.Stop() + p.rtimedout = false + if !t.IsZero() { + p.rtimer = time.AfterFunc(time.Until(t), func() { + p.mu.Lock() + defer p.mu.Unlock() + p.rtimedout = true + p.rwait.Broadcast() + }) + } + return nil +} + +func (c *conn) SetWriteDeadline(t time.Time) error { + p := c.Writer.(*pipe) + p.mu.Lock() + defer p.mu.Unlock() + p.wtimer.Stop() + p.wtimedout = false + if !t.IsZero() { + p.wtimer = time.AfterFunc(time.Until(t), func() { + p.mu.Lock() + defer p.mu.Unlock() + p.wtimedout = true + p.wwait.Broadcast() + }) + } + return nil +} + +func (*conn) LocalAddr() net.Addr { return addr{} } +func (*conn) RemoteAddr() net.Addr { return addr{} } + +type addr struct{} + +func (addr) Network() string { return "bufconn" } +func (addr) String() string { return "bufconn" } diff --git a/vendor/modules.txt b/vendor/modules.txt index b928e26e54f1..107bcd233316 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -830,6 +830,7 @@ google.golang.org/grpc/serviceconfig google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap +google.golang.org/grpc/test/bufconn # gopkg.in/alecthomas/kingpin.v2 v2.2.6 gopkg.in/alecthomas/kingpin.v2 # gopkg.in/fsnotify.v1 v1.4.7
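
A minimal sketch of how the new stats package is meant to be used end to end on the read path. All package, function and field names are taken from this patch; the query plumbing is elided and the literal values are invented purely for illustration.

	package main

	import (
		"context"
		"os"
		"time"

		"github.com/go-kit/kit/log"

		"github.com/grafana/loki/pkg/logql/stats"
	)

	func main() {
		// The engine seeds the context once per query; NewContext also installs
		// the trailer collector used to gather per-ingester statistics.
		ctx := stats.NewContext(context.Background())
		start := time.Now()

		// Iterators and the store mutate their typed structs in place while the
		// query runs (these values are invented for illustration).
		stats.GetChunkData(ctx).BytesDecompressed += 1 << 20
		stats.GetChunkData(ctx).LinesDecompressed += 1000
		stats.GetStoreData(ctx).TotalChunksRef += 5

		// Ingester statistics would normally be merged in from gRPC trailers
		// collected with stats.CollectTrailer; none are present here.

		// When the query finishes the engine folds everything into a Result and
		// logs it at debug level.
		res := stats.Snapshot(ctx, time.Since(start))
		stats.Log(log.NewLogfmtLogger(os.Stderr), res)
	}

On the ingester side the same structs are filled in while serving the query, and SendAsTrailer encodes them into the stream trailer instead of being snapshotted locally.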