diff --git a/levels.go b/levels.go
index 9fa761e63..637a3565d 100644
--- a/levels.go
+++ b/levels.go
@@ -829,26 +829,21 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
 			continue
 		}
 		numBuilds++
-		fileID := s.reserveFileID()
 		if err := inflightBuilders.Do(); err != nil {
 			// Can't return from here, until I decrRef all the tables that I built so far.
 			break
 		}
-		go func(builder *table.Builder) {
+		go func(builder *table.Builder, fileID uint64) {
 			var err error
 			defer inflightBuilders.Done(err)
 			defer builder.Close()
 
-			build := func(fileID uint64) (*table.Table, error) {
-				fname := table.NewFilename(fileID, s.kv.opt.Dir)
-				return table.CreateTable(fname, builder)
-			}
-
 			var tbl *table.Table
 			if s.kv.opt.InMemory {
 				tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
 			} else {
-				tbl, err = build(fileID)
+				fname := table.NewFilename(fileID, s.kv.opt.Dir)
+				tbl, err = table.CreateTable(fname, builder)
 			}
 
 			// If we couldn't build the table, return fast.
@@ -856,7 +851,7 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
 				return
 			}
 			res <- tbl
-		}(builder)
+		}(builder, s.reserveFileID())
 	}
 	s.kv.vlog.updateDiscardStats(discardStats)
 	s.kv.opt.Debugf("Discard stats: %v", discardStats)
diff --git a/stream.go b/stream.go
index 0614f19aa..527895907 100644
--- a/stream.go
+++ b/stream.go
@@ -127,6 +127,8 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
 			}
 			kv.Version = item.Version()
 			kv.ExpiresAt = item.ExpiresAt()
+			// Since we do a full copy, we only need to transmit whether it is a delete key.
+			kv.Meta = []byte{item.meta & bitDelete}
 			kv.UserMeta = a.Copy([]byte{item.UserMeta()})
 
 			list.Kv = append(list.Kv, kv)
diff --git a/stream_writer.go b/stream_writer.go
index 4613718cc..139ade8af 100644
--- a/stream_writer.go
+++ b/stream_writer.go
@@ -316,7 +316,11 @@ func (w *sortedWriter) handleRequests() {
 		for i, e := range req.Entries {
 			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
 			var vs y.ValueStruct
-			if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
+			// The sorted stream writer receives key-value pairs (not pointers into the value log).
+			// So it is up to the writer (and not the sender) to decide whether a value goes to the
+			// vlog or stays only in the SST. In managed mode, we do not write values to the vlog,
+			// so req.Ptrs would not be initialized.
+			if w.db.opt.managedTxns || e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
 				vs = y.ValueStruct{
 					Value: e.Value,
 					Meta:  e.meta,
diff --git a/stream_writer_test.go b/stream_writer_test.go
index 48246e4d5..bdc520a91 100644
--- a/stream_writer_test.go
+++ b/stream_writer_test.go
@@ -36,7 +36,7 @@ import (
 func getSortedKVList(valueSize, listSize int) *z.Buffer {
 	value := make([]byte, valueSize)
 	y.Check2(rand.Read(value))
-	buf := z.NewBuffer(10 << 20, "test")
+	buf := z.NewBuffer(10<<20, "test")
 	for i := 0; i < listSize; i++ {
 		key := make([]byte, 8)
 		binary.BigEndian.PutUint64(key, uint64(i))
@@ -175,7 +175,7 @@ func TestStreamWriter3(t *testing.T) {
 		value := make([]byte, valueSize)
 		y.Check2(rand.Read(value))
 		counter := 0
-		buf := z.NewBuffer(10 << 20, "test")
+		buf := z.NewBuffer(10<<20, "test")
 		defer buf.Release()
 		for i := 0; i < noOfKeys; i++ {
 			key := make([]byte, 8)
@@ -272,7 +272,7 @@ func TestStreamWriter4(t *testing.T) {
 			require.NoError(t, err, "error while updating db")
 		}
 
-		buf := z.NewBuffer(10 << 20, "test")
+		buf := z.NewBuffer(10<<20, "test")
 		defer buf.Release()
 		KVToBuffer(&pb.KV{
 			Key:     []byte("key-1"),
@@ -297,7 +297,7 @@ func TestStreamWriter5(t *testing.T) {
 		right[0] = 0xff
 		copy(right[1:], []byte("break"))
 
-		buf := z.NewBuffer(10 << 20, "test")
+		buf := z.NewBuffer(10<<20, "test")
 		defer buf.Release()
 		KVToBuffer(&pb.KV{
 			Key:     left,
@@ -336,7 +336,7 @@ func TestStreamWriter6(t *testing.T) {
 		// will be written to level 6, we need to insert at least 1 mb of data.
 		// Setting keycount below 32 would cause this test to fail.
 		keyCount := 40
-		buf := z.NewBuffer(10 << 20, "test")
+		buf := z.NewBuffer(10<<20, "test")
 		defer buf.Release()
 		for i := range str {
 			for j := 0; j < keyCount; j++ {
@@ -377,7 +377,7 @@ func TestStreamWriterCancel(t *testing.T) {
 	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
 		str := []string{"a", "a", "b", "b", "c", "c"}
 		ver := 1
-		buf := z.NewBuffer(10 << 20, "test")
+		buf := z.NewBuffer(10<<20, "test")
 		defer buf.Release()
 		for i := range str {
 			kv := &pb.KV{
@@ -411,7 +411,7 @@ func TestStreamDone(t *testing.T) {
 		var val [10]byte
 		rand.Read(val[:])
 		for i := 0; i < 10; i++ {
-			buf := z.NewBuffer(10 << 20, "test")
+			buf := z.NewBuffer(10<<20, "test")
 			defer buf.Release()
 			kv1 := &pb.KV{
 				Key:     []byte(fmt.Sprintf("%d", i)),
@@ -452,7 +452,7 @@ func TestSendOnClosedStream(t *testing.T) {
 
 	var val [10]byte
 	rand.Read(val[:])
-	buf := z.NewBuffer(10 << 20, "test")
+	buf := z.NewBuffer(10<<20, "test")
 	defer buf.Release()
 	kv1 := &pb.KV{
 		Key:     []byte(fmt.Sprintf("%d", 1)),
@@ -475,7 +475,7 @@ func TestSendOnClosedStream(t *testing.T) {
 		require.NoError(t, db.Close())
 	}()
 	// Send once stream is closed.
-	buf1 := z.NewBuffer(10 << 20, "test")
+	buf1 := z.NewBuffer(10<<20, "test")
 	defer buf1.Release()
 	kv1 = &pb.KV{
 		Key:     []byte(fmt.Sprintf("%d", 2)),
@@ -502,7 +502,7 @@ func TestSendOnClosedStream2(t *testing.T) {
 
 	var val [10]byte
 	rand.Read(val[:])
-	buf := z.NewBuffer(10 << 20, "test")
+	buf := z.NewBuffer(10<<20, "test")
 	defer buf.Release()
 	kv1 := &pb.KV{
 		Key:     []byte(fmt.Sprintf("%d", 1)),
@@ -549,7 +549,7 @@ func TestStreamWriterEncrypted(t *testing.T) {
 	key := []byte("mykey")
 	value := []byte("myvalue")
 
-	buf := z.NewBuffer(10 << 20, "test")
+	buf := z.NewBuffer(10<<20, "test")
 	defer buf.Release()
 	KVToBuffer(&pb.KV{
 		Key:     key,
@@ -578,3 +578,27 @@ func TestStreamWriterEncrypted(t *testing.T) {
 
 	require.NoError(t, db.Close())
 }
+
+// Test that the stream writer does not crash with large values in managed mode. In managed
+// mode, we don't write values to the value log.
+func TestStreamWriterWithLargeValue(t *testing.T) {
+	opts := DefaultOptions("")
+	opts.managedTxns = true
+	runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+		buf := z.NewBuffer(10<<20, "test")
+		defer buf.Release()
+		val := make([]byte, 10<<20)
+		_, err := rand.Read(val)
+		require.NoError(t, err)
+		KVToBuffer(&pb.KV{
+			Key:     []byte("key"),
+			Value:   val,
+			Version: 1,
+		}, buf)
+
+		sw := db.NewStreamWriter()
+		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
+		require.NoError(t, sw.Write(buf), "sw.Write() failed")
+		require.NoError(t, sw.Flush(), "sw.Flush() failed")
+	})
+}
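
For readers less familiar with how the sorted stream writer places values, the short sketch below (plain Go, not part of the patch) restates the decision that the stream_writer.go hunk introduces. The entry, valueStruct, placeValue, managedTxns, and valueThreshold names are illustrative assumptions for this sketch, not Badger's real definitions; only the managed-mode check and the threshold fallback come from the diff above.

package main

import "fmt"

// entry and valueStruct are simplified stand-ins for Badger's internal types,
// shown only to illustrate the branch added in handleRequests.
type entry struct {
	Key   []byte
	Value []byte
	meta  byte
}

type valueStruct struct {
	Value []byte
	Meta  byte
}

// placeValue sketches the per-entry decision: in managed mode the writer always
// keeps the value inline (it will live in the SST), because nothing was written
// to the value log and req.Ptrs is never populated. Otherwise a size threshold
// decides, mirroring skipVlogAndSetThreshold. managedTxns and valueThreshold are
// assumed inputs for this sketch.
func placeValue(e entry, managedTxns bool, valueThreshold int64) valueStruct {
	if managedTxns || int64(len(e.Value)) < valueThreshold {
		return valueStruct{Value: e.Value, Meta: e.meta}
	}
	// Non-managed, over-threshold case: the real writer stores a value pointer
	// (taken from req.Ptrs) instead of the value itself; elided in this sketch.
	return valueStruct{Meta: e.meta}
}

func main() {
	large := make([]byte, 10<<20) // a 10 MB value, as in TestStreamWriterWithLargeValue
	vs := placeValue(entry{Key: []byte("key"), Value: large}, true, 1<<20)
	fmt.Printf("inline value bytes: %d\n", len(vs.Value)) // in managed mode the value stays in the SST
}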