Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
store: clean up postings code
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed Apr 25, 2023
1 parent 51a984d commit 05c3a43
Showing 2 changed files with 22 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
@@ -2342,7 +2342,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
)
if isDiffVarintSnappyEncodedPostings(b) || isDiffVarintSnappyStreamedEncodedPostings(b) {
s := time.Now()
clPostings, err := getDecodingFunction(b)(b)
clPostings, err := decodePostings(b)
r.stats.cachedPostingsDecompressions += 1
r.stats.CachedPostingsDecompressionTimeSum += time.Since(s)
if err != nil {
53 changes: 21 additions & 32 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ package store
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"sync"

@@ -31,14 +32,19 @@ const (
codecHeaderStreamedSnappy = "dss" // As in "diffvarint+streamed snappy".
)

func getDecodingFunction(input []byte) func([]byte) (closeablePostings, error) {
if isDiffVarintSnappyEncodedPostings(input) {
return diffVarintSnappyDecode
}
if isDiffVarintSnappyStreamedEncodedPostings(input) {
return diffVarintSnappyStreamedDecode
func decodePostings(input []byte) (closeablePostings, error) {
var df func([]byte) (closeablePostings, error)

switch {
case isDiffVarintSnappyEncodedPostings(input):
df = diffVarintSnappyDecode
case isDiffVarintSnappyStreamedEncodedPostings(input):
df = diffVarintSnappyStreamedDecode
default:
return nil, fmt.Errorf("unrecognize postings format")
}
return nil

return df(input)
}

// isDiffVarintSnappyEncodedPostings returns true, if input looks like it has been encoded by diff+varint+snappy codec.
@@ -51,29 +57,6 @@ func isDiffVarintSnappyStreamedEncodedPostings(input []byte) bool {
return bytes.HasPrefix(input, []byte(codecHeaderStreamedSnappy))
}

func writeUvarint(w io.Writer, oneByteSlice []byte, x uint64) error {
for x >= 0x80 {
oneByteSlice[0] = byte(x) | 0x80
n, err := w.Write(oneByteSlice)
if err != nil {
return err
}
if n != 1 {
return io.EOF
}
x >>= 7
}
oneByteSlice[0] = byte(x)
n, err := w.Write(oneByteSlice)
if err != nil {
return err
}
if n != 1 {
return io.EOF
}
return nil
}

var snappyWriterPool, snappyReaderPool sync.Pool

func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error) {
@@ -83,7 +66,7 @@ func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error
compressedBuf := bytes.NewBuffer(out[len(codecHeaderStreamedSnappy):])
var sw *snappy.Writer

oneByteSlice := make([]byte, 1)
uvarintEncodeBuf := make([]byte, binary.MaxVarintLen64)

pooledSW := snappyWriterPool.Get()
if pooledSW == nil {
@@ -104,9 +87,13 @@ func diffVarintSnappyStreamedEncode(p index.Postings, length int) ([]byte, error
return nil, errors.Errorf("postings entries must be in increasing order, current: %d, previous: %d", v, prev)
}

if err := writeUvarint(sw, oneByteSlice, uint64(v-prev)); err != nil {
uvarintSize := binary.PutUvarint(uvarintEncodeBuf, uint64(v-prev))
if written, err := sw.Write(uvarintEncodeBuf[:uvarintSize]); err != nil {
return nil, errors.Wrap(err, "writing uvarint encoded byte")
} else if written != uvarintSize {
return nil, errors.Wrap(err, "short-write for uvarint encoded byte")
}

prev = v
}
if p.Err() != nil {
@@ -193,6 +180,7 @@ func (it *streamedDiffVarintPostings) Seek(x storage.SeriesRef) bool {
// and applies snappy compression on the result.
// Returned byte slice starts with codecHeaderSnappy header.
// Length argument is expected number of postings, used for preallocating buffer.
// TODO(GiedriusS): remove for v1.0.
func diffVarintSnappyEncode(p index.Postings, length int) ([]byte, error) {
buf, err := diffVarintEncodeNoHeader(p, length)
if err != nil {
@@ -253,6 +241,7 @@ func alias(x, y []byte) bool {
return cap(x) > 0 && cap(y) > 0 && &x[0:cap(x)][cap(x)-1] == &y[0:cap(y)][cap(y)-1]
}

// TODO(GiedriusS): remove for v1.0.
func diffVarintSnappyDecode(input []byte) (closeablePostings, error) {
if !isDiffVarintSnappyEncodedPostings(input) {
return nil, errors.New("header not found")

0 comments on commit 05c3a43

Please sign in to comment.