store: add streamed snappy encoding for postings list

We've noticed that decoding Snappy-compressed postings lists
takes a lot of RAM:

```
(pprof) top
Showing nodes accounting for 1427.30GB, 67.55% of 2112.82GB total
Dropped 1069 nodes (cum <= 10.56GB)
Showing top 10 nodes out of 82
      flat  flat%   sum%        cum   cum%
         0     0%     0%  1905.67GB 90.20%  golang.org/x/sync/errgroup.(*Group).Go.func1
    2.08GB 0.098% 0.098%  1456.94GB 68.96%  github.com/thanos-io/thanos/pkg/store.(*blockSeriesClient).ExpandPostings
    1.64GB 0.078%  0.18%  1454.87GB 68.86%  github.com/thanos-io/thanos/pkg/store.(*bucketIndexReader).ExpandedPostings
    2.31GB  0.11%  0.29%  1258.15GB 59.55%  github.com/thanos-io/thanos/pkg/store.(*bucketIndexReader).fetchPostings
    1.48GB  0.07%  0.36%  1219.67GB 57.73%  github.com/thanos-io/thanos/pkg/store.diffVarintSnappyDecode
 1215.21GB 57.52% 57.87%  1215.21GB 57.52%  github.com/klauspost/compress/s2.Decode
```

This is because we allocate a new []byte slice for the decoded data
each time. To avoid this RAM usage, let's stream the decoding from the
given buffer. Since the Snappy block format doesn't support streamed
decoding, let's switch to the Snappy stream format, which is designed
for exactly that.
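
To illustrate the difference, here is a minimal sketch (illustrative only, not
the code added by this commit; the function names are made up): block-format
decoding has to materialize the whole decompressed postings list at once,
while stream-format decoding walks it through the reader's small buffer.

```go
package example

import (
    "bytes"
    "encoding/binary"
    "io"

    "github.com/golang/snappy"
)

// decodeBlock mirrors the old approach: the Snappy block format hands us the
// whole decompressed postings list as one newly allocated []byte.
func decodeBlock(compressed []byte) ([]byte, error) {
    return snappy.Decode(nil, compressed)
}

// iterateStream mirrors the new approach: with the Snappy stream format the
// diff-varints can be read one at a time through snappy.Reader, so only the
// reader's small internal buffer is allocated.
func iterateStream(compressed []byte, fn func(delta uint64)) error {
    sr := snappy.NewReader(bytes.NewReader(compressed))
    for {
        delta, err := binary.ReadUvarint(sr)
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        fn(delta)
    }
}
```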

Note that our current `index.Postings` iterator does not support going
backwards through Seek(), even if in theory one could want something
like that. Fortunately, searching for postings intersections only
requires going forward.
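
For illustration only (this is not the intersection code Thanos uses), a
textbook merge of two `index.Postings` only ever passes non-decreasing targets
to Seek(), so a forward-only iterator is sufficient:

```go
package example

import (
    "github.com/prometheus/prometheus/storage"
    "github.com/prometheus/prometheus/tsdb/index"
)

// intersect collects the values present in both postings lists. Note that the
// targets handed to Seek() only ever grow over the run.
func intersect(a, b index.Postings) ([]storage.SeriesRef, error) {
    var out []storage.SeriesRef
    if !a.Next() || !b.Next() {
        return out, firstErr(a.Err(), b.Err())
    }
    for {
        switch {
        case a.At() == b.At():
            out = append(out, a.At())
            if !a.Next() || !b.Next() {
                return out, firstErr(a.Err(), b.Err())
            }
        case a.At() < b.At():
            if !a.Seek(b.At()) {
                return out, firstErr(a.Err(), b.Err())
            }
        default:
            if !b.Seek(a.At()) {
                return out, firstErr(a.Err(), b.Err())
            }
        }
    }
}

// firstErr returns the first non-nil error, if any.
func firstErr(errs ...error) error {
    for _, err := range errs {
        if err != nil {
            return err
        }
    }
    return nil
}
```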

Benchmark data:

```
name                                                          time/op
PostingsEncodingDecoding/10000/raw/encode-16                  71.6µs ± 3%
PostingsEncodingDecoding/10000/raw/decode-16                  76.3ns ± 4%
PostingsEncodingDecoding/10000#01/snappy/encode-16            73.3µs ± 1%
PostingsEncodingDecoding/10000#01/snappy/decode-16            1.63µs ± 6%
PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16     111µs ± 2%
PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16    14.5µs ± 7%
PostingsEncodingDecoding/100000/snappyStreamed/encode-16      1.09ms ± 2%
PostingsEncodingDecoding/100000/snappyStreamed/decode-16      14.4µs ± 4%
PostingsEncodingDecoding/100000#01/raw/encode-16               710µs ± 1%
PostingsEncodingDecoding/100000#01/raw/decode-16              79.3ns ±13%
PostingsEncodingDecoding/100000#02/snappy/encode-16            719µs ± 1%
PostingsEncodingDecoding/100000#02/snappy/decode-16           13.5µs ± 4%
PostingsEncodingDecoding/1000000/raw/encode-16                7.14ms ± 1%
PostingsEncodingDecoding/1000000/raw/decode-16                81.7ns ± 9%
PostingsEncodingDecoding/1000000#01/snappy/encode-16          7.52ms ± 3%
PostingsEncodingDecoding/1000000#01/snappy/decode-16           139µs ± 4%
PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16  11.4ms ± 4%
PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16  15.5µs ± 4%

name                                                          alloc/op
PostingsEncodingDecoding/10000/raw/encode-16                  13.6kB ± 0%
PostingsEncodingDecoding/10000/raw/decode-16                   96.0B ± 0%
PostingsEncodingDecoding/10000#01/snappy/encode-16            25.9kB ± 0%
PostingsEncodingDecoding/10000#01/snappy/decode-16            11.0kB ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16    16.6kB ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16     148kB ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/encode-16       148kB ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/decode-16       148kB ± 0%
PostingsEncodingDecoding/100000#01/raw/encode-16               131kB ± 0%
PostingsEncodingDecoding/100000#01/raw/decode-16               96.0B ± 0%
PostingsEncodingDecoding/100000#02/snappy/encode-16            254kB ± 0%
PostingsEncodingDecoding/100000#02/snappy/decode-16            107kB ± 0%
PostingsEncodingDecoding/1000000/raw/encode-16                1.25MB ± 0%
PostingsEncodingDecoding/1000000/raw/decode-16                 96.0B ± 0%
PostingsEncodingDecoding/1000000#01/snappy/encode-16          2.48MB ± 0%
PostingsEncodingDecoding/1000000#01/snappy/decode-16          1.05MB ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16  1.47MB ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16   148kB ± 0%

name                                                          allocs/op
PostingsEncodingDecoding/10000/raw/encode-16                    2.00 ± 0%
PostingsEncodingDecoding/10000/raw/decode-16                    2.00 ± 0%
PostingsEncodingDecoding/10000#01/snappy/encode-16              3.00 ± 0%
PostingsEncodingDecoding/10000#01/snappy/decode-16              4.00 ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16      4.00 ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16      5.00 ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/encode-16        4.00 ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/decode-16        5.00 ± 0%
PostingsEncodingDecoding/100000#01/raw/encode-16                2.00 ± 0%
PostingsEncodingDecoding/100000#01/raw/decode-16                2.00 ± 0%
PostingsEncodingDecoding/100000#02/snappy/encode-16             3.00 ± 0%
PostingsEncodingDecoding/100000#02/snappy/decode-16             4.00 ± 0%
PostingsEncodingDecoding/1000000/raw/encode-16                  2.00 ± 0%
PostingsEncodingDecoding/1000000/raw/decode-16                  2.00 ± 0%
PostingsEncodingDecoding/1000000#01/snappy/encode-16            3.00 ± 0%
PostingsEncodingDecoding/1000000#01/snappy/decode-16            4.00 ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16    4.00 ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16    5.00 ± 0%
```
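
The numbers above come from the updated BenchmarkPostingsEncodingDecoding
benchmark; a command along these lines should reproduce them (the exact flags
are an assumption, and the ± columns suggest several runs aggregated with
benchstat):

```
go test -run '^$' -bench BenchmarkPostingsEncodingDecoding -benchmem -count 5 github.com/thanos-io/thanos/pkg/store
```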

Compression ratios are still the same as before:

```
$ /bin/go test -v -timeout 10m -run ^TestDiffVarintCodec$ github.com/thanos-io/thanos/pkg/store
[snip]
=== RUN   TestDiffVarintCodec/snappy/i!~"2.*"
    postings_codec_test.go:73: postings entries: 944450
    postings_codec_test.go:74: original size (4*entries): 3777800 bytes
    postings_codec_test.go:80: encoded size 44498 bytes
    postings_codec_test.go:81: ratio: 0.012
=== RUN   TestDiffVarintCodec/snappyStreamed/i!~"2.*"
    postings_codec_test.go:73: postings entries: 944450
    postings_codec_test.go:74: original size (4*entries): 3777800 bytes
    postings_codec_test.go:80: encoded size 44670 bytes
    postings_codec_test.go:81: ratio: 0.012
```

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed Apr 20, 2023

1 parent 6d838e7 commit 51a984d
Showing 3 changed files with 209 additions and 17 deletions.
6 changes: 3 additions & 3 deletions pkg/store/bucket.go
@@ -2340,9 +2340,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
                 l   index.Postings
                 err error
             )
-            if isDiffVarintSnappyEncodedPostings(b) {
+            if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
                 s := time.Now()
-                clPostings, err := diffVarintSnappyDecode(b)
+                clPostings, err := getDecodingFunction(b)(b)
                 r.stats.cachedPostingsDecompressions += 1
                 r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
                 if err != nil {
@@ -2442,7 +2442,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
             compressions++
             s := time.Now()
             bep := newBigEndianPostings(pBytes[4:])
-            data, err := diffVarintSnappyEncode(bep, bep.length())
+            data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
             compressionTime = time.Since(s)
             if err == nil {
                 dataToCache = data
158 changes: 157 additions & 1 deletion pkg/store/postings_codec.go
@@ -5,6 +5,8 @@ package store

 import (
     "bytes"
+    "encoding/binary"
+    "io"
     "sync"

     "github.com/golang/snappy"
@@ -25,14 +27,168 @@ import (
 // significantly (to about 20% of original), snappy then halves it to ~10% of the original.

 const (
-    codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
+    codecHeaderSnappy         = "dvs" // As in "diff+varint+snappy".
+    codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy".
 )

+func getDecodingFunction(input []byte) func([]byte) (closeablePostings, error) {
+    if isDiffVarintSnappyEncodedPostings(input) {
+        return diffVarintSnappyDecode
+    }
+    if isDiffVarintSnappyStreamedEncodedPostings(input) {
+        return diffVarintSnappyStreamedDecode
+    }
+    return nil
+}
+
 // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec.
 func isDiffVarintSnappyEncodedPostings(input []byte) bool {
     return bytes.HasPrefix(input, []byte(codecHeaderSnappy))
 }

+// isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec.
+func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
+    return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy))
+}
+
+func writeUvarint(w io.Writer, oneByteSlice []byte, x uint64) error {
+    for x >= 0x80 {
+        oneByteSlice[0] = byte(x) | 0x80
+        n, err := w.Write(oneByteSlice)
+        if err != nil {
+            return err
+        }
+        if n != 1 {
+            return io.EOF
+        }
+        x >>= 7
+    }
+    oneByteSlice[0] = byte(x)
+    n, err := w.Write(oneByteSlice)
+    if err != nil {
+        return err
+    }
+    if n != 1 {
+        return io.EOF
+    }
+    return nil
+}
+
+var snappyWriterPool, snappyReaderPool sync.Pool
+
+func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) {
+    // 1.25 bytes per postings + header + snappy stream beginning.
+    out := make([]byte, 0, 10+snappy.MaxEncodedLen(5*length/4)+len(codecHeaderStreamedSnappy))
+    out = append(out, []byte(codecHeaderStreamedSnappy)...)
+    compressedBuf := bytes.NewBuffer(out[len(codecHeaderStreamedSnappy):])
+    var sw *snappy.Writer
+
+    oneByteSlice := make([]byte, 1)
+
+    pooledSW := snappyWriterPool.Get()
+    if pooledSW == nil {
+        sw = snappy.NewBufferedWriter(compressedBuf)
+    } else {
+        sw = pooledSW.(*snappy.Writer)
+        sw.Reset(compressedBuf)
+    }
+
+    defer func() {
+        snappyWriterPool.Put(sw)
+    }()
+
+    prev := storage.SeriesRef(0)
+    for p.Next() {
+        v := p.At()
+        if v < prev {
+            return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
+        }
+
+        if err := writeUvarint(sw, oneByteSlice, uint64(v-prev)); err != nil {
+            return nil, errors.Wrap(err, "writing uvarint encoded byte")
+        }
+        prev = v
+    }
+    if p.Err() != nil {
+        return nil, p.Err()
+    }
+    if err := sw.Close(); err != nil {
+        return nil, errors.Wrap(err, "closing snappy stream writer")
+    }
+
+    return out[:len(codecHeaderStreamedSnappy)+compressedBuf.Len()], nil
+}
+
+func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) {
+    if !isDiffVarintSnappyStreamedEncodedPostings(input) {
+        return nil, errors.New("header not found")
+    }
+
+    return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):]), nil
+}
+
+type streamedDiffVarintPostings struct {
+    cur storage.SeriesRef
+
+    sr  *snappy.Reader
+    err error
+}
+
+func newStreamedDiffVarintPostings(input []byte) closeablePostings {
+    var sr *snappy.Reader
+
+    srPooled := snappyReaderPool.Get()
+    if srPooled == nil {
+        sr = snappy.NewReader(bytes.NewBuffer(input))
+    } else {
+        sr = srPooled.(*snappy.Reader)
+        sr.Reset(bytes.NewBuffer(input))
+    }
+
+    return &streamedDiffVarintPostings{sr: sr}
+}
+
+func (it *streamedDiffVarintPostings) close() {
+    snappyReaderPool.Put(it.sr)
+}
+
+func (it *streamedDiffVarintPostings) At() storage.SeriesRef {
+    return it.cur
+}
+
+func (it *streamedDiffVarintPostings) Next() bool {
+    val, err := binary.ReadUvarint(it.sr)
+    if err != nil {
+        if err != io.EOF {
+            it.err = err
+        }
+        return false
+    }
+
+    it.cur = it.cur + storage.SeriesRef(val)
+    return true
+}
+
+func (it *streamedDiffVarintPostings) Err() error {
+    return it.err
+}
+
+func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool {
+    if it.cur >= x {
+        return true
+    }
+
+    // We cannot do any search due to how values are stored,
+    // so we simply advance until we find the right value.
+    for it.Next() {
+        if it.At() >= x {
+            return true
+        }
+    }
+
+    return false
+}
+
 // diffVarintSnappyEncode encodes postings into diff+varint representation,
 // and applies snappy compression on the result.
 // Returned byte slice starts with codecHeaderSnappy header.
62 changes: 49 additions & 13 deletions pkg/store/postings_codec_test.go
@@ -57,8 +57,9 @@ func TestDiffVarintCodec(t *testing.T) {
         codingFunction   func(index.Postings, int) ([]byte, error)
         decodingFunction func([]byte) (closeablePostings, error)
     }{
-        "raw":    {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
-        "snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+        "raw":            {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
+        "snappy":         {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+        "snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
     }

     for postingName, postings := range postingsMap {
@@ -194,7 +195,7 @@ func (p *uint64Postings) len() int {
     return len(p.vals)
 }

-func BenchmarkEncodePostings(b *testing.B) {
+func BenchmarkPostingsEncodingDecoding(b *testing.B) {
     const max = 1000000
     r := rand.New(rand.NewSource(0))

@@ -208,16 +209,51 @@ func BenchmarkEncodePostings(b *testing.B) {
     p[ix] = p[ix-1] + storage.SeriesRef(d)
 }

+    codecs := map[string]struct {
+        codingFunction   func(index.Postings, int) ([]byte, error)
+        decodingFunction func([]byte) (closeablePostings, error)
+    }{
+        "raw":            {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
+        "snappy":         {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+        "snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
+    }
+
+    b.ReportAllocs()
+
     for _, count := range []int{10000, 100000, 1000000} {
-        b.Run(strconv.Itoa(count), func(b *testing.B) {
-            for i := 0; i < b.N; i++ {
-                ps := &uint64Postings{vals: p[:count]}
-
-                _, err := diffVarintEncodeNoHeader(ps, ps.len())
-                if err != nil {
-                    b.Fatal(err)
-                }
-            }
-        })
+        for codecName, codecFns := range codecs {
+            b.Run(strconv.Itoa(count), func(b *testing.B) {
+                b.Run(codecName, func(b *testing.B) {
+                    b.Run("encode", func(b *testing.B) {
+                        b.ResetTimer()
+                        for i := 0; i < b.N; i++ {
+                            ps := &uint64Postings{vals: p[:count]}
+
+                            _, err := codecFns.codingFunction(ps, ps.len())
+                            if err != nil {
+                                b.Fatal(err)
+                            }
+                        }
+                    })
+                    b.Run("decode", func(b *testing.B) {
+                        ps := &uint64Postings{vals: p[:count]}
+
+                        encoded, err := codecFns.codingFunction(ps, ps.len())
+                        if err != nil {
+                            b.Fatal(err)
+                        }
+
+                        b.ResetTimer()
+                        for i := 0; i < b.N; i++ {
+                            _, err := codecFns.decodingFunction(encoded)
+                            if err != nil {
+                                b.Fatal(err)
+                            }
+                        }
+                    })
+
+                })
+            })
+        }
     }
 }
