Skip to content

Commit

Permalink
Batch oriented string encoders
Browse files Browse the repository at this point in the history
This commit adds a tsm1 function for encoding a batch of strings into a
provided buffer. The new function also shares the buffer between the
input data and the snappy encoded output, reducing allocations.

The following benchmarks compare the performance of the existing
iterator based encoders, and the new batch oriented encoders using
randomly generated strings.

name                old time/op    new time/op    delta
EncodeStrings/10      2.14µs ± 4%    1.42µs ± 4%   -33.56%  (p=0.000 n=10+10)
EncodeStrings/100     12.7µs ± 3%    10.9µs ± 2%   -14.46%  (p=0.000 n=10+10)
EncodeStrings/1000     132µs ± 2%     114µs ± 2%   -13.88%  (p=0.000 n=10+9)

name                old alloc/op   new alloc/op   delta
EncodeStrings/10        657B ± 0%      704B ± 0%    +7.15%  (p=0.000 n=10+10)
EncodeStrings/100     6.14kB ± 0%    9.47kB ± 0%   +54.14%  (p=0.000 n=10+10)
EncodeStrings/1000    61.4kB ± 0%    90.1kB ± 0%   +46.66%  (p=0.000 n=10+10)

name                old allocs/op  new allocs/op  delta
EncodeStrings/10        3.00 ± 0%      0.00       -100.00%  (p=0.000 n=10+10)
EncodeStrings/100       3.00 ± 0%      1.00 ± 0%   -66.67%  (p=0.000 n=10+10)
EncodeStrings/1000      3.00 ± 0%      1.00 ± 0%   -66.67%  (p=0.000 n=10+10)
  • Loading branch information
e-dard committed Sep 21, 2018
1 parent 5decd99 commit dd13d79
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 3 deletions.
6 changes: 3 additions & 3 deletions tsdb/engine/tsm1/batch_integer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ func dumpBufs(a, b []byte) {
for i := 0; i < longest; i++ {
var as, bs string
if i < len(a) {
as = fmt.Sprintf("%08b", a[i])
as = fmt.Sprintf("%08[1]b (%[1]d)", a[i])
}
if i < len(b) {
bs = fmt.Sprintf("%08b", b[i])
bs = fmt.Sprintf("%08[1]b (%[1]d)", b[i])
}

same := as == bs
Expand All @@ -36,7 +36,7 @@ func dumpBufs(a, b []byte) {

// dumpBuf writes every byte of b to standard output, one line per Printf call,
// and finishes with an empty line.
//
// NOTE(review): this listing is a rendered diff, so the two Printf calls below
// are presumably the pre-change line and its replacement rather than two calls
// that both exist in the real file — confirm against the repository.
func dumpBuf(b []byte) {
for i, v := range b {
// Index followed by the byte as a zero-padded 8-digit binary number.
fmt.Printf("%d %08b\n", i, v)
// Same as above, with the byte's decimal value appended in parentheses.
fmt.Printf("%[1]d %08[2]b (%[2]d)\n", i, v)
}
fmt.Println()
}
Expand Down
55 changes: 55 additions & 0 deletions tsdb/engine/tsm1/batch_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,61 @@ var (
errStringBatchDecodeShortBuffer = fmt.Errorf("StringArrayDecodeAll: short buffer")
)

// StringArrayEncodeAll encodes src into b, returning the encoded block and any
// error encountered. The returned slice may be of a different length and
// capacity to b, and is backed by a new array when b is too small.
//
// b is treated purely as scratch space: whatever it contains on entry is
// overwritten. The same buffer holds both the uncompressed, length-prefixed
// strings and the snappy output that follows them, so for non-empty input the
// returned slice starts part-way into the buffer rather than at index 0.
//
// The encoded block is a single header byte (the compression scheme in the
// upper four bits) followed by the snappy-compressed payload.
//
// Currently the only string compression scheme used is snappy.
func StringArrayEncodeAll(src []string, b []byte) ([]byte, error) {
	// As a heuristic assume 2 bytes per string (up to 127 byte length).
	sz := 2 + (len(src) * 2) // First 2 bytes is assuming empty input (snappy expects 1 byte of data).
	if cap(b) < sz {
		// Not enough capacity. Nothing in b needs preserving, so allocate
		// exactly what is required; appending to the old contents does not
		// guarantee a capacity of at least sz.
		b = make([]byte, sz)
	}
	b = b[:sz]

	// Shortcut to snappy encoding nothing.
	if len(src) == 0 {
		b[0] = stringCompressedSnappy << 4
		// The snappy block for empty input is a single zero byte (the
		// uncompressed length). Write it explicitly because b may be a
		// reused buffer holding stale data.
		b[1] = 0
		return b[:2], nil
	}

	n := 0 // Number of bytes written to buffer.
	for _, v := range src {
		// Make sure there is room for the largest possible uvarint length prefix.
		if rem := len(b) - n; rem < binary.MaxVarintLen64 {
			if cap(b) >= n+binary.MaxVarintLen64 {
				b = b[:n+binary.MaxVarintLen64] // Enough capacity in b to expand.
			} else {
				b = append(b, make([]byte, binary.MaxVarintLen64-rem)...) // Need to grow b.
			}
		}
		n += binary.PutUvarint(b[n:], uint64(len(v)))
		b = append(b[:n], v...) // Drop any unused prefix space, then add the string bytes.
		n += len(v)
	}

	// The loop leaves len(b) == n, so the header byte goes directly after the
	// uncompressed data; the compressed payload will follow it.
	b = append(b[:n], byte(stringCompressedSnappy<<4))
	n++

	// Grow b to include the compressed data. That way we don't need to allocate
	// a slice only to throw it away.
	sz = snappy.MaxEncodedLen(n - 1) // Don't need to consider header.
	if sz < 0 {
		// snappy cannot encode an input this large; report it rather than
		// letting snappy.Encode panic.
		return nil, fmt.Errorf("StringArrayEncodeAll: source length too large")
	}
	if rem := len(b) - n; rem < sz {
		if cap(b) >= n+sz {
			b = b[:n+sz] // Enough capacity in b to just expand.
		} else {
			b = append(b, make([]byte, sz-rem)...) // Need to grow b.
		}
	}

	// Source is everything before the header byte; destination is everything
	// after it, so the two regions never overlap.
	res := snappy.Encode(b[n:], b[:n-1])

	return b[n-1 : n+len(res)], nil // Include header byte in returned data.
}

func StringArrayDecodeAll(b []byte, dst []string) ([]string, error) {
// First byte stores the encoding type, only have snappy format
// currently so ignore for now.
Expand Down
196 changes: 196 additions & 0 deletions tsdb/engine/tsm1/batch_string_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,169 @@
package tsm1

import (
"bytes"
"fmt"
"math/rand"
"reflect"
"testing"
"testing/quick"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/internal/testutil"
"github.com/influxdata/influxdb/uuid"
)

// TestStringArrayEncodeAll_NoValues verifies that encoding a nil batch into a
// nil buffer succeeds and that a StringDecoder reading the result reports no
// values.
func TestStringArrayEncodeAll_NoValues(t *testing.T) {
	encoded, err := StringArrayEncodeAll(nil, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var dec StringDecoder
	if err = dec.SetBytes(encoded); err != nil {
		t.Fatalf("unexpected error creating string decoder: %v", err)
	}

	// An empty block must not yield anything.
	if hasNext := dec.Next(); hasNext {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}

// TestStringArrayEncodeAll_Single verifies that a batch holding one string
// round-trips through StringArrayEncodeAll and StringDecoder.
func TestStringArrayEncodeAll_Single(t *testing.T) {
	src := []string{"v1"}
	b, err := StringArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	var dec StringDecoder
	// Capture SetBytes' own error; testing the outer err here would always
	// pass because it was already checked above.
	if err := dec.SetBytes(b); err != nil {
		t.Fatalf("unexpected error creating string decoder: %v", err)
	}
	if !dec.Next() {
		t.Fatalf("unexpected next value: got false, exp true")
	}

	if src[0] != dec.Read() {
		t.Fatalf("unexpected value: got %v, exp %v", dec.Read(), src[0])
	}
}

// TestStringArrayEncode_Compare checks that StringArrayEncodeAll output decodes
// back to the original input via StringArrayDecodeAll, and that it is
// byte-for-byte identical to what StringEncoder produces for the same input.
func TestStringArrayEncode_Compare(t *testing.T) {
	// Build 1000 UUID strings to use as input.
	input := make([]string, 1000)
	for i := range input {
		input[i] = uuid.TimeUUID().String()
	}

	// Reference encoding produced by the existing StringEncoder.
	enc := NewStringEncoder(1000)
	for i := range input {
		enc.Write(input[i])
	}
	enc.Flush()

	want, err := enc.Bytes()
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Hand the batch encoder a non-empty buffer that already holds junk, so
	// that buffer reuse is exercised.
	scratch := append([]byte("this is some jibberish"), make([]byte, 100, 200)...)
	got, err := StringArrayEncodeAll(input, scratch)
	if err != nil {
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(got), got)
	}

	decoded, err := StringArrayDecodeAll(got, nil)
	if err != nil {
		dumpBufs(want, got)
		t.Fatalf("unexpected error: %v\nbuf: %db %x", err, len(got), got)
	}

	if !reflect.DeepEqual(decoded, input) {
		t.Fatalf("got result %v, expected %v", decoded, input)
	}

	// Both encoders must emit exactly the same bytes.
	if !bytes.Equal(want, got) {
		dumpBufs(want, got)
		t.Fatalf("Raw bytes differ for encoders")
	}
}

// TestStringArrayEncodeAll_Multi_Compressed encodes ten short strings and
// verifies the header's compression scheme, the encoded length, and that a
// StringDecoder yields the same values in order and nothing more.
func TestStringArrayEncodeAll_Multi_Compressed(t *testing.T) {
	src := make([]string, 10)
	for i := range src {
		src[i] = fmt.Sprintf("value %d", i)
	}

	b, err := StringArrayEncodeAll(src, nil)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// The scheme lives in the upper four bits of the header byte; report the
	// same value that is being compared.
	if got := b[0] >> 4; got != stringCompressedSnappy {
		t.Fatalf("unexpected encoding: got %v, exp %v", got, stringCompressedSnappy)
	}

	if exp := 51; len(b) != exp {
		t.Fatalf("unexpected length: got %v, exp %v", len(b), exp)
	}

	var dec StringDecoder
	if err := dec.SetBytes(b); err != nil {
		t.Fatalf("unexpected error creating string decoder: %v", err)
	}

	for i, v := range src {
		if !dec.Next() {
			t.Fatalf("unexpected next value: got false, exp true")
		}
		if v != dec.Read() {
			t.Fatalf("unexpected value at pos %d: got %v, exp %v", i, dec.Read(), v)
		}
	}

	// The decoder must be exhausted once every input value has been read.
	if dec.Next() {
		t.Fatalf("unexpected next value: got true, exp false")
	}
}

// TestStringArrayEncodeAll_Quick uses testing/quick to generate string slices
// and verifies that each one round-trips through StringArrayEncodeAll and
// StringDecoder unchanged.
func TestStringArrayEncodeAll_Quick(t *testing.T) {
	var base []byte
	err := quick.Check(func(values []string) bool {
		// Normalise nil to an empty slice so reflect.DeepEqual can compare it
		// with the non-nil slice built while decoding.
		src := values
		if values == nil {
			src = []string{}
		}

		// Retrieve encoded bytes from encoder.
		buf, err := StringArrayEncodeAll(src, base)
		if err != nil {
			t.Fatal(err)
		}

		// Read values out of decoder.
		got := make([]string, 0, len(src))
		var dec StringDecoder
		if err := dec.SetBytes(buf); err != nil {
			t.Fatal(err)
		}
		for dec.Next() {
			if err := dec.Error(); err != nil {
				t.Fatal(err)
			}
			got = append(got, dec.Read())
		}
		// Catch an error raised by the final Read, which the in-loop check
		// never sees.
		if err := dec.Error(); err != nil {
			t.Fatal(err)
		}

		// Verify that input and output values match.
		if !reflect.DeepEqual(src, got) {
			t.Fatalf("mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", src, got)
		}

		return true
	}, nil)
	// quick.Check reports its own failures through the returned error; do not
	// let them pass silently.
	if err != nil {
		t.Fatal(err)
	}
}

func TestStringArrayDecodeAll_NoValues(t *testing.T) {
enc := NewStringEncoder(1024)
b, err := enc.Bytes()
Expand Down Expand Up @@ -149,6 +303,48 @@ func TestStringArrayDecodeAll_CorruptBytes(t *testing.T) {
}
}

// BenchmarkEncodeStrings compares the StringEncoder ("itr") against
// StringArrayEncodeAll ("batch") for inputs of 10, 100 and 1000 UUID strings.
// Results are stored in bufResult, which is declared elsewhere in the package.
func BenchmarkEncodeStrings(b *testing.B) {
	var err error

	for _, size := range []int{10, 100, 1000} {
		enc := NewStringEncoder(size)
		b.Run(fmt.Sprintf("%d", size), func(b *testing.B) {
			// Both sub-benchmarks for a given size share one input set.
			input := make([]string, size)
			for i := range input {
				input[i] = uuid.TimeUUID().String()
			}

			b.Run("itr", func(b *testing.B) {
				b.ReportAllocs()
				enc.Reset()
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					enc.Reset()
					for _, s := range input {
						enc.Write(s)
					}
					enc.Flush()
					bufResult, err = enc.Bytes()
					if err != nil {
						b.Fatal(err)
					}
				}
			})

			b.Run("batch", func(b *testing.B) {
				b.ReportAllocs()
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					// Feed the previous result back in as the scratch buffer.
					bufResult, err = StringArrayEncodeAll(input, bufResult)
					if err != nil {
						b.Fatal(err)
					}
				}
			})
		})
	}
}

func BenchmarkStringArrayDecodeAll(b *testing.B) {
benchmarks := []struct {
n int
Expand Down

0 comments on commit dd13d79

Please sign in to comment.