Fix memory spike when compacting overwritten points
If a series contains a point that is overwritten, the compactor would
load the whole series into RAM during a full compaction. If the series
was large, this could cause very large RAM spikes and OOMs.

This change reworks the compactor to merge blocks more incrementally,
similar to the fix done in #6556.
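In rough terms, the old path decoded every block for a key up front, so peak memory tracked the size of the whole series; the new path drains blocks through a buffer bounded by the chunk size, emitting re-encoded chunks as they fill. A minimal standalone sketch of that difference (hypothetical types and names, not the actual TSM compactor code):

package main

import "fmt"

type point struct {
	ts  int64
	val float64
}

// mergeAll mirrors the old behavior: every block is decoded up front, so
// memory grows with the size of the whole series.
func mergeAll(blocks [][]point) []point {
	var all []point
	for _, b := range blocks {
		all = append(all, b...) // entire series resident in RAM at once
	}
	// sorting and deduplication elided
	return all
}

// mergeIncremental mirrors the new behavior: at most roughly chunkSize
// decoded points are buffered, and full chunks are emitted (re-encoded and
// written out) as soon as they are available.
func mergeIncremental(blocks [][]point, chunkSize int, emit func([]point)) {
	var buf []point
	for len(blocks) > 0 {
		buf = append(buf, blocks[0]...) // decode a single block at a time
		blocks = blocks[1:]
		for len(buf) >= chunkSize {
			emit(buf[:chunkSize])
			buf = buf[chunkSize:]
		}
	}
	if len(buf) > 0 {
		emit(buf) // final partial chunk
	}
}

func main() {
	blocks := [][]point{
		{{1, 1.0}, {2, 2.0}},
		{{3, 3.0}, {4, 4.0}, {5, 5.0}},
	}
	mergeIncremental(blocks, 2, func(c []point) {
		fmt.Printf("emit chunk of %d points\n", len(c))
	})
}

The actual change is more involved because blocks for one key can overlap and carry tombstones; the per-block read ranges introduced in the diff below handle partially consumed blocks.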
jwilder committed May 6, 2016
1 parent 9db1718 commit d99c5e2
Showing 6 changed files with 234 additions and 150 deletions.
213 changes: 155 additions & 58 deletions tsdb/engine/tsm1/compact.go
@@ -640,13 +640,42 @@ type tsmKeyIterator struct {
	blocks blocks

	buf []blocks

	// mergedValues are decoded values that have been combined
	mergedValues Values

	// merged are encoded blocks that have been combined or used as-is
	// without decoding
	merged blocks
}

type block struct {
	key              string
	minTime, maxTime int64
	b                []byte
	tombstones       []TimeRange

	// readMin, readMax are the timestamp range of values that have been
	// read and encoded from this block.
	readMin, readMax int64
}

func (b *block) overlapsTimeRange(min, max int64) bool {
	return b.minTime <= max && b.maxTime >= min
}

func (b *block) read() bool {
	return b.readMin <= b.minTime && b.readMax >= b.maxTime
}

func (b *block) markRead(min, max int64) {
	if min < b.readMin {
		b.readMin = min
	}

	if max > b.readMax {
		b.readMax = max
	}
}

type blocks []*block
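As a concrete aside: readMin/readMax start inverted (math.MaxInt64, math.MinInt64) so a fresh block reports unread, markRead only ever widens the consumed window, and read() becomes true once that window covers the block's full [minTime, maxTime]. A small self-contained demo that mirrors the helpers above (hypothetical, not part of the commit):

package main

import (
	"fmt"
	"math"
)

type demoBlock struct {
	minTime, maxTime int64
	readMin, readMax int64
}

func (b *demoBlock) read() bool {
	return b.readMin <= b.minTime && b.readMax >= b.maxTime
}

func (b *demoBlock) markRead(min, max int64) {
	if min < b.readMin {
		b.readMin = min
	}
	if max > b.readMax {
		b.readMax = max
	}
}

func main() {
	// A fresh block: the read window is inverted, so read() is false.
	b := &demoBlock{minTime: 10, maxTime: 50, readMin: math.MaxInt64, readMax: math.MinInt64}
	fmt.Println(b.read()) // false: nothing consumed yet

	b.markRead(10, 30) // the first half of the block was merged out
	fmt.Println(b.read()) // false: values in (30, 50] are still pending

	b.markRead(31, 50) // the remainder is consumed
	fmt.Println(b.read()) // true: the block can be skipped from now on
}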
@@ -680,11 +709,26 @@ func NewTSMKeyIterator(size int, fast bool, readers ...*TSMReader) (KeyIterator,
}

func (k *tsmKeyIterator) Next() bool {
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
		if len(k.merged) > 0 {
			return true
		}
	}

	// Any merged values pending?
	if len(k.mergedValues) > 0 {
		k.merge()
		if len(k.merged) > 0 {
			return true
		}
	}

	// If we still have blocks from the last read, merge them
	if len(k.blocks) > 0 {
		k.merge()
		if len(k.merged) > 0 {
			return true
		}
	}
@@ -702,14 +746,14 @@ func (k *tsmKeyIterator) Next() bool {
			// This block may have ranges of time removed from it that would
			// reduce the block min and max time.
			tombstones := iter.r.TombstoneRange(key)
			minTime, maxTime = k.clampTombstoneRange(tombstones, minTime, maxTime)

			k.buf[i] = append(k.buf[i], &block{
				minTime:    minTime,
				maxTime:    maxTime,
				key:        key,
				b:          b,
				tombstones: tombstones,
				readMin:    math.MaxInt64,
				readMax:    math.MinInt64,
			})

			blockKey := key
@@ -721,14 +765,15 @@ func (k *tsmKeyIterator) Next() bool {
				}

				tombstones := iter.r.TombstoneRange(key)
				minTime, maxTime = k.clampTombstoneRange(tombstones, minTime, maxTime)

				k.buf[i] = append(k.buf[i], &block{
					minTime:    minTime,
					maxTime:    maxTime,
					key:        key,
					b:          b,
					tombstones: tombstones,
					readMin:    math.MaxInt64,
					readMax:    math.MinInt64,
				})
			}
		}
@@ -747,27 +792,34 @@ func (k *tsmKeyIterator) Next() bool {
			minKey = b[0].key
		}
	}
	k.key = minKey

	// Now we need to find all blocks that match the min key so we can combine and dedupe
	// the blocks if necessary
	for i, b := range k.buf {
		if len(b) == 0 {
			continue
		}
		if b[0].key == k.key {
			k.blocks = append(k.blocks, b...)
			k.buf[i] = nil
		}
	}

	// No blocks left, we're done
	if len(k.blocks) == 0 {
		return false
	}

	k.merge()

	return len(k.merged) > 0
}

// merge combines the next set of blocks into merged blocks
func (k *tsmKeyIterator) merge() {
	// No blocks left, we're done
	if len(k.blocks) == 0 {
		return
	}

	// If we have more than one block or any partially tombstoned blocks, we may need to dedup
@@ -779,49 +831,80 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {
		// Quickly scan each block to see if any overlap with the prior block; if they overlap then
		// we need to dedup as there may be duplicate points now
		for i := 1; !dedup && i < len(k.blocks); i++ {
			if k.blocks[i].read() {
				dedup = true
				break
			}

			if k.blocks[i].minTime <= k.blocks[i-1].maxTime || len(k.blocks[i].tombstones) > 0 {
				dedup = true
				break
			}
		}
	}

	k.merged = k.combine(dedup)
}

// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduplicated and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmKeyIterator) combine(dedup bool) blocks {
	if dedup {
		for len(k.mergedValues) < k.size && len(k.blocks) > 0 {
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]

			// We have some overlapping blocks so decode all, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(first.minTime, first.maxTime) || k.blocks[i].read() {
					continue
				}

				v, err := DecodeBlock(k.blocks[i].b, nil)
				if err != nil {
					k.err = err
					return nil
				}

				// Remove values we already read
				v = Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Filter out only the values for the overlapping block
				v = Values(v).Include(first.minTime, first.maxTime)
				if len(v) > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v = Values(v).Exclude(ts.Min, ts.Max)
				}

				k.mergedValues = k.mergedValues.Merge(v)
			}
			k.blocks = k.blocks[1:]
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
		return k.chunk(nil)
	} else {
		var chunked blocks
		var i int

		for i < len(k.blocks) {
			// Skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}
			// If this block is already full, just add it as is
			if BlockCount(k.blocks[i].b) >= k.size {
				chunked = append(chunked, k.blocks[i])
@@ -833,88 +916,102 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {

		if k.fast {
			for i < len(k.blocks) {
				// Skip this block if its values were already read
				if k.blocks[i].read() {
					i++
					continue
				}

				chunked = append(chunked, k.blocks[i])
				i++
			}
		}

		// If we only have 1 block left, just append it as is and avoid decoding/recoding
		if i == len(k.blocks)-1 {
			if !k.blocks[i].read() {
				chunked = append(chunked, k.blocks[i])
			}
			i++
		}

		// The remaining blocks can be combined and we know that they do not overlap,
		// so we can just merge each in order and re-encode.
		for i < len(k.blocks) && len(k.mergedValues) < k.size {
			if k.blocks[i].read() {
				i++
				continue
			}

			v, err := DecodeBlock(k.blocks[i].b, nil)
			if err != nil {
				k.err = err
				return nil
			}

			// Apply each tombstone to the block
			for _, ts := range k.blocks[i].tombstones {
				v = Values(v).Exclude(ts.Min, ts.Max)
			}

			k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

			k.mergedValues = k.mergedValues.Merge(v)
			i++
		}

		k.blocks = k.blocks[i:]

		return k.chunk(chunked)
	}
}

func (k *tsmKeyIterator) chunk(dst blocks) blocks {
	k.mergedValues.assertOrdered()

	for len(k.mergedValues) > k.size {
		values := k.mergedValues[:k.size]
		cb, err := Values(values).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: values[0].UnixNano(),
			maxTime: values[len(values)-1].UnixNano(),
			key:     k.key,
			b:       cb,
		})
		k.mergedValues = k.mergedValues[k.size:]
		return dst
	}

	// Re-encode the remaining values into the last block
	if len(k.mergedValues) > 0 {
		cb, err := Values(k.mergedValues).Encode(nil)
		if err != nil {
			k.err = err
			return nil
		}

		dst = append(dst, &block{
			minTime: k.mergedValues[0].UnixNano(),
			maxTime: k.mergedValues[len(k.mergedValues)-1].UnixNano(),
			key:     k.key,
			b:       cb,
		})
		k.mergedValues = k.mergedValues[:0]
	}
	return dst
}

func (k *tsmKeyIterator) clampTombstoneRange(tombstones []TimeRange, minTime, maxTime int64) (int64, int64) {
	for _, t := range tombstones {
		if t.Min > minTime {
			minTime = t.Min
		}
		if t.Max < maxTime {
			maxTime = t.Max
		}
	}
	return minTime, maxTime
}

func (k *tsmKeyIterator) Read() (string, int64, int64, []byte, error) {
	if len(k.merged) == 0 {
		return "", 0, 0, nil, k.err
	}

	block := k.merged[0]
	return block.key, block.minTime, block.maxTime, block.b, k.err
}
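For context, a compaction write loop would drain this iterator one encoded block at a time; the sketch below assumes a writer with a WriteBlock method and is illustrative, not code from this commit:

// drain consumes a KeyIterator block by block. Because merge() materializes at
// most one chunk of decoded values per call, memory is bounded by the chunk
// size rather than by the size of the series being compacted.
func drain(iter KeyIterator, w interface {
	WriteBlock(key string, minTime, maxTime int64, block []byte) error
}) error {
	for iter.Next() {
		key, minTime, maxTime, block, err := iter.Read()
		if err != nil {
			return err
		}
		if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
			return err
		}
	}
	return nil
}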

