Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add streamed snappy encoding for postings list #6303

Merged
merged 8 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6228](https://github.com/thanos-io/thanos/pull/6228) Conditionally generate debug messages in ProxyStore to avoid memory bloat.
- [#6231](https://github.com/thanos-io/thanos/pull/6231) mixins: Add code/grpc-code dimension to error widgets.
- [#6244](https://github.com/thanos-io/thanos/pull/6244) mixin(Rule): Add rule evaluation failures to the Rule dashboard.
- [#6303](https://github.com/thanos-io/thanos/pull/6303) Store: added and start using streamed snappy encoding for postings list instead of block based one. This leads to constant memory usage during decompression. This approximately halves memory usage when decompressing a postings list in index cache.


### Removed


## [v0.31.0](https://github.com/thanos-io/thanos/tree/release-0.31) - 22.03.2023

### Added
Expand Down
20 changes: 19 additions & 1 deletion pkg/extgrpc/snappy/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
// Name is the name registered for the snappy compressor.
const Name = "snappy"

var Compressor *compressor = newCompressor()

func init() {
encoding.RegisterCompressor(newCompressor())
encoding.RegisterCompressor(Compressor)
}

type compressor struct {
Expand Down Expand Up @@ -48,6 +50,12 @@ func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
return writeCloser{wr, &c.writersPool}, nil
}

func (c *compressor) DecompressByteReader(r io.Reader) (io.ByteReader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
return reader{dr, &c.readersPool}, nil
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
dr := c.readersPool.Get().(*snappy.Reader)
dr.Reset(r)
Expand Down Expand Up @@ -88,3 +96,13 @@ func (r reader) Read(p []byte) (n int, err error) {
}
return n, err
}

func (r reader) ReadByte() (n byte, err error) {
n, err = r.reader.ReadByte()
if err == io.EOF {
r.reader.Reset(nil)
r.pool.Put(r.reader)
}
return n, err

}
6 changes: 3 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2340,9 +2340,9 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
l index.Postings
err error
)
if isDiffVarintSnappyEncodedPostings(b) {
if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
s := time.Now()
clPostings, err := diffVarintSnappyDecode(b)
clPostings, err := decodePostings(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
Expand Down Expand Up @@ -2442,7 +2442,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
compressions++
s := time.Now()
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyEncode(bep, bep.length())
data, err := diffVarintSnappyStreamedEncode(bep, bep.length())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
Expand Down
164 changes: 163 additions & 1 deletion pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package store

import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/golang/snappy"
Expand All @@ -13,6 +16,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
extsnappy "github.com/thanos-io/thanos/pkg/extgrpc/snappy"
)

// This file implements encoding and decoding of postings using diff (or delta) + varint
Expand All @@ -25,18 +29,175 @@ import (
// significantly (to about 20% of original), snappy then halves it to ~10% of the original.

const (
codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
codecHeaderSnappy = "dvs" // As in "diff+varint+snappy".
codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy".
)

func decodePostings(input []byte) (closeablePostings, error) {
var df func([]byte) (closeablePostings, error)

switch {
case isDiffVarintSnappyEncodedPostings(input):
df = diffVarintSnappyDecode
case isDiffVarintSnappyStreamedEncodedPostings(input):
df = diffVarintSnappyStreamedDecode
default:
return nil, fmt.Errorf("unrecognize postings format")
}

return df(input)
}

// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec.
func isDiffVarintSnappyEncodedPostings(input []byte) bool {
return bytes.HasPrefix(input, []byte(codecHeaderSnappy))
}

// isDiffVarintSnappyStreamedEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy streamed codec.
func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy))
}

// estimateSnappyStreamSize estimates the number of bytes
// needed for encoding length postings. Note that in reality
// the number of bytes needed could be much bigger if postings
// different by a lot. Practically, stddev=64 is used.
func estimateSnappyStreamSize(length int) int {
// Snappy stream writes data in chunks up to 65536 in size.
// The stream begins with bytes 0xff 0x06 0x00 0x00 's' 'N' 'a' 'P' 'p' 'Y'.
// Our encoded data also needs a header.
// Each encoded (or uncompressed) chunk needs tag (chunk type 1B + chunk len 3B) + checksum 4B.

// Mark for encoded data.
ret := len(codecHeaderStreamedSnappy)
// Magic snappy stream start.
ret += 10

const maxBlockSize = 65536

length = 5 * length / 4 // estimate 1.25B per posting.

blocks := length / maxBlockSize

ret += blocks * snappy.MaxEncodedLen(maxBlockSize)
length -= blocks * maxBlockSize
if length > 0 {
ret += snappy.MaxEncodedLen(length)
}

return ret
}

func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) {
compressedBuf := bytes.NewBuffer(make([]byte, 0, estimateSnappyStreamSize(length)))
if n, err := compressedBuf.WriteString(codecHeaderStreamedSnappy); err != nil {
return nil, fmt.Errorf("writing streamed snappy header")
} else if n != len(codecHeaderStreamedSnappy) {
return nil, fmt.Errorf("short-write streamed snappy header")
}

uvarintEncodeBuf := make([]byte, binary.MaxVarintLen64)

sw, err := extsnappy.Compressor.Compress(compressedBuf)
if err != nil {
return nil, fmt.Errorf("creating snappy compressor: %w", err)
}

prev := storage.SeriesRef(0)
for p.Next() {
v := p.At()
if v < prev {
return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
}

uvarintSize := binary.PutUvarint(uvarintEncodeBuf, uint64(v-prev))
if written, err := sw.Write(uvarintEncodeBuf[:uvarintSize]); err != nil {
return nil, errors.Wrap(err, "writing uvarint encoded byte")
} else if written != uvarintSize {
return nil, errors.Wrap(err, "short-write for uvarint encoded byte")
}

prev = v
}
if p.Err() != nil {
return nil, p.Err()
}
if err := sw.Close(); err != nil {
return nil, errors.Wrap(err, "closing snappy stream writer")
}

return compressedBuf.Bytes(), nil
}

func diffVarintSnappyStreamedDecode(input []byte) (closeablePostings, error) {
if !isDiffVarintSnappyStreamedEncodedPostings(input) {
return nil, errors.New("header not found")
}

return newStreamedDiffVarintPostings(input[len(codecHeaderStreamedSnappy):])
}

type streamedDiffVarintPostings struct {
cur storage.SeriesRef

sr io.ByteReader
err error
}

func newStreamedDiffVarintPostings(input []byte) (closeablePostings, error) {
r, err := extsnappy.Compressor.DecompressByteReader(bytes.NewBuffer(input))
if err != nil {
return nil, fmt.Errorf("decompressing snappy postings: %w", err)
}

return &streamedDiffVarintPostings{sr: r}, nil
}

func (it *streamedDiffVarintPostings) close() {
}

func (it *streamedDiffVarintPostings) At() storage.SeriesRef {
return it.cur
}

func (it *streamedDiffVarintPostings) Next() bool {
val, err := binary.ReadUvarint(it.sr)
if err != nil {
if err != io.EOF {
it.err = err
}
return false
}

it.cur = it.cur + storage.SeriesRef(val)
return true
}

func (it *streamedDiffVarintPostings) Err() error {
return it.err
}

func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool {
if it.cur >= x {
return true
}

// We cannot do any search due to how values are stored,
// so we simply advance until we find the right value.
for it.Next() {
if it.At() >= x {
return true
}
}

return false
}

// diffVarintSnappyEncode encodes postings into diff+varint representation,
matej-g marked this conversation as resolved.
Show resolved Hide resolved
// and applies snappy compression on the result.
// Returned byte slice starts with codecHeaderSnappy header.
// Length argument is expected number of postings, used for preallocating buffer.
// TODO(GiedriusS): remove for v1.0.
func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) {
buf, err := diffVarintEncodeNoHeader(p, length)
if err != nil {
Expand Down Expand Up @@ -97,6 +258,7 @@ func alias(x, y []byte) bool {
return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1]
}

// TODO(GiedriusS): remove for v1.0.
func diffVarintSnappyDecode(input []byte) (closeablePostings, error) {
if !isDiffVarintSnappyEncodedPostings(input) {
return nil, errors.New("header not found")
Expand Down
90 changes: 77 additions & 13 deletions pkg/store/postings_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"math"
"math/rand"
"sort"
"strconv"
"testing"

Expand Down Expand Up @@ -57,8 +58,9 @@ func TestDiffVarintCodec(t *testing.T) {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
}

for postingName, postings := range postingsMap {
Expand Down Expand Up @@ -194,7 +196,7 @@ func (p *uint64Postings) len() int {
return len(p.vals)
}

func BenchmarkEncodePostings(b *testing.B) {
func BenchmarkPostingsEncodingDecoding(b *testing.B) {
const max = 1000000
r := rand.New(rand.NewSource(0))

Expand All @@ -208,16 +210,78 @@ func BenchmarkEncodePostings(b *testing.B) {
p[ix] = p[ix-1] + storage.SeriesRef(d)
}

codecs := map[string]struct {
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (closeablePostings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (closeablePostings, error) { return newDiffVarintPostings(bytes, nil), nil }},
"snappy": {codingFunction: diffVarintSnappyEncode, decodingFunction: diffVarintSnappyDecode},
"snappyStreamed": {codingFunction: diffVarintSnappyStreamedEncode, decodingFunction: diffVarintSnappyStreamedDecode},
}

b.ReportAllocs()

for _, count := range []int{10000, 100000, 1000000} {
b.Run(strconv.Itoa(count), func(b *testing.B) {
for i := 0; i < b.N; i++ {
ps := &uint64Postings{vals: p[:count]}

_, err := diffVarintEncodeNoHeader(ps, ps.len())
if err != nil {
b.Fatal(err)
}
}
})
for codecName, codecFns := range codecs {
b.Run(strconv.Itoa(count), func(b *testing.B) {
b.Run(codecName, func(b *testing.B) {
b.Run("encode", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
ps := &uint64Postings{vals: p[:count]}

_, err := codecFns.codingFunction(ps, ps.len())
if err != nil {
b.Fatal(err)
}
}
})
b.Run("decode", func(b *testing.B) {
ps := &uint64Postings{vals: p[:count]}

encoded, err := codecFns.codingFunction(ps, ps.len())
if err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := codecFns.decodingFunction(encoded)
if err != nil {
b.Fatal(err)
}
}
})

})
})
}
}
}

func FuzzSnappyStreamEncoding(f *testing.F) {
f.Add(10, 123)

f.Fuzz(func(t *testing.T, postingsCount, seedInit int) {
if postingsCount <= 0 {
return
}
r := rand.New(rand.NewSource(int64(seedInit)))
p := make([]storage.SeriesRef, postingsCount)

for ix := 1; ix < len(p); ix++ {
d := math.Abs(r.NormFloat64()*math.MaxUint64) + 1

p[ix] = p[ix-1] + storage.SeriesRef(d)
}

sort.Slice(p, func(i, j int) bool {
return p[i] < p[j]
})

ps := &uint64Postings{vals: p}

_, err := diffVarintSnappyStreamedEncode(ps, ps.len())
testutil.Ok(t, err)
})
}