Skip to content

Commit

Permalink
Store: queryStats sizeSum field use units.Base2Bytes to log with huma…
Browse files Browse the repository at this point in the history
…n-readable format.

Signed-off-by: Jimmie Han <hanjinming@outlook.com>
  • Loading branch information
hanjm committed Nov 25, 2021
1 parent ea1c6ab commit d863d2f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 37 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/felixge/fgprof v0.9.1
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.5.1
github.com/go-kit/kit v0.12.0
github.com/go-kit/log v0.2.0
github.com/go-openapi/strfmt v0.21.0
github.com/gogo/protobuf v1.3.2
Expand Down
75 changes: 38 additions & 37 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/alecthomas/units"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/types"
"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand Down Expand Up @@ -1091,25 +1092,25 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
defer func() {
s.metrics.seriesDataTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouched))
s.metrics.seriesDataFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetched))
s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.postingsTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.postingsFetchedSizeSum))
s.metrics.seriesDataSizeTouched.WithLabelValues("postings").Observe(float64(stats.PostingsTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("postings").Observe(float64(stats.PostingsFetchedSizeSum))
s.metrics.seriesDataTouched.WithLabelValues("series").Observe(float64(stats.seriesTouched))
s.metrics.seriesDataFetched.WithLabelValues("series").Observe(float64(stats.seriesFetched))
s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.seriesTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.seriesFetchedSizeSum))
s.metrics.seriesDataSizeTouched.WithLabelValues("series").Observe(float64(stats.SeriesTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("series").Observe(float64(stats.SeriesFetchedSizeSum))
s.metrics.seriesDataTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouched))
s.metrics.seriesDataFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetched))
s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.chunksTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.chunksFetchedSizeSum))
s.metrics.seriesDataSizeTouched.WithLabelValues("chunks").Observe(float64(stats.ChunksTouchedSizeSum))
s.metrics.seriesDataSizeFetched.WithLabelValues("chunks").Observe(float64(stats.ChunksFetchedSizeSum))
s.metrics.resultSeriesCount.Observe(float64(stats.mergedSeriesCount))
s.metrics.cachedPostingsCompressions.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressions))
s.metrics.cachedPostingsCompressions.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressions))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelEncode).Add(float64(stats.cachedPostingsCompressionErrors))
s.metrics.cachedPostingsCompressionErrors.WithLabelValues(labelDecode).Add(float64(stats.cachedPostingsDecompressionErrors))
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelEncode).Add(stats.CachedPostingsCompressionTimeSum.Seconds())
s.metrics.cachedPostingsCompressionTimeSeconds.WithLabelValues(labelDecode).Add(stats.CachedPostingsDecompressionTimeSum.Seconds())
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.cachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.cachedPostingsCompressedSizeSum))
s.metrics.cachedPostingsOriginalSizeBytes.Add(float64(stats.CachedPostingsOriginalSizeSum))
s.metrics.cachedPostingsCompressedSizeBytes.Add(float64(stats.CachedPostingsCompressedSizeSum))

level.Debug(s.logger).Log("msg", "stats query processed",
"request", req,
Expand Down Expand Up @@ -1990,7 +1991,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
// Get postings for the given key from cache first.
if b, ok := fromCache[key]; ok {
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(b)
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(b))

// Even if this instance is not using compression, there may be compressed
// entries in the cache written by other stores.
Expand Down Expand Up @@ -2066,7 +2067,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
r.stats.postingsFetchCount++
r.stats.postingsFetched += j - i
r.stats.PostingsFetchDurationSum += fetchTime
r.stats.postingsFetchedSizeSum += int(length)
r.stats.PostingsFetchedSizeSum += units.Base2Bytes(int(length))
r.mtx.Unlock()

for _, p := range ptrs[i:j] {
Expand Down Expand Up @@ -2106,11 +2107,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

// If we just fetched it we still have to update the stats for touched postings.
r.stats.postingsTouched++
r.stats.postingsTouchedSizeSum += len(pBytes)
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(pBytes))
r.stats.cachedPostingsCompressions += compressions
r.stats.cachedPostingsCompressionErrors += compressionErrors
r.stats.cachedPostingsOriginalSizeSum += len(pBytes)
r.stats.cachedPostingsCompressedSizeSum += compressedSize
r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(pBytes))
r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
r.stats.CachedPostingsCompressionTimeSum += compressionTime
r.mtx.Unlock()
}
Expand Down Expand Up @@ -2228,7 +2229,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
r.stats.seriesFetchCount++
r.stats.seriesFetched += len(ids)
r.stats.SeriesFetchDurationSum += time.Since(begin)
r.stats.seriesFetchedSizeSum += int(end - start)
r.stats.SeriesFetchedSizeSum += units.Base2Bytes(int(end - start))
r.mtx.Unlock()

for i, id := range ids {
Expand Down Expand Up @@ -2332,7 +2333,7 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym
}

r.stats.seriesTouched++
r.stats.seriesTouchedSizeSum += len(b)
r.stats.SeriesTouchedSizeSum += units.Base2Bytes(len(b))
return decodeSeriesForTime(b, lset, chks, skipChunks, mint, maxt)
}

Expand Down Expand Up @@ -2516,7 +2517,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
r.stats.chunksFetchCount++
r.stats.chunksFetched += len(pIdxs)
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += int(part.End - part.Start)
r.stats.ChunksFetchedSizeSum += units.Base2Bytes(int(part.End - part.Start))

var (
buf []byte
Expand Down Expand Up @@ -2577,7 +2578,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
return errors.Wrap(err, "populate chunk")
}
r.stats.chunksTouched++
r.stats.chunksTouchedSizeSum += int(chunkDataLen)
r.stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))
continue
}

Expand All @@ -2602,14 +2603,14 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

r.stats.chunksFetchCount++
r.stats.ChunksFetchDurationSum += time.Since(fetchBegin)
r.stats.chunksFetchedSizeSum += len(*nb)
r.stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb))
err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save)
if err != nil {
r.block.chunkPool.Put(nb)
return errors.Wrap(err, "populate chunk")
}
r.stats.chunksTouched++
r.stats.chunksTouchedSizeSum += int(chunkDataLen)
r.stats.ChunksTouchedSizeSum += units.Base2Bytes(int(chunkDataLen))

r.block.chunkPool.Put(nb)
}
Expand Down Expand Up @@ -2663,33 +2664,33 @@ type queryStats struct {
blocksQueried int

postingsTouched int
postingsTouchedSizeSum int
PostingsTouchedSizeSum units.Base2Bytes
postingsToFetch int
postingsFetched int
postingsFetchedSizeSum int
PostingsFetchedSizeSum units.Base2Bytes
postingsFetchCount int
PostingsFetchDurationSum time.Duration

cachedPostingsCompressions int
cachedPostingsCompressionErrors int
cachedPostingsOriginalSizeSum int
cachedPostingsCompressedSizeSum int
CachedPostingsOriginalSizeSum units.Base2Bytes
CachedPostingsCompressedSizeSum units.Base2Bytes
CachedPostingsCompressionTimeSum time.Duration
cachedPostingsDecompressions int
cachedPostingsDecompressionErrors int
CachedPostingsDecompressionTimeSum time.Duration

seriesTouched int
seriesTouchedSizeSum int
SeriesTouchedSizeSum units.Base2Bytes
seriesFetched int
seriesFetchedSizeSum int
SeriesFetchedSizeSum units.Base2Bytes
seriesFetchCount int
SeriesFetchDurationSum time.Duration

chunksTouched int
chunksTouchedSizeSum int
ChunksTouchedSizeSum units.Base2Bytes
chunksFetched int
chunksFetchedSizeSum int
ChunksFetchedSizeSum units.Base2Bytes
chunksFetchCount int
ChunksFetchDurationSum time.Duration

Expand All @@ -2703,32 +2704,32 @@ func (s queryStats) merge(o *queryStats) *queryStats {
s.blocksQueried += o.blocksQueried

s.postingsTouched += o.postingsTouched
s.postingsTouchedSizeSum += o.postingsTouchedSizeSum
s.PostingsTouchedSizeSum += o.PostingsTouchedSizeSum
s.postingsFetched += o.postingsFetched
s.postingsFetchedSizeSum += o.postingsFetchedSizeSum
s.PostingsFetchedSizeSum += o.PostingsFetchedSizeSum
s.postingsFetchCount += o.postingsFetchCount
s.PostingsFetchDurationSum += o.PostingsFetchDurationSum

s.cachedPostingsCompressions += o.cachedPostingsCompressions
s.cachedPostingsCompressionErrors += o.cachedPostingsCompressionErrors
s.cachedPostingsOriginalSizeSum += o.cachedPostingsOriginalSizeSum
s.cachedPostingsCompressedSizeSum += o.cachedPostingsCompressedSizeSum
s.CachedPostingsOriginalSizeSum += o.CachedPostingsOriginalSizeSum
s.CachedPostingsCompressedSizeSum += o.CachedPostingsCompressedSizeSum
s.CachedPostingsCompressionTimeSum += o.CachedPostingsCompressionTimeSum
s.cachedPostingsDecompressions += o.cachedPostingsDecompressions
s.cachedPostingsDecompressionErrors += o.cachedPostingsDecompressionErrors
s.CachedPostingsDecompressionTimeSum += o.CachedPostingsDecompressionTimeSum

s.seriesTouched += o.seriesTouched
s.seriesTouchedSizeSum += o.seriesTouchedSizeSum
s.SeriesTouchedSizeSum += o.SeriesTouchedSizeSum
s.seriesFetched += o.seriesFetched
s.seriesFetchedSizeSum += o.seriesFetchedSizeSum
s.SeriesFetchedSizeSum += o.SeriesFetchedSizeSum
s.seriesFetchCount += o.seriesFetchCount
s.SeriesFetchDurationSum += o.SeriesFetchDurationSum

s.chunksTouched += o.chunksTouched
s.chunksTouchedSizeSum += o.chunksTouchedSizeSum
s.ChunksTouchedSizeSum += o.ChunksTouchedSizeSum
s.chunksFetched += o.chunksFetched
s.chunksFetchedSizeSum += o.chunksFetchedSizeSum
s.ChunksFetchedSizeSum += o.ChunksFetchedSizeSum
s.chunksFetchCount += o.chunksFetchCount
s.ChunksFetchDurationSum += o.ChunksFetchDurationSum

Expand Down

0 comments on commit d863d2f

Please sign in to comment.