From 6b4d005c4128457c6e113c0e745c2698fd947355 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 11:38:27 -0700 Subject: [PATCH 1/8] Add a limit to the size of the batches sent over a stream. Currently, there's no limit to how many KVs can be included in a single batch. This leads to issues when the size is over a hard limit. For example, a batch of size > 2GB will cause issues if it has to be sent over gRPC. --- stream.go | 45 ++++++++++++++++++++++++++++++--------- stream_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/stream.go b/stream.go index 30814e70c..dc7df9e32 100644 --- a/stream.go +++ b/stream.go @@ -32,6 +32,9 @@ import ( const pageSize = 4 << 20 // 4MB +// maxStreamSize +var maxStreamSize = uint64(100 << 20) // 100MB + // Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up // key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key // ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted @@ -248,6 +251,19 @@ func (st *Stream) streamKVs(ctx context.Context) error { defer t.Stop() now := time.Now() + sendBatch := func(batch *pb.KVList) error { + sz := uint64(proto.Size(batch)) + bytesSent += sz + count += len(batch.Kv) + t := time.Now() + if err := st.Send(batch); err != nil { + return err + } + st.db.opt.Infof("%s Created batch of size: %s in %s.\n", + st.LogPrefix, humanize.Bytes(sz), time.Since(t)) + return nil + } + slurp := func(batch *pb.KVList) error { loop: for { @@ -258,20 +274,18 @@ func (st *Stream) streamKVs(ctx context.Context) error { } y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) + + // If the size of the batch exceeds maxStreamSize, break from the loop + // to avoid creating a batch that is so big certain limits are reached. 
+ sz := uint64(proto.Size(batch)) + if sz > maxStreamSize { + break loop + } default: break loop } } - sz := uint64(proto.Size(batch)) - bytesSent += sz - count += len(batch.Kv) - t := time.Now() - if err := st.Send(batch); err != nil { - return err - } - st.db.opt.Infof("%s Created batch of size: %s in %s.\n", - st.LogPrefix, humanize.Bytes(sz), time.Since(t)) - return nil + return sendBatch(batch) } outer: @@ -297,6 +311,17 @@ outer: } y.AssertTrue(kvs != nil) batch = kvs + + // Send the batch immediately if it already exceeds the maximum allowed size. + sz := uint64(proto.Size(batch)) + if sz > maxStreamSize { + if err := sendBatch(batch); err != nil { + return err + } + continue + } + + // Otherwise, slurp more keys into this batch. if err := slurp(batch); err != nil { return err } diff --git a/stream_test.go b/stream_test.go index f60e3decb..09381089b 100644 --- a/stream_test.go +++ b/stream_test.go @@ -207,3 +207,60 @@ func TestStreamWithThreadId(t *testing.T) { } require.NoError(t, db.Close()) } + +func TestBigStream(t *testing.T) { + // Set the maxStreamSize to 1MB for the duration of the test so that it can use a smaller + // dataset than it would otherwise need. 
+ originalMaxStreamSize := maxStreamSize + maxStreamSize = 1 << 20 + defer func() { + maxStreamSize = originalMaxStreamSize + }() + + testSize := int(1e6) + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + db, err := OpenManaged(DefaultOptions(dir)) + require.NoError(t, err) + + var count int + for _, prefix := range []string{"p0", "p1", "p2"} { + txn := db.NewTransactionAt(math.MaxUint64, true) + for i := 1; i <= testSize; i++ { + require.NoError(t, txn.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i)))) + count++ + if i % 1000 == 0 { + require.NoError(t, txn.CommitAt(5, nil)) + txn = db.NewTransactionAt(math.MaxUint64, true) + } + } + require.NoError(t, txn.CommitAt(5, nil)) + } + + stream := db.NewStreamAt(math.MaxUint64) + stream.LogPrefix = "Testing" + c := &collector{} + stream.Send = func(list *bpb.KVList) error { + return c.Send(list) + } + + // Test case 1. Retrieve everything. + err = stream.Orchestrate(ctxb) + require.NoError(t, err) + require.Equal(t, 3*testSize, len(c.kv), "Expected 30000. Got: %d", len(c.kv)) + + m := make(map[string]int) + for _, kv := range c.kv { + prefix, ki := keyToInt(kv.Key) + expected := value(ki) + require.Equal(t, expected, kv.Value) + m[prefix]++ + } + require.Equal(t, 3, len(m)) + for pred, count := range m { + require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred) + } +} + From 9a5175c1447fa3707f1ae19b2e48b8367f5b3ea2 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 12:08:51 -0700 Subject: [PATCH 2/8] Fix comment. --- stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index dc7df9e32..ef6a403e7 100644 --- a/stream.go +++ b/stream.go @@ -275,8 +275,8 @@ func (st *Stream) streamKVs(ctx context.Context) error { y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) 
- // If the size of the batch exceeds maxStreamSize, break from the loop - // to avoid creating a batch that is so big certain limits are reached. + // If the size of the batch exceeds maxStreamSize, break from the loop to + // avoid creating a batch that is so big that certain limits are reached. sz := uint64(proto.Size(batch)) if sz > maxStreamSize { break loop From fe7fcbd9afbe563351a1867b02d0220e6d44f92f Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 12:15:34 -0700 Subject: [PATCH 3/8] Fix Deepsource warnings. --- stream_test.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/stream_test.go b/stream_test.go index 09381089b..bb9d7cf6a 100644 --- a/stream_test.go +++ b/stream_test.go @@ -77,10 +77,8 @@ func TestStream(t *testing.T) { stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = "Testing" c := &collector{} - stream.Send = func(list *bpb.KVList) error { - return c.Send(list) - } - + stream.Send = c.Send + // Test case 1. Retrieve everything. err = stream.Orchestrate(ctxb) require.NoError(t, err) @@ -186,9 +184,7 @@ func TestStreamWithThreadId(t *testing.T) { return stream.ToList(key, itr) } c := &collector{} - stream.Send = func(list *bpb.KVList) error { - return c.Send(list) - } + stream.Send = c.Send err = stream.Orchestrate(ctxb) require.NoError(t, err) @@ -242,9 +238,7 @@ func TestBigStream(t *testing.T) { stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = "Testing" c := &collector{} - stream.Send = func(list *bpb.KVList) error { - return c.Send(list) - } + stream.Send = c.Send // Test case 1. Retrieve everything. err = stream.Orchestrate(ctxb) From 4fdf67535814635f9d2880f819a018fd92cf5b93 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 12:37:13 -0700 Subject: [PATCH 4/8] Different prefix for temp dir. 
--- stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream_test.go b/stream_test.go index bb9d7cf6a..2f45ca0a1 100644 --- a/stream_test.go +++ b/stream_test.go @@ -214,7 +214,7 @@ func TestBigStream(t *testing.T) { }() testSize := int(1e6) - dir, err := ioutil.TempDir("", "badger-test") + dir, err := ioutil.TempDir("", "badger-big-test") require.NoError(t, err) defer removeDir(dir) From 00b815c8cf3b4f813cda37f58239ccedeb39dacc Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 12:38:51 -0700 Subject: [PATCH 5/8] update comment. --- stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream.go b/stream.go index ef6a403e7..da22a2d9c 100644 --- a/stream.go +++ b/stream.go @@ -313,6 +313,7 @@ outer: batch = kvs // Send the batch immediately if it already exceeds the maximum allowed size. + // Calling slurp on this batch will only increase the size further. sz := uint64(proto.Size(batch)) if sz > maxStreamSize { if err := sendBatch(batch); err != nil { From 1cb590d9ed3d667319dfb1f447815775058b705d Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 15:58:38 -0700 Subject: [PATCH 6/8] Add comment. --- stream.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index da22a2d9c..7500064e3 100644 --- a/stream.go +++ b/stream.go @@ -32,7 +32,10 @@ import ( const pageSize = 4 << 20 // 4MB -// maxStreamSize +// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit +// as a single list that is still over the limit will have to be sent as is since it +// cannot be split further. This limit prevents the framework from creating batches +// so big that sending them causes issues (e.g. running into the max size gRPC limit). 
var maxStreamSize = uint64(100 << 20) // 100MB // Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up From 4722a9519ca3415272076cfd58d11d537579fcae Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Fri, 10 Jul 2020 16:00:40 -0700 Subject: [PATCH 7/8] Close the database at the end of the test. --- stream_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream_test.go b/stream_test.go index 2f45ca0a1..5118653d4 100644 --- a/stream_test.go +++ b/stream_test.go @@ -256,5 +256,6 @@ func TestBigStream(t *testing.T) { for pred, count := range m { require.Equal(t, testSize, count, "Count mismatch for pred: %s", pred) } + require.NoError(t, db.Close()) } From 1e5fa40c6f3bdf28551d6a90dc676698943bdb61 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 13 Jul 2020 21:04:50 +0530 Subject: [PATCH 8/8] Address review comments --- stream.go | 24 +++++++----------------- stream_test.go | 13 ++++--------- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/stream.go b/stream.go index 7500064e3..eb7341e49 100644 --- a/stream.go +++ b/stream.go @@ -270,6 +270,13 @@ func (st *Stream) streamKVs(ctx context.Context) error { slurp := func(batch *pb.KVList) error { loop: for { + // Send the batch immediately if it already exceeds the maximum allowed size. + // If the size of the batch exceeds maxStreamSize, break from the loop to + // avoid creating a batch that is so big that certain limits are reached. + sz := uint64(proto.Size(batch)) + if sz > maxStreamSize { + break loop + } select { case kvs, ok := <-st.kvChan: if !ok { @@ -277,13 +284,6 @@ func (st *Stream) streamKVs(ctx context.Context) error { } y.AssertTrue(kvs != nil) batch.Kv = append(batch.Kv, kvs.Kv...) - - // If the size of the batch exceeds maxStreamSize, break from the loop to - // avoid creating a batch that is so big that certain limits are reached. 
- sz := uint64(proto.Size(batch)) - if sz > maxStreamSize { - break loop - } default: break loop } @@ -315,16 +315,6 @@ outer: y.AssertTrue(kvs != nil) batch = kvs - // Send the batch immediately if it already exceeds the maximum allowed size. - // Calling slurp on this batch will only increase the size further. - sz := uint64(proto.Size(batch)) - if sz > maxStreamSize { - if err := sendBatch(batch); err != nil { - return err - } - continue - } - // Otherwise, slurp more keys into this batch. if err := slurp(batch); err != nil { return err diff --git a/stream_test.go b/stream_test.go index 5118653d4..f3c4b1d48 100644 --- a/stream_test.go +++ b/stream_test.go @@ -78,7 +78,7 @@ func TestStream(t *testing.T) { stream.LogPrefix = "Testing" c := &collector{} stream.Send = c.Send - + // Test case 1. Retrieve everything. err = stream.Orchestrate(ctxb) require.NoError(t, err) @@ -222,18 +222,14 @@ func TestBigStream(t *testing.T) { require.NoError(t, err) var count int + wb := db.NewWriteBatchAt(5) for _, prefix := range []string{"p0", "p1", "p2"} { - txn := db.NewTransactionAt(math.MaxUint64, true) for i := 1; i <= testSize; i++ { - require.NoError(t, txn.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i)))) + require.NoError(t, wb.SetEntry(NewEntry(keyWithPrefix(prefix, i), value(i)))) count++ - if i % 1000 == 0 { - require.NoError(t, txn.CommitAt(5, nil)) - txn = db.NewTransactionAt(math.MaxUint64, true) - } } - require.NoError(t, txn.CommitAt(5, nil)) } + require.NoError(t, wb.Flush()) stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = "Testing" @@ -258,4 +254,3 @@ func TestBigStream(t *testing.T) { } require.NoError(t, db.Close()) } -