Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Preallocate output buffer when encoding postings. #2812

Merged
merged 3 commits
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,8 @@ func (r *bucketIndexReader) fetchPostings(keys []labels.Label) ([]index.Postings
// Errors from corrupted postings will be reported when postings are used.
compressions++
s := time.Now()
data, err := diffVarintSnappyEncode(newBigEndianPostings(pBytes[4:]))
bep := newBigEndianPostings(pBytes[4:])
data, err := diffVarintSnappyEncode(bep, bep.count())
compressionTime = time.Since(s)
if err == nil {
dataToCache = data
Expand Down Expand Up @@ -1803,6 +1804,11 @@ func (it *bigEndianPostings) Err() error {
return nil
}

// count returns the number of remaining postings values in the iterator.
// Each posting is stored as a 4-byte big-endian uint32, so the count is the
// remaining byte length divided by 4.
func (it *bigEndianPostings) count() int {
	return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ids []uint64) error {
// Load series from cache, overwriting the list of ids to preload
// with the missing ones.
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package store
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -1922,3 +1923,21 @@ func mustMarshalAny(pb proto.Message) *types.Any {
}
return out
}

func TestBigEndianPostingsCount(t *testing.T) {
	const numValues = 1000

	// Build a raw big-endian postings list with numValues random entries,
	// each encoded as a 4-byte big-endian uint32.
	raw := make([]byte, numValues*4)
	for i := 0; i < numValues; i++ {
		binary.BigEndian.PutUint32(raw[i*4:], rand.Uint32())
	}

	p := newBigEndianPostings(raw)
	testutil.Equals(t, numValues, p.count())

	// Iterating must yield exactly as many values as count() reported.
	seen := 0
	for p.Next() {
		seen++
	}
	testutil.Equals(t, numValues, seen)
}
14 changes: 11 additions & 3 deletions pkg/store/postings_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ func isDiffVarintSnappyEncodedPostings(input []byte) bool {
// diffVarintSnappyEncode encodes postings into diff+varint representation,
// and applies snappy compression on the result.
// Returned byte slice starts with codecHeaderSnappy header.
func diffVarintSnappyEncode(p index.Postings) ([]byte, error) {
buf, err := diffVarintEncodeNoHeader(p)
// Values argument is expected number of postings, used for preallocating buffer.
func diffVarintSnappyEncode(p index.Postings, values int) ([]byte, error) {
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
buf, err := diffVarintEncodeNoHeader(p, values)
if err != nil {
return nil, err
}
Expand All @@ -52,9 +53,16 @@ func diffVarintSnappyEncode(p index.Postings) ([]byte, error) {

// diffVarintEncodeNoHeader encodes postings into diff+varint representation.
// It doesn't add any header to the output bytes.
func diffVarintEncodeNoHeader(p index.Postings) ([]byte, error) {
// Values argument is expected number of postings, used for preallocating buffer.
func diffVarintEncodeNoHeader(p index.Postings, values int) ([]byte, error) {
buf := encoding.Encbuf{}

// This encoding uses around ~1 bytes per posting, but let's use
// conservative 1.25 bytes per posting to avoid extra allocations.
if values > 0 {
buf.B = make([]byte, 0, 5*values/4)
}

prev := uint64(0)
for p.Next() {
v := p.At()
Expand Down
37 changes: 34 additions & 3 deletions pkg/store/postings_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ package store

import (
"io/ioutil"
"math"
"math/rand"
"os"
"strconv"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -49,7 +52,7 @@ func TestDiffVarintCodec(t *testing.T) {
}

codecs := map[string]struct {
codingFunction func(index.Postings) ([]byte, error)
codingFunction func(index.Postings, int) ([]byte, error)
decodingFunction func([]byte) (index.Postings, error)
}{
"raw": {codingFunction: diffVarintEncodeNoHeader, decodingFunction: func(bytes []byte) (index.Postings, error) { return newDiffVarintPostings(bytes), nil }},
Expand All @@ -68,11 +71,11 @@ func TestDiffVarintCodec(t *testing.T) {
t.Log("original size (4*entries):", 4*p.len(), "bytes")
p.reset() // We reuse postings between runs, so we need to reset iterator.

data, err := codec.codingFunction(p)
data, err := codec.codingFunction(p, p.len())
testutil.Ok(t, err)

t.Log("encoded size", len(data), "bytes")
t.Logf("ratio: %0.3f", (float64(len(data)) / float64(4*p.len())))
t.Logf("ratio: %0.3f", float64(len(data))/float64(4*p.len()))

decodedPostings, err := codec.decodingFunction(data)
testutil.Ok(t, err)
Expand Down Expand Up @@ -188,3 +191,31 @@ func (p *uint64Postings) reset() {
// len returns the number of postings values held by p.
func (p *uint64Postings) len() int {
	return len(p.vals)
}

// BenchmarkEncodePostings measures diff+varint encoding of postings lists
// of various sizes.
func BenchmarkEncodePostings(b *testing.B) {
	const numPostings = 1000000

	rnd := rand.New(rand.NewSource(0))

	// Generate a sorted postings list whose gaps follow a half-normal
	// distribution with stddev=64 (i.e. most deltas are < 64). This is a
	// very rough approximation of experiments with real blocks.
	postings := make([]uint64, numPostings)
	for i := 1; i < len(postings); i++ {
		delta := math.Abs(rnd.NormFloat64()*64) + 1
		postings[i] = postings[i-1] + uint64(delta)
	}

	for _, count := range []int{10000, 100000, 1000000} {
		b.Run(strconv.Itoa(count), func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				ps := &uint64Postings{vals: postings[:count]}

				if _, err := diffVarintEncodeNoHeader(ps, ps.len()); err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}