Add a limit to the size of the batches sent over a stream. #1412

Merged 8 commits on Jul 13, 2020
Changes from 5 commits
46 changes: 36 additions & 10 deletions stream.go
@@ -32,6 +32,9 @@ import (

const pageSize = 4 << 20 // 4MB

// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit:
// a batch stops growing once it crosses this size, but a single list that is already
// larger than this will still be sent as one batch.
var maxStreamSize = uint64(100 << 20) // 100MB

// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
@@ -248,6 +251,19 @@ func (st *Stream) streamKVs(ctx context.Context) error {
defer t.Stop()
now := time.Now()

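// sendBatch updates the sent-bytes and key counters, pushes the batch through
// st.Send, and logs the batch size along with how long the send took.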
sendBatch := func(batch *pb.KVList) error {
sz := uint64(proto.Size(batch))
bytesSent += sz
count += len(batch.Kv)
t := time.Now()
if err := st.Send(batch); err != nil {
return err
}
st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
st.LogPrefix, humanize.Bytes(sz), time.Since(t))
return nil
}

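// slurp opportunistically drains kvChan into the current batch, stopping once the
// channel is empty or the batch has grown past maxStreamSize, then sends the batch.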
slurp := func(batch *pb.KVList) error {
loop:
for {
@@ -258,20 +274,18 @@ }
}
y.AssertTrue(kvs != nil)
batch.Kv = append(batch.Kv, kvs.Kv...)

// If the size of the batch exceeds maxStreamSize, break from the loop to
// avoid creating a batch that is so big that certain limits are reached.
sz := uint64(proto.Size(batch))
if sz > maxStreamSize {
break loop
}
default:
break loop
}
}
sz := uint64(proto.Size(batch))
bytesSent += sz
count += len(batch.Kv)
t := time.Now()
if err := st.Send(batch); err != nil {
return err
}
st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
st.LogPrefix, humanize.Bytes(sz), time.Since(t))
return nil
return sendBatch(batch)
}

outer:
@@ -297,6 +311,18 @@
}
y.AssertTrue(kvs != nil)
batch = kvs

// Send the batch immediately if it already exceeds the maximum allowed size.
// Calling slurp on this batch will only increase the size further.
sz := uint64(proto.Size(batch))
if sz > maxStreamSize {
if err := sendBatch(batch); err != nil {
return err
}
continue
}

// Otherwise, slurp more keys into this batch.
if err := slurp(batch); err != nil {
return err
}
65 changes: 58 additions & 7 deletions stream_test.go
@@ -77,10 +77,8 @@ func TestStream(t *testing.T) {
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Testing"
c := &collector{}
stream.Send = func(list *bpb.KVList) error {
return c.Send(list)
}

stream.Send = c.Send

// Test case 1. Retrieve everything.
err = stream.Orchestrate(ctxb)
require.NoError(t, err)
@@ -186,9 +184,7 @@ func TestStreamWithThreadId(t *testing.T) {
return stream.ToList(key, itr)
}
c := &collector{}
stream.Send = func(list *bpb.KVList) error {
return c.Send(list)
}
stream.Send = c.Send

err = stream.Orchestrate(ctxb)
require.NoError(t, err)
@@ -207,3 +203,58 @@ func TestStreamWithThreadId(t *testing.T) {
}
require.NoError(t, db.Close())
}

func TestBigStream(t *testing.T) {
// Set maxStreamSize to 1MB for the duration of the test so that it can use a smaller
// dataset than it would otherwise need.
originalMaxStreamSize := maxStreamSize
maxStreamSize = 1 << 20
defer func() {
maxStreamSize = originalMaxStreamSize
}()

testSize := int(1e6)
dir, err := ioutil.TempDir("", "badger-big-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := OpenManaged(DefaultOptions(dir))
require.NoError(t, err)

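// Write testSize keys under each of three prefixes, committing every 1,000 entries
// so that no single transaction grows too large.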
var count int
for _, prefix := range []string{"p0", "p1", "p2"} {
txn := db.NewTransactionAt(math.MaxUint64, true)
for i := 1; i <= testSize; i++ {
require.NoError(t, txn.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
count++
if i%1000 == 0 {
require.NoError(t, txn.CommitAt(5, nil))
txn = db.NewTransactionAt(math.MaxUint64, true)
}
}
require.NoError(t, txn.CommitAt(5, nil))
}

stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = "Testing"
c := &collector{}
stream.Send = c.Send

// Retrieve everything and verify that every key under every prefix came through.
err = stream.Orchestrate(ctxb)
require.NoError(t, err)
require.Equal(t, 3*testSize, len(c.kv), "Expected %d. Got: %d", 3*testSize, len(c.kv))

m := make(map[string]int)
for _, kv := range c.kv {
prefix, ki := keyToInt(kv.Key)
expected := value(ki)
require.Equal(t, expected, kv.Value)
m[prefix]++
}
require.Equal(t, 3, len(m))
for prefix, count := range m {
require.Equal(t, testSize, count, "Count mismatch for prefix: %s", prefix)
}
}
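
For reference, a minimal sketch of how a caller typically drives the Stream API that this change affects. This assumes badger v2 import paths; the directory and the Send body are illustrative, not part of this PR:

package main

import (
	"context"
	"log"

	badger "github.com/dgraph-io/badger/v2"
	bpb "github.com/dgraph-io/badger/v2/pb"
)

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/badger"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	stream := db.NewStream()
	stream.LogPrefix = "Example"
	// After this change, each KVList handed to Send stays close to
	// maxStreamSize (100MB) instead of growing without bound.
	stream.Send = func(list *bpb.KVList) error {
		for _, kv := range list.Kv {
			_ = kv // e.g. serialize the key-value onto the wire
		}
		return nil
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		log.Fatal(err)
	}
}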