Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Vertical query merging and compaction #370

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
d14623d
Vertical series iterator
codesome Sep 1, 2018
bea7a5d
Select overlapped blocks first in compactor Plan()
codesome Sep 2, 2018
4afa1f0
Added vertical compaction
codesome Sep 3, 2018
f11143c
Code cleanup and comments
codesome Sep 4, 2018
55b13c0
Fix review comments
codesome Oct 6, 2018
f880bc9
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Oct 6, 2018
7de67b3
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Oct 25, 2018
ba9facf
Fix tests
codesome Oct 25, 2018
0f86bb0
Add benchmark for compaction
codesome Oct 25, 2018
0aae01d
Perform vertical compaction only when blocks are overlapping.
codesome Nov 3, 2018
cb9bb62
Benchmark for vertical compaction
codesome Nov 3, 2018
7ae4941
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 3, 2018
94e5ec1
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 13, 2018
4103678
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 19, 2018
9df857d
Benchmark for query iterator and seek for non overlapping blocks
codesome Nov 30, 2018
ad4ef3f
Vertical query merge only for overlapping blocks
codesome Nov 30, 2018
5620350
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 30, 2018
e9b05eb
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Dec 7, 2018
a8f4c26
Simplify logging in Compact(...)
codesome Dec 27, 2018
5e707bf
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Dec 27, 2018
d655420
Updated CHANGELOG.md
codesome Dec 27, 2018
6cb6f2a
Calculate overlapping inside populateBlock
codesome Jan 4, 2019
f53e648
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 5, 2019
6254595
MinTime and MaxTime for BlockReader.
codesome Jan 10, 2019
275eeb0
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 16, 2019
f43086f
Sort blocks w.r.t. MinTime in reload()
codesome Jan 19, 2019
48eed7c
Log about overlapping in LeveledCompactor.write() instead of returnin…
codesome Jan 19, 2019
9f288dc
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 19, 2019
b92a960
Log about overlapping inside LeveledCompactor.populateBlock()
codesome Jan 21, 2019
0d98331
Fix review comments
codesome Jan 21, 2019
acfbdb3
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 21, 2019
4d448d6
Refactor createBlock to take optional []Series
codesome Jan 21, 2019
159cbe3
review1
Jan 23, 2019
f1253d2
Merge pull request #6 from krasi-georgiev/pull/370-review
codesome Jan 23, 2019
1072f0f
Updated CHANGELOG and minor nits
codesome Jan 23, 2019
4a0e2e6
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 28, 2019
8047a2a
nits
codesome Jan 28, 2019
47436d0
Updated CHANGELOG
codesome Jan 28, 2019
ad7993e
Refactor iterator and seek benchmarks for Querier.
codesome Jan 30, 2019
117cef8
Additional test case
codesome Feb 8, 2019
5f8d911
genSeries takes optional labels. Updated BenchmarkQueryIterator and B…
codesome Feb 10, 2019
a23f030
Split genSeries into genSeries and populateSeries
codesome Feb 12, 2019
260665c
Check error in benchmark
codesome Feb 12, 2019
6a1d3f4
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Feb 12, 2019
d5e8479
Fix review comments
codesome Feb 14, 2019
58e534c
Warn about overlapping blocks in reload()
codesome Feb 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## master / unreleased
- [ENHANCEMENT] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370)
- Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks.
- Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas.
- Added `MinTime` and `MaxTime` method for `BlockReader`.
- [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
- [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`
Expand All @@ -7,9 +11,9 @@
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- Added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- New public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
Expand All @@ -24,4 +28,4 @@
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
-[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.
- [FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.
12 changes: 12 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ type BlockReader interface {

// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)

// MinTime returns the min time of the block.
MinTime() int64

// MaxTime returns the max time of the block.
MaxTime() int64
}

// Appendable defines an entity to which data can be appended.
Expand Down Expand Up @@ -363,6 +369,12 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// MinTime returns the min time of the meta.
func (pb *Block) MinTime() int64 { return pb.meta.MinTime }

// MaxTime returns the max time of the meta.
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

Expand Down
21 changes: 20 additions & 1 deletion block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}
series := make([]Series, totalSeries)

series := make([]Series, totalSeries)
for i := 0; i < totalSeries; i++ {
lbls := make(map[string]string, labelCount)
for len(lbls) < labelCount {
Expand All @@ -114,7 +114,26 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
}
series[i] = newSeries(lbls, samples)
}
return series
}

// populateSeries generates series from given labels, mint and maxt.
func populateSeries(lbls []map[string]string, mint, maxt int64) []Series {
if len(lbls) == 0 {
return nil
}

series := make([]Series, 0, len(lbls))
for _, lbl := range lbls {
if len(lbl) == 0 {
continue
}
samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
for t := mint; t <= maxt; t++ {
samples = append(samples, sample{t: t, v: rand.Float64()})
}
series = append(series, newSeries(lbl, samples))
}
return series
}

Expand Down
78 changes: 78 additions & 0 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,84 @@ func (w *Writer) write(b []byte) error {
return err
}

// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The last appearing sample is retained in case there is overlapping.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if len(chks) < 2 {
return chks, nil
}
newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}

return newChks, nil
}

// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator()
bit := b.Iterator()
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
if ait.Err() != nil {
return nil, ait.Err()
}
if bit.Err() != nil {
return nil, bit.Err()
}
return newChunk, nil
}

func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
Expand Down
Loading