Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a limit to the size of the batches sent over a stream. #1412

Merged
merged 8 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (

const pageSize = 4 << 20 // 4MB

// maxStreamSize is the size threshold for a single batch sent over a stream. Once a batch's
// encoded size exceeds it, no more key-values are added and the batch is sent as is, so a
// batch can overshoot this value by the size of the last list appended to it. It is a
// variable rather than a constant so that tests can temporarily lower it.
var maxStreamSize = uint64(100 << 20) // 100MB

// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
Expand Down Expand Up @@ -248,6 +251,19 @@ func (st *Stream) streamKVs(ctx context.Context) error {
defer t.Stop()
now := time.Now()

// sendBatch hands a finished batch to st.Send. Before sending, it adds the batch's
// encoded size and number of KVs to the running totals (bytesSent and count). After a
// successful Send it logs the batch size and how long the Send call took.
sendBatch := func(batch *pb.KVList) error {
	batchSize := uint64(proto.Size(batch))
	bytesSent += batchSize
	count += len(batch.Kv)

	// Time only the Send call itself.
	start := time.Now()
	err := st.Send(batch)
	if err != nil {
		return err
	}
	st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
		st.LogPrefix, humanize.Bytes(batchSize), time.Since(start))
	return nil
}

slurp := func(batch *pb.KVList) error {
loop:
for {
Expand All @@ -258,20 +274,18 @@ func (st *Stream) streamKVs(ctx context.Context) error {
}
y.AssertTrue(kvs != nil)
batch.Kv = append(batch.Kv, kvs.Kv...)

// If the size of the batch exceeds maxStreamSize, break from the loop
// to avoid creating a batch that is so big certain limits are reached.
sz := uint64(proto.Size(batch))
if sz > maxStreamSize {
break loop
}
default:
break loop
}
}
sz := uint64(proto.Size(batch))
bytesSent += sz
count += len(batch.Kv)
t := time.Now()
if err := st.Send(batch); err != nil {
return err
}
st.db.opt.Infof("%s Created batch of size: %s in %s.\n",
st.LogPrefix, humanize.Bytes(sz), time.Since(t))
return nil
return sendBatch(batch)
}

outer:
Expand All @@ -297,6 +311,17 @@ outer:
}
y.AssertTrue(kvs != nil)
batch = kvs

// Send the batch immediately if it already exceeds the maximum allowed size.
sz := uint64(proto.Size(batch))
if sz > maxStreamSize {
if err := sendBatch(batch); err != nil {
return err
}
continue
}

// Otherwise, slurp more keys into this batch.
if err := slurp(batch); err != nil {
return err
}
Expand Down
57 changes: 57 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,60 @@ func TestStreamWithThreadId(t *testing.T) {
}
require.NoError(t, db.Close())
}

// TestBigStream writes testSize entries under each of three prefixes and streams the whole
// DB back with maxStreamSize lowered to 1MB, so that batches have to be cut short and sent
// early. It then checks that every entry was received with the expected value and that each
// prefix yielded exactly testSize entries.
func TestBigStream(t *testing.T) {
	// Lower maxStreamSize to 1MB for the duration of the test so that it can use a smaller
	// dataset than it would otherwise need.
	originalMaxStreamSize := maxStreamSize
	maxStreamSize = 1 << 20
	defer func() {
		maxStreamSize = originalMaxStreamSize
	}()

	testSize := int(1e6)
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	db, err := OpenManaged(DefaultOptions(dir))
	require.NoError(t, err)
	// Registered after removeDir, so the DB is closed before its directory is removed.
	defer func() {
		require.NoError(t, db.Close())
	}()

	for _, prefix := range []string{"p0", "p1", "p2"} {
		txn := db.NewTransactionAt(math.MaxUint64, true)
		for i := 1; i <= testSize; i++ {
			require.NoError(t, txn.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i))))
			// Commit every 1000 entries and continue with a fresh transaction.
			if i%1000 == 0 {
				require.NoError(t, txn.CommitAt(5, nil))
				txn = db.NewTransactionAt(math.MaxUint64, true)
			}
		}
		// Commit whatever is left in the last transaction for this prefix.
		require.NoError(t, txn.CommitAt(5, nil))
	}

	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = "Testing"
	c := &collector{}
	stream.Send = func(list *bpb.KVList) error {
		return c.Send(list)
	}

	// Retrieve everything.
	err = stream.Orchestrate(ctxb)
	require.NoError(t, err)
	require.Equal(t, 3*testSize, len(c.kv), "Expected %d. Got: %d", 3*testSize, len(c.kv))

	// Verify each value against its key and tally the entries seen per prefix.
	m := make(map[string]int)
	for _, kv := range c.kv {
		prefix, ki := keyToInt(kv.Key)
		expected := value(ki)
		require.Equal(t, expected, kv.Value)
		m[prefix]++
	}
	require.Equal(t, 3, len(m))
	for pred, count := range m {
		require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred)
	}
}