store: add streamed snappy encoding for postings list (#6303)
* store: add streamed snappy encoding for postings list

We've noticed that decoding a Snappy-compressed postings list
takes a lot of RAM:

```
(pprof) top
Showing nodes accounting for 1427.30GB, 67.55% of 2112.82GB total
Dropped 1069 nodes (cum <= 10.56GB)
Showing top 10 nodes out of 82
      flat  flat%   sum%        cum   cum%
         0     0%     0%  1905.67GB 90.20%  golang.org/x/sync/errgroup.(*Group).Go.func1
    2.08GB 0.098% 0.098%  1456.94GB 68.96%  github.com/thanos-io/thanos/pkg/store.(*blockSeriesClient).ExpandPostings
    1.64GB 0.078%  0.18%  1454.87GB 68.86%  github.com/thanos-io/thanos/pkg/store.(*bucketIndexReader).ExpandedPostings
    2.31GB  0.11%  0.29%  1258.15GB 59.55%  github.com/thanos-io/thanos/pkg/store.(*bucketIndexReader).fetchPostings
    1.48GB  0.07%  0.36%  1219.67GB 57.73%  github.com/thanos-io/thanos/pkg/store.diffVarintSnappyDecode
 1215.21GB 57.52% 57.87%  1215.21GB 57.52%  github.com/klauspost/compress/s2.Decode
```

This is because we create a new []byte slice for the decoded data
each time. To avoid this RAM usage problem, let's stream the decoding
from a given buffer. Since the Snappy block format doesn't support
streamed decoding, let's switch to the Snappy stream format, which is
made for exactly that.
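
To illustrate the idea, here is a minimal, self-contained sketch (not the code
from this commit; it uses the stock github.com/golang/snappy package plus a
bufio wrapper, whereas the commit reuses the extgrpc snappy compressor shown
in the diff below):

```
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	// Encode a postings list as diff+varint into a snappy *stream*.
	var buf bytes.Buffer
	w := snappy.NewBufferedWriter(&buf)
	tmp := make([]byte, binary.MaxVarintLen64)
	prev := uint64(0)
	for _, v := range []uint64{10, 14, 18, 100} {
		n := binary.PutUvarint(tmp, v-prev) // store only the delta
		if _, err := w.Write(tmp[:n]); err != nil {
			panic(err)
		}
		prev = v
	}
	if err := w.Close(); err != nil { // flush the final chunk
		panic(err)
	}

	// Decode lazily: the reader keeps only a fixed-size window in RAM
	// instead of materializing the whole decompressed list at once.
	br := bufio.NewReader(snappy.NewReader(&buf)) // bufio supplies io.ByteReader
	cur := uint64(0)
	for {
		delta, err := binary.ReadUvarint(br)
		if err != nil { // io.EOF ends the stream
			break
		}
		cur += delta
		fmt.Println(cur) // prints 10, 14, 18, 100
	}
}
```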

Note that our current `index.Postings` implementation does not
support going backward through Seek(), even if in theory one could want
something like that. Fortunately, to compute postings intersections we
only need to go forward.
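
As a concrete (hypothetical) illustration of why forward-only iteration
suffices, intersecting two sorted postings lists only ever advances each
cursor:

```
// intersect returns the values present in both sorted slices. Each cursor
// moves strictly forward, which is all that Seek() is used for here.
func intersect(a, b []uint64) []uint64 {
	var out []uint64
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			i++ // "seek" forward in a
		case a[i] > b[j]:
			j++ // "seek" forward in b
		default: // common value
			out = append(out, a[i])
			i++
			j++
		}
	}
	return out
}
```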

Benchmark data:

```
name                                                          time/op
PostingsEncodingDecoding/10000/raw/encode-16                  71.6µs ± 3%
PostingsEncodingDecoding/10000/raw/decode-16                  76.3ns ± 4%
PostingsEncodingDecoding/10000#01/snappy/encode-16            73.3µs ± 1%
PostingsEncodingDecoding/10000#01/snappy/decode-16            1.63µs ± 6%
PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16     111µs ± 2%
PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16    14.5µs ± 7%
PostingsEncodingDecoding/100000/snappyStreamed/encode-16      1.09ms ± 2%
PostingsEncodingDecoding/100000/snappyStreamed/decode-16      14.4µs ± 4%
PostingsEncodingDecoding/100000#01/raw/encode-16               710µs ± 1%
PostingsEncodingDecoding/100000#01/raw/decode-16              79.3ns ±13%
PostingsEncodingDecoding/100000#02/snappy/encode-16            719µs ± 1%
PostingsEncodingDecoding/100000#02/snappy/decode-16           13.5µs ± 4%
PostingsEncodingDecoding/1000000/raw/encode-16                7.14ms ± 1%
PostingsEncodingDecoding/1000000/raw/decode-16                81.7ns ± 9%
PostingsEncodingDecoding/1000000#01/snappy/encode-16          7.52ms ± 3%
PostingsEncodingDecoding/1000000#01/snappy/decode-16           139µs ± 4%
PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16  11.4ms ± 4%
PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16  15.5µs ± 4%

name                                                          alloc/op
PostingsEncodingDecoding/10000/raw/encode-16                  13.6kB ± 0%
PostingsEncodingDecoding/10000/raw/decode-16                   96.0B ± 0%
PostingsEncodingDecoding/10000#01/snappy/encode-16            25.9kB ± 0%
PostingsEncodingDecoding/10000#01/snappy/decode-16            11.0kB ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16    16.6kB ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16     148kB ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/encode-16       148kB ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/decode-16       148kB ± 0%
PostingsEncodingDecoding/100000#01/raw/encode-16               131kB ± 0%
PostingsEncodingDecoding/100000#01/raw/decode-16               96.0B ± 0%
PostingsEncodingDecoding/100000#02/snappy/encode-16            254kB ± 0%
PostingsEncodingDecoding/100000#02/snappy/decode-16            107kB ± 0%
PostingsEncodingDecoding/1000000/raw/encode-16                1.25MB ± 0%
PostingsEncodingDecoding/1000000/raw/decode-16                 96.0B ± 0%
PostingsEncodingDecoding/1000000#01/snappy/encode-16          2.48MB ± 0%
PostingsEncodingDecoding/1000000#01/snappy/decode-16          1.05MB ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16  1.47MB ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16   148kB ± 0%

name                                                          allocs/op
PostingsEncodingDecoding/10000/raw/encode-16                    2.00 ± 0%
PostingsEncodingDecoding/10000/raw/decode-16                    2.00 ± 0%
PostingsEncodingDecoding/10000#01/snappy/encode-16              3.00 ± 0%
PostingsEncodingDecoding/10000#01/snappy/decode-16              4.00 ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/encode-16      4.00 ± 0%
PostingsEncodingDecoding/10000#02/snappyStreamed/decode-16      5.00 ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/encode-16        4.00 ± 0%
PostingsEncodingDecoding/100000/snappyStreamed/decode-16        5.00 ± 0%
PostingsEncodingDecoding/100000#01/raw/encode-16                2.00 ± 0%
PostingsEncodingDecoding/100000#01/raw/decode-16                2.00 ± 0%
PostingsEncodingDecoding/100000#02/snappy/encode-16             3.00 ± 0%
PostingsEncodingDecoding/100000#02/snappy/decode-16             4.00 ± 0%
PostingsEncodingDecoding/1000000/raw/encode-16                  2.00 ± 0%
PostingsEncodingDecoding/1000000/raw/decode-16                  2.00 ± 0%
PostingsEncodingDecoding/1000000#01/snappy/encode-16            3.00 ± 0%
PostingsEncodingDecoding/1000000#01/snappy/decode-16            4.00 ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/encode-16    4.00 ± 0%
PostingsEncodingDecoding/1000000#02/snappyStreamed/decode-16    5.00 ± 0%
```
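
The table above should be reproducible with something like the following
(the exact flags are an assumption; the benchmark lives in
pkg/store/postings_codec_test.go):

```
go test -run '^$' -bench BenchmarkPostingsEncodingDecoding -benchmem ./pkg/store/
```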

Compression ratios are still the same as before:

```
$ /bin/go test -v -timeout 10m -run ^TestDiffVarintCodec$ github.com/thanos-io/thanos/pkg/store
[snip]
=== RUN   TestDiffVarintCodec/snappy/i!~"2.*"
    postings_codec_test.go:73: postings entries: 944450
    postings_codec_test.go:74: original size (4*entries): 3777800 bytes
    postings_codec_test.go:80: encoded size 44498 bytes
    postings_codec_test.go:81: ratio: 0.012
=== RUN   TestDiffVarintCodec/snappyStreamed/i!~"2.*"
    postings_codec_test.go:73: postings entries: 944450
    postings_codec_test.go:74: original size (4*entries): 3777800 bytes
    postings_codec_test.go:80: encoded size 44670 bytes
    postings_codec_test.go:81: ratio: 0.012
```

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: clean up postings code

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: fix estimation

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: use buffer.Bytes()

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store/postings_codec: reuse extgrpc compressors/decompressors

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* CHANGELOG: add item

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* CHANGELOG: clean up whitespace

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

---------

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS authored Apr 26, 2023
1 parent 692a21d commit ba82514
Showing 5 changed files with 263 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -37,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.
- [#6231](https://github.com/thanos-io/thanos/pull/6231) mixins: Add code/grpc-code dimension to error widgets.
- [#6244](https://github.com/thanos-io/thanos/pull/6244) mixin(Rule): Add rule evaluation failures to the Rule dashboard.
- [#6303](https://github.com/thanos-io/thanos/pull/6303) Store: add and start using streamed snappy encoding for postings lists instead of the block-based one. This leads to constant memory usage during decompression and approximately halves memory usage when decompressing a postings list in the index cache.
- [#6071](https://github.com/thanos-io/thanos/pull/6071) Query Frontend: *breaking :warning:* Add experimental native histogram support, for which we updated and aligned with the [Prometheus common](https://github.com/prometheus/common) model, which is used for caching, so a cache reset is required.

### Removed
20 changes: 19 additions & 1 deletion pkg/extgrpc/snappy/snappy.go
@@ -14,8 +14,10 @@ import (
// Name is the name registered for the snappy compressor.
const Name = "snappy"

var Compressor *compressor = newCompressor()

func init() {
encoding.RegisterCompressor(newCompressor())
encoding.RegisterCompressor(Compressor)
}

type compressor struct {
@@ -48,6 +50,12 @@ func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
return writeCloser{wr, &c.writersPool}, nil
}

func (c *compressor) DecompressByteReader(r io.Reader) (io.ByteReader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
return reader{dr, &c.readersPool}, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
@@ -88,3 +96,13 @@ func (r reader) Read(p []byte) (n int, err error) {
}
return n, err
}

func (r reader) ReadByte() (n byte, err error) {
n, err = r.reader.ReadByte()
if err == io.EOF {
r.reader.Reset(nil)
r.pool.Put(r.reader)
}
return n, err
}
6 changes: 3 additions & 3 deletions pkg/store/bucket.go
@@ -2340,9 +2340,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
l index.Postings
err error
)
if isDiffVarintSnappyEncodedPostings(b) {
if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
s := time.Now()
clPostings, err := diffVarintSnappyDecode(b)
clPostings, err := decodePostings(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
@@ -2442,7 +2442,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
compressions++
s := time.Now()
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyEncode(bep, bep.length())
data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
164 changes: 163 additions & 1 deletion pkg/store/postings_codec.go
@@ -5,6 +5,9 @@ package store

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/golang/snappy"
@@ -13,6 +16,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
)

// This file implements encoding and decoding of postings using diff (or delta) + varint
@@ -25,18 +29,175 @@ import (
// significantly (to about 20% of original), snappy then halves it to ~10% of the original.

const (
codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy".
)

func decodePostings(input []byte) (closeablePostings, error) {
var df func([]byte) (closeablePostings, error)

switch {
case isDiffVarintSnappyEncodedPostings(input):
df = diffVarintSnappyDecode
case isDiffVarintSnappyStreamedEncodedPostings(input):
df = diffVarintSnappyStreamedDecode
default:
return nil, fmt.Errorf("unrecognize postings format")
}

return df(input)
}

// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec.
func isDiffVarintSnappyEncodedPostings(input []byte) bool {
return bytes.HasPrefix(input, []byte(codecHeaderSnappy))
}

// isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec.
func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy))
}

// estimateSnappyStreamSize estimates the number of bytes
// needed for encoding length postings. Note that in reality
// the number of bytes needed could be much bigger if postings
// differ by a lot. Practically, stddev=64 is used.
func estimateSnappyStreamSize(length int) int {
// Snappy stream writes data in chunks up to 65536 in size.
// The stream begins with bytes 0xff 0x06 0x00 0x00 's' 'N' 'a' 'P' 'p' 'Y'.
// Our encoded data also needs a header.
// Each encoded (or uncompressed) chunk needs tag (chunk type 1B + chunk len 3B) + checksum 4B.

// Mark for encoded data.
ret := len(codecHeaderStreamedSnappy)
// Magic snappy stream start.
ret += 10

const maxBlockSize = 65536

length = 5 * length / 4 // estimate 1.25B per posting.

blocks := length / maxBlockSize

ret += blocks * snappy.MaxEncodedLen(maxBlockSize)
length -= blocks * maxBlockSize
if length > 0 {
ret += snappy.MaxEncodedLen(length)
}

return ret
}

func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) {
compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(length)))
if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
return nil, fmt.Errorf("writing streamed snappy header")
} else if n != len(codecHeaderStreamedSnappy) {
return nil, fmt.Errorf("short-write streamed snappy header")
}

uvarintEncodeBuf := make([]byte, binary.MaxVarintLen64)

sw, err := extsnappy.Compressor.Compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}

prev := storage.SeriesRef(0)
for p.Next() {
v := p.At()
if v < prev {
return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
}

uvarintSize := binary.PutUvarint(uvarintEncodeBuf, uint64(v-prev))
if written, err := sw.Write(uvarintEncodeBuf[:uvarintSize]); err != nil {
return nil, errors.Wrap(err, "writing uvarint encoded byte")
} else if written != uvarintSize {
return nil, errors.Wrap(err, "short-write for uvarint encoded byte")
}

prev = v
}
if p.Err() != nil {
return nil, p.Err()
}
if err := sw.Close(); err != nil {
return nil, errors.Wrap(err, "closing snappy stream writer")
}

return compressedBuf.Bytes(), nil
}

func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) {
if !isDiffVarintSnappyStreamedEncodedPostings(input) {
return nil, errors.New("header not found")
}

return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):])
}

type streamedDiffVarintPostings struct {
cur storage.SeriesRef

sr io.ByteReader
err error
}

func newStreamedDiffVarintPostings(input []byte) (closeablePostings, error) {
r, err := extsnappy.Compressor.DecompressByteReader(bytes.NewBuffer(input))
if err != nil {
return nil, fmt.Errorf("decompressing snappy postings: %w", err)
}

return &streamedDiffVarintPostings{sr: r}, nil
}

func (it *streamedDiffVarintPostings) close() {
}

func (it *streamedDiffVarintPostings) At() storage.SeriesRef {
return it.cur
}

func (it *streamedDiffVarintPostings) Next() bool {
val, err := binary.ReadUvarint(it.sr)
if err != nil {
if err != io.EOF {
it.err = err
}
return false
}

it.cur = it.cur + storage.SeriesRef(val)
return true
}

func (it *streamedDiffVarintPostings) Err() error {
return it.err
}

func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool {
if it.cur >= x {
return true
}

// We cannot do any search due to how values are stored,
// so we simply advance until we find the right value.
for it.Next() {
if it.At() >= x {
return true
}
}

return false
}

// diffVarintSnappyEncode encodes postings into diff+varint representation,
// and applies snappy compression on the result.
// Returned byte slice starts with codecHeaderSnappy header.
// Length argument is expected number of postings, used for preallocating buffer.
// TODO(GiedriusS): remove for v1.0.
func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) {
buf, err := diffVarintEncodeNoHeader(p, length)
if err != nil {
@@ -97,6 +258,7 @@ func alias(x, y []byte) bool {
return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1]
}

// TODO(GiedriusS): remove for v1.0.
func diffVarintSnappyDecode(input []byte) (closeablePostings, error) {
if !isDiffVarintSnappyEncodedPostings(input) {
return nil, errors.New("header not found")
90 changes: 77 additions & 13 deletions pkg/store/postings_codec_test.go
@@ -7,6 +7,7 @@ import (
"context"
"math"
"math/rand"
"sort"
"strconv"
"testing"

@@ -57,8 +58,9 @@ func TestDiffVarintCodec(t *testing.T) {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
}

for postingName, postings := range postingsMap {
@@ -194,7 +196,7 @@ func (p *uint64Postings) len() int {
return len(p.vals)
}

func BenchmarkEncodePostings(b *testing.B) {
func BenchmarkPostingsEncodingDecoding(b *testing.B) {
const max = 1000000
r := rand.New(rand.NewSource(0))

@@ -208,16 +210,78 @@
p[ix] = p[ix-1] + storage.SeriesRef(d)
}

codecs := map[string]struct {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
}

b.ReportAllocs()

for _, count := range []int{10000, 100000, 1000000} {
b.Run(strconv.Itoa(count), func(b *testing.B) {
for i := 0; i < b.N; i++ {
ps := &uint64Postings{vals: p[:count]}

_, err := diffVarintEncodeNoHeader(ps, ps.len())
if err != nil {
b.Fatal(err)
}
}
})
for codecName, codecFns := range codecs {
b.Run(strconv.Itoa(count), func(b *testing.B) {
b.Run(codecName, func(b *testing.B) {
b.Run("encode", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ps := &uint64Postings{vals: p[:count]}

_, err := codecFns.codingFunction(ps, ps.len())
if err != nil {
b.Fatal(err)
}
}
})
b.Run("decode", func(b *testing.B) {
ps := &uint64Postings{vals: p[:count]}

encoded, err := codecFns.codingFunction(ps, ps.len())
if err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := codecFns.decodingFunction(encoded)
if err != nil {
b.Fatal(err)
}
}
})
})
})
}
}
}

func FuzzSnappyStreamEncoding(f *testing.F) {
f.Add(10, 123)

f.Fuzz(func(t *testing.T, postingsCount, seedInit int) {
if postingsCount <= 0 {
return
}
r := rand.New(rand.NewSource(int64(seedInit)))
p := make([]storage.SeriesRef, postingsCount)

for ix := 1; ix < len(p); ix++ {
d := math.Abs(r.NormFloat64()*math.MaxUint64) + 1

p[ix] = p[ix-1] + storage.SeriesRef(d)
}

sort.Slice(p, func(i, j int) bool {
return p[i] < p[j]
})

ps := &uint64Postings{vals: p}

_, err := diffVarintSnappyStreamedEncode(ps, ps.len())
testutil.Ok(t, err)
})
}
