Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

NumSeries() method for BlockReader interface #656

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Master / unreleased

- [FEATURE] `chunkenc.Chunk.Iterator` method now takes a `chunkenc.Iterator` interface as an argument for reuse.
- [FEATURE] `BlockReader` interface gets `NumSeries()` method.

## 0.9.1

Expand Down
8 changes: 8 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ type BlockReader interface {

// MaxTime returns the max time of the block.
MaxTime() int64

// NumSeries returns the total number of series in the block.
NumSeries() uint64
}

// Appendable defines an entity to which data can be appended.
Expand Down Expand Up @@ -648,6 +651,11 @@ func (pb *Block) LabelNames() ([]string, error) {
return pb.indexr.LabelNames()
}

// NumSeries returns the total number of series contained in the block,
// as reported by the block's metadata stats.
func (pb *Block) NumSeries() uint64 {
	stats := pb.meta.Stats
	return stats.NumSeries
}

func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint
Expand Down
1 change: 1 addition & 0 deletions compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.
// The remaining BlockReader methods are stubbed with zero values;
// Tombstones deliberately returns an error to exercise failure paths.
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
func (erringBReader) MinTime() int64 { return 0 }
func (erringBReader) MaxTime() int64 { return 0 }
func (erringBReader) NumSeries() uint64 { return 0 }

type nopChunkWriter struct{}

Expand Down
21 changes: 17 additions & 4 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Head struct {
logger log.Logger
appendPool sync.Pool
bytesPool sync.Pool
numSeries uint64

minTime, maxTime int64 // Current min and max of the samples included in the head.
minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
Expand All @@ -84,7 +85,7 @@ type Head struct {

type headMetrics struct {
activeAppenders prometheus.Gauge
series prometheus.Gauge
series prometheus.GaugeFunc
seriesCreated prometheus.Counter
seriesRemoved prometheus.Counter
seriesNotFound prometheus.Counter
Expand Down Expand Up @@ -112,9 +113,11 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "prometheus_tsdb_head_active_appenders",
Help: "Number of currently active appender transactions",
})
m.series = prometheus.NewGauge(prometheus.GaugeOpts{
m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_head_series",
Help: "Total number of series in the head block.",
}, func() float64 {
return float64(h.NumSeries())
})
m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_head_series_created_total",
Expand Down Expand Up @@ -698,6 +701,10 @@ func (h *rangeHead) MaxTime() int64 {
return h.maxt
}

// NumSeries returns the number of active series in the underlying head.
// Note it delegates directly to the head and is therefore not limited to
// series within the range's mint/maxt bounds.
func (h *rangeHead) NumSeries() uint64 {
return h.head.NumSeries()
}

// initAppender is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppender struct {
Expand Down Expand Up @@ -1022,9 +1029,10 @@ func (h *Head) gc() {
seriesRemoved := len(deleted)

h.metrics.seriesRemoved.Add(float64(seriesRemoved))
h.metrics.series.Sub(float64(seriesRemoved))
h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved))
// Ref: https://golang.org/pkg/sync/atomic/#AddUint64
atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))

// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
Expand Down Expand Up @@ -1111,6 +1119,11 @@ func (h *Head) MaxTime() int64 {
return atomic.LoadInt64(&h.maxTime)
}

// NumSeries returns the number of active series in the head.
// The counter is read atomically because it is mutated concurrently
// (incremented on series creation, decremented during gc).
func (h *Head) NumSeries() uint64 {
	n := atomic.LoadUint64(&h.numSeries)
	return n
}

// compactable returns whether the head has a compactable range.
// The head has a compactable range when the head time range is 1.5 times the chunk range.
// The 0.5 acts as a buffer of the appendable window.
Expand Down Expand Up @@ -1347,8 +1360,8 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
return s, false
}

h.metrics.series.Inc()
h.metrics.seriesCreated.Inc()
atomic.AddUint64(&h.numSeries, 1)

h.postings.Add(id, lset)

Expand Down
10 changes: 6 additions & 4 deletions mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ func (mockIndexWriter) WritePostings(name, value string, it index.Postings) erro
func (mockIndexWriter) Close() error { return nil }

// mockBReader is a canned BlockReader for tests: each accessor method
// simply echoes the corresponding field set up by the test.
type mockBReader struct {
ir IndexReader
cr ChunkReader
mint int64
maxt int64
numSeries uint64
}

// BlockReader implementation: Index, Chunks, MinTime, MaxTime and NumSeries
// return the struct's canned fields; Tombstones always yields a fresh empty
// in-memory tombstone set.
func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil }
func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil }
func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil }
func (r *mockBReader) MinTime() int64 { return r.mint }
func (r *mockBReader) MaxTime() int64 { return r.maxt }
func (r *mockBReader) NumSeries() uint64 { return r.numSeries }