From fac972feb849093aa94109a57c978f3fa501b017 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 30 Jun 2020 21:00:38 +0530 Subject: [PATCH] Update head while replaying value log (#1372) Fixes https://github.com/dgraph-io/badger/issues/1363 The head pointer is not updated when we perform replays. The head pointer would be updated only when the replay completes. If badger crashes between the point when replay started and replay finished, we would end up replaying all the value log files. This PR fixes this issue. (cherry picked from commit 509de73a8ad1c1b8ea62ffc0cef944716435ec25) --- db.go | 10 ++++++++-- value_test.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/db.go b/db.go index 9fca7a1fc..fc908dbf5 100644 --- a/db.go +++ b/db.go @@ -136,6 +136,10 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { } else { nv = vp.Encode() meta = meta | bitValuePointer + // Update vhead. If the crash happens while replay was in progess + // and the head is not updated, we will end up replaying all the + // files again. + db.updateHead([]valuePointer{vp}) } v := y.ValueStruct{ @@ -638,6 +642,8 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) { return db.lc.get(key, maxVs, 0) } +// updateHead should not be called without the db.Lock() since db.vhead is used +// by the writer go routines and memtable flushing goroutine. func (db *DB) updateHead(ptrs []valuePointer) { var ptr valuePointer for i := len(ptrs) - 1; i >= 0; i-- { @@ -651,8 +657,6 @@ func (db *DB) updateHead(ptrs []valuePointer) { return } - db.Lock() - defer db.Unlock() y.AssertTrue(!ptr.Less(db.vhead)) db.vhead = ptr } @@ -751,7 +755,9 @@ func (db *DB) writeRequests(reqs []*request) error { done(err) return errors.Wrap(err, "writeRequests") } + db.Lock() db.updateHead(b.Ptrs) + db.Unlock() } done(nil) db.opt.Debugf("%d entries written", count) diff --git a/value_test.go b/value_test.go index 8c3c67a8a..de537837f 100644 --- a/value_test.go +++ b/value_test.go @@ -370,7 +370,6 @@ func TestValueGC4(t *testing.T) { kv, err := Open(opt) require.NoError(t, err) - defer kv.Close() sz := 128 << 10 // 5 entries per value log file. txn := kv.NewTransaction(true) @@ -409,11 +408,9 @@ func TestValueGC4(t *testing.T) { kv.vlog.rewrite(lf0, tr) kv.vlog.rewrite(lf1, tr) - err = kv.vlog.Close() - require.NoError(t, err) + require.NoError(t, kv.Close()) - kv.vlog.init(kv) - err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction()) + kv, err = Open(opt) require.NoError(t, err) for i := 0; i < 8; i++ { @@ -435,6 +432,7 @@ func TestValueGC4(t *testing.T) { return nil })) } + require.NoError(t, kv.Close()) } func TestPersistLFDiscardStats(t *testing.T) { @@ -644,6 +642,11 @@ func TestPartialAppendToValueLog(t *testing.T) { // Replay value log from beginning, badger head is past k2. require.NoError(t, kv.vlog.Close()) + // clean up the current db.vhead so that we can replay from the beginning. + // If we don't clear the current vhead, badger will error out since new + // head passed while opening vlog is zero in the following lines. + kv.vhead = valuePointer{} + kv.vlog.init(kv) require.NoError( t, kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()),