Skip to content

Commit

Permalink
querier: Added regressions tests for counter missed bug.
Browse files Browse the repository at this point in the history
PR with just tests, not fix yet.

Reproduces: #2401

* Added regressions tests for CounterSeriesIterator; Simplified aggregators.
* Fixes edge dedup cases for Next and added tests for deduplication.
* Refactored downsampling tests, added more realistic cases.
* Added check for duplicated chunks during downsampling.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Apr 30, 2020
1 parent 9b94b90 commit ef83090
Show file tree
Hide file tree
Showing 7 changed files with 1,755 additions and 1,689 deletions.
35 changes: 27 additions & 8 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func Downsample(
if err := indexr.Series(postings.At(), &lset, &chks); err != nil {
return id, errors.Wrapf(err, "get series %d", postings.At())
}

for i, c := range chks[1:] {
if chks[i].MaxTime >= c.MinTime {
return id, errors.Errorf("found overlapping chunks within series %d. Chunks expected to be ordered by min time and non-overlapping, got: %v", postings.At(), chks)
}
}

// While #183 exists, we sanitize the chunks we retrieved from the block
// before retrieving their samples.
for i, c := range chks {
Expand All @@ -129,6 +136,9 @@ func Downsample(
// Raw and already downsampled data need different processing.
if origMeta.Thanos.Downsample.Resolution == 0 {
for _, c := range chks {
// TODO(bwplotka): We can optimze this further by using in WriteSeries iterators of each chunk instead of
// samples. Also ensure 120 sample limit, otherwise we have gigantic chunks.
// https://github.com/thanos-io/thanos/issues/2542
if err := expandChunkIterator(c.Chunk.Iterator(reuseIt), &all); err != nil {
return id, errors.Wrapf(err, "expand chunk %d, series %d", c.Ref, postings.At())
}
Expand All @@ -139,7 +149,11 @@ func Downsample(
} else {
// Downsample a block that contains aggregated chunks already.
for _, c := range chks {
aggrChunks = append(aggrChunks, c.Chunk.(*AggrChunk))
ac, ok := c.Chunk.(*AggrChunk)
if !ok {
return id, errors.Errorf("expected downsampled chunk (*downsample.AggrChunk) got %T instead for series: %d", c.Chunk, postings.At())
}
aggrChunks = append(aggrChunks, ac)
}
downsampledChunks, err := downsampleAggr(
aggrChunks,
Expand Down Expand Up @@ -334,12 +348,12 @@ func downsampleRawLoop(data []sample, resolution int64, numChunks int) []chunks.

ab := newAggrChunkBuilder()

// Encode first raw value; see CounterSeriesIterator.
// Encode first raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append(batch[0].t, batch[0].v)

lastT := downsampleBatch(batch, resolution, ab.add)

// Encode last raw value; see CounterSeriesIterator.
// Encode last raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append(lastT, batch[len(batch)-1].v)

chks = append(chks, ab.encode())
Expand Down Expand Up @@ -524,7 +538,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()

// Retain first raw value; see CounterSeriesIterator.
// Retain first raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append((*buf)[0].t, (*buf)[0].v)

lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
Expand All @@ -536,7 +550,7 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (ch
ab.apps[AggrCounter].Append(t, a.counter)
})

// Retain last raw value; see CounterSeriesIterator.
// Retain last raw value; see CounterAggrChunksIterator.
ab.apps[AggrCounter].Append(lastT, it.lastV)

ab.mint = mint
Expand All @@ -551,16 +565,21 @@ type sample struct {

// CounterSeriesIterator generates monotonically increasing values by iterating
// over an ordered sequence of chunks, which should be raw or aggregated chunks
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values.
// of counter values. The generated samples can be used by PromQL functions
// like 'rate' that calculate differences between counter values. Stale Markers
// are removed as well.
//
// Counter aggregation chunks must have the first and last values from their
// original raw series: the first raw value should be the first value encoded
// in the chunk, and the last raw value is encoded by the duplication of the
// previous sample's timestamp. As iteration occurs between chunks, the
// previous sample's timestamp. As iteration occurs between chunks, the
// comparison between the last raw value of the earlier chunk and the first raw
// value of the later chunk ensures that counter resets between chunks are
// recognized and that the correct value delta is calculated.
//
// It handles overlapped chunks (removes overlaps.
// NOTE: It is important to deduplicate with care ensuring that you don't hit
// issue https://github.com/thanos-io/thanos/issues/2401#issuecomment-621958839.
type CounterSeriesIterator struct {
chks []chunkenc.Iterator
i int // Current chunk.
Expand Down
Loading

0 comments on commit ef83090

Please sign in to comment.