Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(compaction/stream): bring changes from master #1717

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,34 +829,29 @@ func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
continue
}
numBuilds++
fileID := s.reserveFileID()
if err := inflightBuilders.Do(); err != nil {
// Can't return from here, until I decrRef all the tables that I built so far.
break
}
go func(builder *table.Builder) {
go func(builder *table.Builder, fileID uint64) {
var err error
defer inflightBuilders.Done(err)
defer builder.Close()

build := func(fileID uint64) (*table.Table, error) {
fname := table.NewFilename(fileID, s.kv.opt.Dir)
return table.CreateTable(fname, builder)
}

var tbl *table.Table
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
fname := table.NewFilename(fileID, s.kv.opt.Dir)
tbl, err = table.CreateTable(fname, builder)
}

// If we couldn't build the table, return fast.
if err != nil {
return
}
res <- tbl
}(builder)
}(builder, s.reserveFileID())
}
s.kv.vlog.updateDiscardStats(discardStats)
s.kv.opt.Debugf("Discard stats: %v", discardStats)
Expand Down
2 changes: 2 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
}
kv.Version = item.Version()
kv.ExpiresAt = item.ExpiresAt()
// Since we do a full copy of the value, the only metadata we need to transmit is whether
kv.Meta = []byte{item.meta & bitDelete}
kv.UserMeta = a.Copy([]byte{item.UserMeta()})

list.Kv = append(list.Kv, kv)
Expand Down
6 changes: 5 additions & 1 deletion stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,11 @@ func (w *sortedWriter) handleRequests() {
for i, e := range req.Entries {
// If badger is running in InMemory mode, len(req.Ptrs) == 0.
var vs y.ValueStruct
if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
// Sorted stream writer receives Key-Value (not a pointer to value). So, it's up to the
// writer (and not the sender) to determine if the Value goes to vlog or stays in SST
// only. In managed mode, we do not write values to vlog and hence we would not have
// req.Ptrs initialized.
if w.db.opt.managedTxns || e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
vs = y.ValueStruct{
Value: e.Value,
Meta: e.meta,
Expand Down
46 changes: 35 additions & 11 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func getSortedKVList(valueSize, listSize int) *z.Buffer {
value := make([]byte, valueSize)
y.Check2(rand.Read(value))
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
for i := 0; i < listSize; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(i))
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestStreamWriter3(t *testing.T) {
value := make([]byte, valueSize)
y.Check2(rand.Read(value))
counter := 0
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := 0; i < noOfKeys; i++ {
key := make([]byte, 8)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestStreamWriter4(t *testing.T) {
require.NoError(t, err, "error while updating db")
}

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: []byte("key-1"),
Expand All @@ -297,7 +297,7 @@ func TestStreamWriter5(t *testing.T) {
right[0] = 0xff
copy(right[1:], []byte("break"))

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: left,
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestStreamWriter6(t *testing.T) {
// will be written to level 6, we need to insert at least 1 mb of data.
// Setting keycount below 32 would cause this test to fail.
keyCount := 40
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := range str {
for j := 0; j < keyCount; j++ {
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestStreamWriterCancel(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
str := []string{"a", "a", "b", "b", "c", "c"}
ver := 1
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
for i := range str {
kv := &pb.KV{
Expand Down Expand Up @@ -411,7 +411,7 @@ func TestStreamDone(t *testing.T) {
var val [10]byte
rand.Read(val[:])
for i := 0; i < 10; i++ {
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
kv1 := &pb.KV{
Key: []byte(fmt.Sprintf("%d", i)),
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestSendOnClosedStream(t *testing.T) {

var val [10]byte
rand.Read(val[:])
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
kv1 := &pb.KV{
Key: []byte(fmt.Sprintf("%d", 1)),
Expand All @@ -475,7 +475,7 @@ func TestSendOnClosedStream(t *testing.T) {
require.NoError(t, db.Close())
}()
// Send once stream is closed.
buf1 := z.NewBuffer(10 << 20, "test")
buf1 := z.NewBuffer(10<<20, "test")
defer buf1.Release()
kv1 = &pb.KV{
Key: []byte(fmt.Sprintf("%d", 2)),
Expand All @@ -502,7 +502,7 @@ func TestSendOnClosedStream2(t *testing.T) {

var val [10]byte
rand.Read(val[:])
buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
kv1 := &pb.KV{
Key: []byte(fmt.Sprintf("%d", 1)),
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestStreamWriterEncrypted(t *testing.T) {
key := []byte("mykey")
value := []byte("myvalue")

buf := z.NewBuffer(10 << 20, "test")
buf := z.NewBuffer(10<<20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: key,
Expand Down Expand Up @@ -578,3 +578,27 @@ func TestStreamWriterEncrypted(t *testing.T) {
require.NoError(t, db.Close())

}

// Test that the stream writer does not crash with large values in managed mode. In managed
// mode, we don't write to the value log, so the large value must go into the SST directly.
func TestStreamWriterWithLargeValue(t *testing.T) {
	opts := DefaultOptions("")
	// Managed transactions force the sorted writer to keep values in the SST
	// (no vlog pointers) — the scenario this test guards against.
	opts.managedTxns = true
	runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
		buf := z.NewBuffer(10<<20, "test")
		defer buf.Release()
		// 10 MB random value — well above any value-log threshold.
		val := make([]byte, 10<<20)
		_, err := rand.Read(val)
		require.NoError(t, err)
		KVToBuffer(&pb.KV{
			Key:     []byte("key"),
			Value:   val,
			Version: 1,
		}, buf)

		// Full stream-writer lifecycle: Prepare -> Write -> Flush must all succeed.
		sw := db.NewStreamWriter()
		require.NoError(t, sw.Prepare(), "sw.Prepare() failed")
		require.NoError(t, sw.Write(buf), "sw.Write() failed")
		require.NoError(t, sw.Flush(), "sw.Flush() failed")
	})
}