diff --git a/CHANGELOG.md b/CHANGELOG.md
index cf8dae98be..64c341911d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)
 - [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.
 - [#6231](https://github.com/thanos-io/thanos/pull/6231) mixins: Add code/grpc-code dimension to error widgets.
 - [#6244](https://github.com/thanos-io/thanos/pull/6244) mixin(Rule): Add rule evaluation failures to the Rule dashboard.
+- [#6303](https://github.com/thanos-io/thanos/pull/6303) Store: Added and started using streamed snappy encoding for postings lists instead of the block-based one. Decompression now uses a constant amount of memory, which approximately halves memory usage when decompressing a postings list from the index cache.
 - [#6071](https://github.com/thanos-io/thanos/pull/6071) Query Frontend: *breaking :warning:* Add experimental native histogram support for which we updated and aligned with the [Prometheus common](https://github.com/prometheus/common) model, which is used for caching so a cache reset required.
 
 ### Removed
diff --git a/pkg/extgrpc/snappy/snappy.go b/pkg/extgrpc/snappy/snappy.go
index e2576683bf..45f7cfc369 100644
--- a/pkg/extgrpc/snappy/snappy.go
+++ b/pkg/extgrpc/snappy/snappy.go
@@ -14,8 +14,10 @@ import (
 // Name is the name registered for the snappy compressor.
 const Name = "snappy"
 
+var Compressor *compressor = newCompressor()
+
 func init() {
-	encoding.RegisterCompressor(newCompressor())
+	encoding.RegisterCompressor(Compressor)
 }
 
 type compressor struct {
@@ -48,6 +50,13 @@ func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
 	return writeCloser{wr, &c.writersPool}, nil
 }
 
+// DecompressByteReader returns a pooled io.ByteReader over the decompressed stream.
+func (c *compressor) DecompressByteReader(r io.Reader) (io.ByteReader, error) {
+	dr := c.readersPool.Get().(*snappy.Reader)
+	dr.Reset(r)
+	return reader{dr, &c.readersPool}, nil
+}
+
 func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
 	dr := c.readersPool.Get().(*snappy.Reader)
 	dr.Reset(r)
@@ -88,3 +96,13 @@ func (r reader) Read(p []byte) (n int, err error) {
 	}
 	return n, err
 }
+
+// ReadByte returns the underlying snappy.Reader to the pool on EOF.
+func (r reader) ReadByte() (n byte, err error) {
+	n, err = r.reader.ReadByte()
+	if err == io.EOF {
+		r.reader.Reset(nil)
+		r.pool.Put(r.reader)
+	}
+	return n, err
+}
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 05e82607e3..ac4e439ab2 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -2340,9 +2340,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label
 		l     index.Postings
 		err   error
 	)
-	if isDiffVarintSnappyEncodedPostings(b) {
+	if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
 		s := time.Now()
-		clPostings, err := diffVarintSnappyDecode(b)
+		clPostings, err := decodePostings(b)
 		r.stats.cachedPostingsDecompressions += 1
 		r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
 		if err != nil {
@@ -2442,7 +2442,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label
 			compressions++
 			s := time.Now()
 			bep := newBigEndianPostings(pBytes[4:])
-			data, err := diffVarintSnappyEncode(bep, bep.length())
+			data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
 			compressionTime = time.Since(s)
 			if err == nil {
 				dataToCache = data
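For orientation before the codec changes below: the constant-memory claim in the CHANGELOG entry comes from swapping snappy's block format (decode the whole payload into one buffer, then iterate) for its stream format (decode a bounded window while varints are read). A self-contained sketch of that round trip, illustrative only and assuming nothing beyond github.com/golang/snappy:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	refs := []uint64{10, 12, 17, 100} // a sorted postings list

	// Encode: uvarint-encoded deltas pushed through a snappy *stream* writer.
	var buf bytes.Buffer
	w := snappy.NewBufferedWriter(&buf)
	tmp := make([]byte, binary.MaxVarintLen64)
	prev := uint64(0)
	for _, v := range refs {
		n := binary.PutUvarint(tmp, v-prev)
		if _, err := w.Write(tmp[:n]); err != nil {
			panic(err)
		}
		prev = v
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// Decode: snappy.Reader implements io.ByteReader, so ReadUvarint can
	// pull deltas without ever materializing the whole decompressed list.
	r := snappy.NewReader(bytes.NewReader(buf.Bytes()))
	cur := uint64(0)
	for {
		d, err := binary.ReadUvarint(r)
		if err != nil {
			break // io.EOF terminates the stream.
		}
		cur += d
		fmt.Println(cur) // 10, 12, 17, 100
	}
}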
diff --git a/pkg/store/postings_codec.go b/pkg/store/postings_codec.go
index f60fe7c1b2..5e2f0a9cc2 100644
--- a/pkg/store/postings_codec.go
+++ b/pkg/store/postings_codec.go
@@ -5,6 +5,9 @@ package store
 
 import (
 	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
 	"sync"
 
 	"github.com/golang/snappy"
@@ -13,6 +16,7 @@ import (
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/encoding"
 	"github.com/prometheus/prometheus/tsdb/index"
+	extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
 )
 
 // This file implements encoding and decoding of postings using diff (or delta) + varint
@@ -25,18 +29,177 @@ import (
 // significantly (to about 20% of original), snappy then halves it to ~10% of the original.
 
 const (
-	codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
+	codecHeaderSnappy         = "dvs" // As in "diff+varint+snappy".
+	codecHeaderStreamedSnappy = "dss" // As in "diff+varint+streamed snappy".
 )
 
+// decodePostings dispatches to the right decoder based on the codec header prefix.
+func decodePostings(input []byte) (closeablePostings, error) {
+	var df func([]byte) (closeablePostings, error)
+
+	switch {
+	case isDiffVarintSnappyEncodedPostings(input):
+		df = diffVarintSnappyDecode
+	case isDiffVarintSnappyStreamedEncodedPostings(input):
+		df = diffVarintSnappyStreamedDecode
+	default:
+		return nil, fmt.Errorf("unrecognized postings format")
+	}
+
+	return df(input)
+}
+
 // isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec.
 func isDiffVarintSnappyEncodedPostings(input []byte) bool {
 	return bytes.HasPrefix(input, []byte(codecHeaderSnappy))
 }
 
+// isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec.
+func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
+	return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy))
+}
+
+// estimateSnappyStreamSize estimates the number of bytes
+// needed to encode `length` postings. Note that in reality
+// many more bytes could be needed if the postings differ by a lot.
+// In practice, a delta stddev of 64 is assumed.
+func estimateSnappyStreamSize(length int) int {
+	// A snappy stream writes data in chunks of up to 65536 bytes.
+	// The stream begins with the bytes 0xff 0x06 0x00 0x00 's' 'N' 'a' 'P' 'p' 'Y'.
+	// Our encoded data also needs a header.
+	// Each encoded (or uncompressed) chunk needs a tag (chunk type 1B + chunk length 3B) plus a 4B checksum.
+
+	// Mark for encoded data.
+	ret := len(codecHeaderStreamedSnappy)
+	// Magic snappy stream start.
+	ret += 10
+
+	const maxBlockSize = 65536
+
+	length = 5 * length / 4 // Estimate 1.25 bytes per posting.
+
+	blocks := length / maxBlockSize
+
+	ret += blocks * snappy.MaxEncodedLen(maxBlockSize)
+	length -= blocks * maxBlockSize
+	if length > 0 {
+		ret += snappy.MaxEncodedLen(length)
+	}
+
+	return ret
+}
+
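To make the estimate above concrete: for a million postings it reserves the 3-byte "dss" mark plus the 10-byte stream magic, then sizes the expected ~1.25 MB of varint data in 64 KiB blocks. A standalone worked instance that mirrors the function body (the posting count is an arbitrary assumption, not a value from the PR):

package main

import (
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	length := 1000000 // assumed number of postings

	ret := 3 + 10 // len("dss") + snappy stream magic.

	const maxBlockSize = 65536
	length = 5 * length / 4 // ~1.25 bytes of uvarint delta per posting.

	blocks := length / maxBlockSize
	ret += blocks * snappy.MaxEncodedLen(maxBlockSize)
	if rem := length - blocks*maxBlockSize; rem > 0 {
		ret += snappy.MaxEncodedLen(rem)
	}

	// Capacity hint for the output buffer: about 1.46 MB here.
	fmt.Println(ret)
}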
+func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) {
+	compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(length)))
+	if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
+		return nil, fmt.Errorf("writing streamed snappy header: %w", err)
+	} else if n != len(codecHeaderStreamedSnappy) {
+		return nil, fmt.Errorf("short write of streamed snappy header")
+	}
+
+	uvarintEncodeBuf := make([]byte, binary.MaxVarintLen64)
+
+	sw, err := extsnappy.Compressor.Compress(compressedBuf)
+	if err != nil {
+		return nil, fmt.Errorf("creating snappy compressor: %w", err)
+	}
+
+	prev := storage.SeriesRef(0)
+	for p.Next() {
+		v := p.At()
+		if v < prev {
+			return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
+		}
+
+		uvarintSize := binary.PutUvarint(uvarintEncodeBuf, uint64(v-prev))
+		if written, err := sw.Write(uvarintEncodeBuf[:uvarintSize]); err != nil {
+			return nil, errors.Wrap(err, "writing uvarint encoded byte")
+		} else if written != uvarintSize {
+			return nil, errors.Errorf("short write of uvarint encoded byte")
+		}
+
+		prev = v
+	}
+	if p.Err() != nil {
+		return nil, p.Err()
+	}
+	if err := sw.Close(); err != nil {
+		return nil, errors.Wrap(err, "closing snappy stream writer")
+	}
+
+	return compressedBuf.Bytes(), nil
+}
+
+func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) {
+	if !isDiffVarintSnappyStreamedEncodedPostings(input) {
+		return nil, errors.New("header not found")
+	}
+
+	return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):])
+}
+
+type streamedDiffVarintPostings struct {
+	cur storage.SeriesRef
+
+	sr  io.ByteReader
+	err error
+}
+
+func newStreamedDiffVarintPostings(input []byte) (closeablePostings, error) {
+	r, err := extsnappy.Compressor.DecompressByteReader(bytes.NewBuffer(input))
+	if err != nil {
+		return nil, fmt.Errorf("decompressing snappy postings: %w", err)
+	}
+
+	return &streamedDiffVarintPostings{sr: r}, nil
+}
+
+// close is a no-op: ReadByte returns the pooled snappy reader once the stream hits EOF.
+func (it *streamedDiffVarintPostings) close() {
+}
+
+func (it *streamedDiffVarintPostings) At() storage.SeriesRef {
+	return it.cur
+}
+
+func (it *streamedDiffVarintPostings) Next() bool {
+	val, err := binary.ReadUvarint(it.sr)
+	if err != nil {
+		if err != io.EOF {
+			it.err = err
+		}
+		return false
+	}
+
+	it.cur = it.cur + storage.SeriesRef(val)
+	return true
+}
+
+func (it *streamedDiffVarintPostings) Err() error {
+	return it.err
+}
+
+func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool {
+	if it.cur >= x {
+		return true
+	}
+
+	// We cannot do any search due to how values are stored,
+	// so we simply advance until we find the right value.
+	for it.Next() {
+		if it.At() >= x {
+			return true
+		}
+	}
+
+	return false
+}
+
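Why Seek above degrades to a linear scan: the stream stores only deltas, so the i-th value is recoverable solely as a prefix sum and there is no random access to binary-search over. A tiny illustrative sketch (all names hypothetical):

package main

import "fmt"

func main() {
	// refs is a sorted postings list; deltas is what actually enters the
	// stream (after uvarint encoding and snappy compression).
	refs := []uint64{10, 12, 17, 100}
	deltas := make([]uint64, len(refs))
	prev := uint64(0)
	for i, v := range refs {
		deltas[i] = v - prev // 10, 2, 5, 83: small numbers, short uvarints.
		prev = v
	}
	fmt.Println(deltas)

	// "Seek(17)" must replay the prefix sum from the current position.
	cur, i := uint64(0), 0
	for ; i < len(deltas) && cur < 17; i++ {
		cur += deltas[i]
	}
	fmt.Println(cur) // 17
}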
 // diffVarintSnappyEncode encodes postings into diff+varint representation,
 // and applies snappy compression on the result.
 // Returned byte slice starts with codecHeaderSnappy header.
 // Length argument is expected number of postings, used for preallocating buffer.
+// TODO(GiedriusS): remove for v1.0.
 func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) {
 	buf, err := diffVarintEncodeNoHeader(p, length)
 	if err != nil {
@@ -97,6 +260,7 @@ func alias(x, y []byte) bool {
 	return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1]
 }
 
+// TODO(GiedriusS): remove for v1.0.
 func diffVarintSnappyDecode(input []byte) (closeablePostings, error) {
 	if !isDiffVarintSnappyEncodedPostings(input) {
 		return nil, errors.New("header not found")
diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go
index 8ac86008b5..08b985e450 100644
--- a/pkg/store/postings_codec_test.go
+++ b/pkg/store/postings_codec_test.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"math"
 	"math/rand"
+	"sort"
 	"strconv"
 	"testing"
 
@@ -57,8 +58,9 @@ func TestDiffVarintCodec(t *testing.T) {
 		codingFunction   func(index.Postings, int) ([]byte, error)
 		decodingFunction func([]byte) (closeablePostings, error)
 	}{
-		"raw":    {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
-		"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+		"raw":            {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
+		"snappy":         {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+		"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
 	}
 
 	for postingName, postings := range postingsMap {
@@ -194,7 +196,7 @@ func (p *uint64Postings) len() int {
 	return len(p.vals)
 }
 
-func BenchmarkEncodePostings(b *testing.B) {
+func BenchmarkPostingsEncodingDecoding(b *testing.B) {
 	const max = 1000000
 	r := rand.New(rand.NewSource(0))
 
@@ -208,16 +210,77 @@ func BenchmarkEncodePostings(b *testing.B) {
 		p[ix] = p[ix-1] + storage.SeriesRef(d)
 	}
 
+	codecs := map[string]struct {
+		codingFunction   func(index.Postings, int) ([]byte, error)
+		decodingFunction func([]byte) (closeablePostings, error)
+	}{
+		"raw":            {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
+		"snappy":         {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
+		"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
+	}
+
+	b.ReportAllocs()
+
 	for _, count := range []int{10000, 100000, 1000000} {
-		b.Run(strconv.Itoa(count), func(b *testing.B) {
-			for i := 0; i < b.N; i++ {
-				ps := &uint64Postings{vals: p[:count]}
-
-				_, err := diffVarintEncodeNoHeader(ps, ps.len())
-				if err != nil {
-					b.Fatal(err)
-				}
-			}
-		})
+		for codecName, codecFns := range codecs {
+			b.Run(strconv.Itoa(count), func(b *testing.B) {
+				b.Run(codecName, func(b *testing.B) {
+					b.Run("encode", func(b *testing.B) {
+						b.ResetTimer()
+						for i := 0; i < b.N; i++ {
+							ps := &uint64Postings{vals: p[:count]}
+
+							_, err := codecFns.codingFunction(ps, ps.len())
+							if err != nil {
+								b.Fatal(err)
+							}
+						}
+					})
+					b.Run("decode", func(b *testing.B) {
+						ps := &uint64Postings{vals: p[:count]}
+
+						encoded, err := codecFns.codingFunction(ps, ps.len())
+						if err != nil {
+							b.Fatal(err)
+						}
+
+						b.ResetTimer()
+						for i := 0; i < b.N; i++ {
+							_, err := codecFns.decodingFunction(encoded)
+							if err != nil {
+								b.Fatal(err)
+							}
+						}
+					})
+				})
+			})
+		}
 	}
 }
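With the benchmark renamed and table-driven, all three codecs are measured in both directions, and the new fuzz target (added immediately below) feeds the streamed encoder random sorted postings. Assuming a standard Go toolchain, invocations along these lines should work from the repository root:

go test -run '^$' -bench BenchmarkPostingsEncodingDecoding -benchmem ./pkg/store
go test -run '^$' -fuzz FuzzSnappyStreamEncoding -fuzztime 30s ./pkg/store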
+
+func FuzzSnappyStreamEncoding(f *testing.F) {
+	f.Add(10, 123)
+
+	f.Fuzz(func(t *testing.T, postingsCount, seedInit int) {
+		if postingsCount <= 0 {
+			return
+		}
+		r := rand.New(rand.NewSource(int64(seedInit)))
+		p := make([]storage.SeriesRef, postingsCount)
+
+		for ix := 1; ix < len(p); ix++ {
+			d := math.Abs(r.NormFloat64()*math.MaxUint64) + 1
+
+			p[ix] = p[ix-1] + storage.SeriesRef(d)
+		}
+
+		sort.Slice(p, func(i, j int) bool {
+			return p[i] < p[j]
+		})
+
+		ps := &uint64Postings{vals: p}
+
+		_, err := diffVarintSnappyStreamedEncode(ps, ps.len())
+		testutil.Ok(t, err)
+	})
+}
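One subtlety worth keeping in mind when reading the iterator changes above: streamedDiffVarintPostings.close() is empty because the pooled snappy.Reader is recycled by ReadByte itself on io.EOF. A simplified sketch of that pattern (an assumed illustration using sync.Pool, not the PR's exact code from pkg/extgrpc/snappy):

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"

	"github.com/golang/snappy"
)

// Pool of stream readers, so repeated decompressions reuse buffers.
var readers = sync.Pool{New: func() interface{} { return snappy.NewReader(nil) }}

func main() {
	// Produce a small snappy stream.
	var buf bytes.Buffer
	w := snappy.NewBufferedWriter(&buf)
	_, _ = w.Write([]byte("hello"))
	_ = w.Close()

	// Borrow a reader, drain it, and return it once EOF is reached.
	r := readers.Get().(*snappy.Reader)
	r.Reset(bytes.NewReader(buf.Bytes()))
	out, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	r.Reset(nil) // Drop the reference to the source before pooling.
	readers.Put(r)
	fmt.Println(string(out)) // hello

	// After Put, r must not be touched again: this is why the postings
	// iterator's close() has nothing left to do once EOF was seen.
}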