Add EstimatedChunkCount() method to ChunkSeries #512

Closed
wants to merge 1 commit into from
6 changes: 6 additions & 0 deletions storage/interface.go
@@ -415,6 +415,12 @@ type ChunkSeriesSet interface {
type ChunkSeries interface {
Labels
ChunkIterable

// EstimatedChunkCount returns an estimate of the number of chunks available from this ChunkSeries.
//
// This estimate is used by Mimir's ingesters to report the number of chunks expected to be returned by a query,
// which is used by queriers to enforce the 'max chunks per query' limit.
EstimatedChunkCount() int
}

// Labels represents an item that has labels, e.g. a time series.
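As a usage sketch for the new method above: a querier holding a ChunkSeriesSet could enforce the 'max chunks per query' limit as below. This is a minimal illustration, not Mimir's actual implementation; checkChunkLimit and maxChunksPerQuery are hypothetical names, and the "fmt" import is assumed.

```go
// Hypothetical sketch, not Mimir's real code: enforce a 'max chunks per query'
// limit while iterating a ChunkSeriesSet.
func checkChunkLimit(ss ChunkSeriesSet, maxChunksPerQuery int) error {
	estimated := 0
	for ss.Next() {
		estimated += ss.At().EstimatedChunkCount()
		if estimated > maxChunksPerQuery {
			return fmt.Errorf("query would fetch an estimated %d chunks, exceeding the limit of %d", estimated, maxChunksPerQuery)
		}
	}
	return ss.Err()
}
```

Because the count is an estimate, such a check trades a small amount of accuracy for not having to decode any chunk data.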
87 changes: 87 additions & 0 deletions storage/merge.go
@@ -658,6 +658,7 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
if len(series) == 0 {
return nil
}

return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@@ -670,8 +671,84 @@ func NewCompactingChunkSeriesMerger(mergeFunc VerticalSeriesMergeFunc) VerticalChunkSeriesMergeFunc {
iterators: iterators,
}
},
ChunkCountFn: func() int {
return estimateCompactedChunkCount(series)
},
}
}
}

// estimateCompactedChunkCount computes an estimate of the resulting number of chunks
// after compacting series.
//
// The estimate is imperfect in a few ways, because it does not examine individual samples:
// - it does not check for duplicate samples in overlapping chunks, and so may overestimate
// the resulting number of chunks if duplicate samples are present, or if an entire chunk is
// duplicated
// - it does not check if overlapping chunks have samples that swap back and forth between
// different encodings over time, and so may underestimate the resulting number of chunks
func estimateCompactedChunkCount(series []ChunkSeries) int {
h := make(chunkIteratorHeap, 0, len(series))

for _, s := range series {
iter := s.Iterator(nil)
if iter.Next() {
heap.Push(&h, iter)
}
}

totalChunkCount := 0

for len(h) > 0 {
iter := heap.Pop(&h).(chunks.Iterator)
prev := iter.At()
if iter.Next() {
heap.Push(&h, iter)
}

chunkCountForThisTimePeriod := 0
sampleCount := prev.Chunk.NumSamples()
maxTime := prev.MaxTime

// Find all overlapping chunks and estimate the number of resulting chunks.
for len(h) > 0 && h[0].At().MinTime <= maxTime {
iter := heap.Pop(&h).(chunks.Iterator)
next := iter.At()
if iter.Next() {
heap.Push(&h, iter)
}

if next.MaxTime > maxTime {
maxTime = next.MaxTime
}

if prev.Chunk.Encoding() != next.Chunk.Encoding() {
// The encoding changes here, so the samples accumulated so far will be cut into their own
// chunk(s): one chunk per seriesToChunkEncoderSplit samples, plus one for any remainder.
chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
if sampleCount%seriesToChunkEncoderSplit > 0 {
chunkCountForThisTimePeriod++
}

sampleCount = 0
}

sampleCount += next.Chunk.NumSamples()
prev = next
}

// If we have more than seriesToChunkEncoderSplit samples, account for the additional chunks we'll create.
chunkCountForThisTimePeriod += sampleCount / seriesToChunkEncoderSplit
if sampleCount%seriesToChunkEncoderSplit > 0 {
chunkCountForThisTimePeriod++
}
if chunkCountForThisTimePeriod == 0 {
chunkCountForThisTimePeriod = 1 // We'll always create at least one chunk.
}

totalChunkCount += chunkCountForThisTimePeriod
}

return totalChunkCount
}
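To make the split arithmetic concrete, here is a worked example matching the "110 overlapping samples" test case further down. It assumes seriesToChunkEncoderSplit is 120, its value at the time of writing; treat that constant's value as an assumption.

```go
package main

import "fmt"

func main() {
	// Two fully-overlapping series of 110 samples each: all 220 samples are
	// counted, because duplicate samples are not detected.
	sampleCount := 110 + 110
	const split = 120 // assumed value of seriesToChunkEncoderSplit

	chunkCount := sampleCount / split // 1 full chunk of 120 samples
	if sampleCount%split > 0 {
		chunkCount++ // plus 1 partial chunk for the remaining 100 samples
	}

	// Prints 2, even though compaction deduplicates down to 110 samples and
	// actually produces a single chunk.
	fmt.Println(chunkCount)
}
```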

// compactChunkIterator is responsible for compacting chunks from different iterators of the same time series into a single chained series.
@@ -801,6 +878,7 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
if len(series) == 0 {
return nil
}

return &ChunkSeriesEntry{
Lset: series[0].Labels(),
ChunkIteratorFn: func(chunks.Iterator) chunks.Iterator {
@@ -812,6 +890,15 @@ func NewConcatenatingChunkSeriesMerger() VerticalChunkSeriesMergeFunc {
iterators: iterators,
}
},
ChunkCountFn: func() int {
chunkCount := 0

for _, series := range series {
chunkCount += series.EstimatedChunkCount()
}

return chunkCount
},
}
}
}
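The concatenating merger never merges overlapping chunks, so its estimate is the plain sum of its inputs' estimates, and it is exact whenever those are. A brief sketch; s1, s2 and s3 are hypothetical []tsdbutil.Sample slices, and the code is written as if inside package storage:

```go
// NewListChunkSeriesFromSamples creates one chunk per sample slice (see its
// ChunkCountFn in storage/series.go below).
lbls := labels.FromStrings("foo", "bar")
a := NewListChunkSeriesFromSamples(lbls, s1, s2) // 2 chunks: one per sample slice
b := NewListChunkSeriesFromSamples(lbls, s3)     // 1 chunk
merged := NewConcatenatingChunkSeriesMerger()(a, b)
_ = merged.EstimatedChunkCount() // 3, the sum of the inputs' estimates
```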
18 changes: 15 additions & 3 deletions storage/merge_test.go
@@ -394,9 +394,10 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
}

for _, tc := range []struct {
name string
input []ChunkSeries
expected ChunkSeries
name string
input []ChunkSeries
expected ChunkSeries
expectedChunksEstimate int
}{
{
name: "single empty series",
Expand Down Expand Up @@ -483,6 +484,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
tsdbutil.GenerateSamples(0, 110),
),
expectedChunksEstimate: 2, // The estimate doesn't detect duplicated samples in overlapping chunks, so both copies of each sample are counted.
},
{
name: "150 overlapping samples, split chunk",
Expand Down Expand Up @@ -520,6 +522,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
[]tsdbutil.Sample{histogramSample(15)},
),
expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
},
{
name: "float histogram chunks overlapping",
@@ -546,6 +549,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
[]tsdbutil.Sample{fSample{12, 12}, fSample{14, 14}},
[]tsdbutil.Sample{floatHistogramSample(15)},
),
expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
},
{
name: "float histogram chunks overlapping with histogram chunks",
@@ -560,6 +564,7 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
[]tsdbutil.Sample{histogramSample(12), histogramSample(14)},
[]tsdbutil.Sample{floatHistogramSample(15)},
),
expectedChunksEstimate: 4, // Estimation assumes overlapping chunks don't swap back and forth between different encodings.
},
} {
t.Run(tc.name, func(t *testing.T) {
@@ -570,6 +575,12 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {

require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)

if tc.expectedChunksEstimate == 0 {
tc.expectedChunksEstimate = len(actChks)
}

require.Equalf(t, tc.expectedChunksEstimate, merged.EstimatedChunkCount(), "expected estimate of %v chunks, actual chunks are: %v", tc.expectedChunksEstimate, actChks)
})
}
}
@@ -704,6 +715,7 @@ func TestConcatenatingChunkSeriesMerger(t *testing.T) {

require.Equal(t, expErr, actErr)
require.Equal(t, expChks, actChks)
require.Equal(t, len(expChks), merged.EstimatedChunkCount())
})
}
}
40 changes: 40 additions & 0 deletions storage/series.go
@@ -35,11 +35,13 @@ func (s *SeriesEntry) Iterator(it chunkenc.Iterator) chunkenc.Iterator { return s.SampleIteratorFn(it) }

type ChunkSeriesEntry struct {
Lset labels.Labels
ChunkCountFn func() int
ChunkIteratorFn func(chunks.Iterator) chunks.Iterator
}

func (s *ChunkSeriesEntry) Labels() labels.Labels { return s.Lset }
func (s *ChunkSeriesEntry) Iterator(it chunks.Iterator) chunks.Iterator { return s.ChunkIteratorFn(it) }
func (s *ChunkSeriesEntry) EstimatedChunkCount() int { return s.ChunkCountFn() }

// NewListSeries returns a series entry with an iterator over the provided samples.
func NewListSeries(lset labels.Labels, s []tsdbutil.Sample) *SeriesEntry {
@@ -78,6 +80,7 @@ func NewListChunkSeriesFromSamples(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkSeriesEntry {
}
return NewListChunkSeriesIterator(chks...)
},
ChunkCountFn: func() int { return len(samples) }, // We create one chunk per slice of samples.
}
}

@@ -399,6 +402,43 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
return NewListChunkSeriesIterator(chks...)
}

// EstimatedChunkCount returns an estimate of the number of chunks produced by Iterator.
//
// It is exact except when histograms are present in the series. When histograms are
// present, EstimatedChunkCount may underestimate the number of chunks produced, as the
// estimation does not examine individual samples, so events that force cutting a new chunk,
// such as counter resets, changes to the bucket schema and changes to the zero threshold,
// are not taken into account.
func (s *seriesToChunkEncoder) EstimatedChunkCount() int {
chunkCount := 0
seriesIter := s.Series.Iterator(nil)
lastType := chunkenc.ValNone
samplesInChunk := 0

for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() {
if chunkCount == 0 {
// We'll always have at least one chunk if there's at least one sample.
chunkCount++
} else {
if lastType != typ {
// If the sample type changes, then we'll cut a new chunk.
chunkCount++
samplesInChunk = 0
}
}

if samplesInChunk == seriesToChunkEncoderSplit {
chunkCount++
samplesInChunk = 0
}

lastType = typ
samplesInChunk++
}

return chunkCount
}
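A short sketch of the behaviour described above, again assuming seriesToChunkEncoderSplit is 120 at the time of writing, written as if inside package storage:

```go
lbls := labels.FromStrings("foo", "bar")

// 250 float samples encode into chunks of 120, 120 and 10 samples.
encoded := NewSeriesToChunkEncoder(NewListSeries(lbls, tsdbutil.GenerateSamples(0, 250)))
_ = encoded.EstimatedChunkCount() // 3, exact because no histograms are present
```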

func appendChunk(chks []chunks.Meta, mint, maxt int64, chk chunkenc.Chunk) []chunks.Meta {
if chk != nil {
chks = append(chks, chunks.Meta{