Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move checkpoint key to WAL #3444

Merged
merged 9 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,9 +497,6 @@ func printKeys(db *badger.DB) {

// Don't use a switch case here. Because multiple of these can be true. In particular,
// IsSchema can be true alongside IsData.
if pk.IsRaft() {
buf.WriteString("{r}")
}
if pk.IsData() {
buf.WriteString("{d}")
}
Expand Down
6 changes: 6 additions & 0 deletions dgraph/cmd/debug/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
fmt.Printf("Hardstate: %+v\n", hs)
}

if chk, err := store.Checkpoint(); err != nil {
fmt.Printf("Got error while retrieving checkpoint: %v\n", err)
} else {
fmt.Printf("Checkpoint: %d\n", chk)
}

lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
Expand Down
87 changes: 64 additions & 23 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/glog"
"go.etcd.io/etcd/raft"
pb "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/raftpb"
"golang.org/x/net/trace"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {

_, err = w.FirstIndex()
if err == errNotFound {
ents := make([]pb.Entry, 1)
ents := make([]raftpb.Entry, 1)
x.Check(w.reset(ents))
} else {
x.Check(err)
Expand Down Expand Up @@ -101,6 +102,14 @@ func (w *DiskStorage) HardStateKey() []byte {
return b
}

func (w *DiskStorage) CheckpointKey() []byte {
b := make([]byte, 14)
binary.BigEndian.PutUint64(b[0:8], w.id)
copy(b[8:10], []byte("ck"))
binary.BigEndian.PutUint32(b[10:14], w.gid)
return b
}

func (w *DiskStorage) EntryKey(idx uint64) []byte {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b[0:8], w.id)
Expand Down Expand Up @@ -129,6 +138,38 @@ func (w *DiskStorage) StoreRaftId(id uint64) error {
})
}

func (w *DiskStorage) UpdateCheckpoint(snap *pb.Snapshot) error {
return w.db.Update(func(txn *badger.Txn) error {
data, err := snap.Marshal()
if err != nil {
return err
}
return txn.Set(w.CheckpointKey(), data)
})
}

func (w *DiskStorage) Checkpoint() (uint64, error) {
var applied uint64
err := w.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(w.CheckpointKey())
if err == badger.ErrKeyNotFound {
return nil
}
if err != nil {
return err
}
return item.Value(func(val []byte) error {
var snap pb.Snapshot
if err := snap.Unmarshal(val); err != nil {
return err
}
applied = snap.Index
return nil
})
})
return applied, err
}

// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
Expand All @@ -144,7 +185,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {
return 0, raft.ErrCompacted
}

var e pb.Entry
var e raftpb.Entry
if _, err := w.seekEntry(&e, idx, false); err == errNotFound {
return 0, raft.ErrUnavailable
} else if err != nil {
Expand All @@ -158,7 +199,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {

var errNotFound = errors.New("Unable to find raft entry")

func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint64, error) {
func (w *DiskStorage) seekEntry(e *raftpb.Entry, seekTo uint64, reverse bool) (uint64, error) {
var index uint64
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
Expand Down Expand Up @@ -195,7 +236,7 @@ var (
// into the latest Snapshot).
func (w *DiskStorage) FirstIndex() (uint64, error) {
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*pb.Snapshot)
snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return snap.Metadata.Index + 1, nil
}
Expand Down Expand Up @@ -269,9 +310,9 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
func (w *DiskStorage) Snapshot() (snap raftpb.Snapshot, rerr error) {
if val, ok := w.cache.Load(snapshotKey); ok {
snap, ok := val.(*pb.Snapshot)
snap, ok := val.(*raftpb.Snapshot)
if ok && !raft.IsEmptySnap(*snap) {
return *snap, nil
}
Expand All @@ -294,7 +335,7 @@ func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
// setSnapshot would store the snapshot. We can delete all the entries up until the snapshot
// index. But, keep the raft entry at the snapshot index, to make it easier to build the logic; like
// the dummy entry in MemoryStorage.
func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error {
func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s raftpb.Snapshot) error {
if raft.IsEmptySnap(s) {
return nil
}
Expand All @@ -306,7 +347,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
return err
}

e := pb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
e := raftpb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
data, err = e.Marshal()
if err != nil {
return err
Expand All @@ -330,7 +371,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
}

// SetHardState saves the current HardState.
func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) error {
func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st raftpb.HardState) error {
if raft.IsEmptyHardState(st) {
return nil
}
Expand All @@ -342,7 +383,7 @@ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) er
}

// reset resets the entries. Used for testing.
func (w *DiskStorage) reset(es []pb.Entry) error {
func (w *DiskStorage) reset(es []raftpb.Entry) error {
w.cache = new(sync.Map) // reset cache.

// Clean out the state.
Expand Down Expand Up @@ -402,7 +443,7 @@ func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
return w.deleteKeys(batch, keys)
}

func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
func (w *DiskStorage) HardState() (hd raftpb.HardState, rerr error) {
w.elog.Printf("HardState")
defer w.elog.Printf("Done")
err := w.db.View(func(txn *badger.Txn) error {
Expand All @@ -421,14 +462,14 @@ func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
}

// InitialState returns the saved HardState and ConfState information.
func (w *DiskStorage) InitialState() (hs pb.HardState, cs pb.ConfState, err error) {
func (w *DiskStorage) InitialState() (hs raftpb.HardState, cs raftpb.ConfState, err error) {
w.elog.Printf("InitialState")
defer w.elog.Printf("Done")
hs, err = w.HardState()
if err != nil {
return
}
var snap pb.Snapshot
var snap raftpb.Snapshot
snap, err = w.Snapshot()
if err != nil {
return
Expand All @@ -454,15 +495,15 @@ func (w *DiskStorage) NumEntries() (int, error) {
return count, err
}

func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
err := w.db.View(func(txn *badger.Txn) error {
if hi-lo == 1 { // We only need one entry.
item, err := txn.Get(w.EntryKey(lo))
if err != nil {
return err
}
return item.Value(func(val []byte) error {
var e pb.Entry
var e raftpb.Entry
if err = e.Unmarshal(val); err != nil {
return err
}
Expand All @@ -483,7 +524,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
first := true
for itr.Seek(start); itr.Valid(); itr.Next() {
item := itr.Item()
var e pb.Entry
var e raftpb.Entry
if err := item.Value(func(val []byte) error {
return e.Unmarshal(val)
}); err != nil {
Expand Down Expand Up @@ -511,7 +552,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
w.elog.Printf("Entries: [%d, %d) maxSize:%d", lo, hi, maxSize)
defer w.elog.Printf("Done")
first, err := w.FirstIndex()
Expand All @@ -533,7 +574,7 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
return w.allEntries(lo, hi, maxSize)
}

func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) error {
glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
first, err := w.FirstIndex()
if err != nil {
Expand All @@ -544,15 +585,15 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
return raft.ErrSnapOutOfDate
}

var e pb.Entry
var e raftpb.Entry
if _, err := w.seekEntry(&e, i, false); err != nil {
return err
}
if e.Index != i {
return errNotFound
}

var snap pb.Snapshot
var snap raftpb.Snapshot
snap.Metadata.Index = i
snap.Metadata.Term = e.Term
x.AssertTrue(cs != nil)
Expand All @@ -574,7 +615,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
// first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic
// writes then all of them can be written together. Note that when writing an Entry with Index i,
// any previously-persisted entries with Index >= i must be discarded.
func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) error {
func (w *DiskStorage) Save(h raftpb.HardState, es []raftpb.Entry, snap raftpb.Snapshot) error {
batch := w.db.NewWriteBatch()
defer batch.Cancel()

Expand All @@ -591,7 +632,7 @@ func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) erro
}

// Append the new entries to storage.
func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) error {
func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []raftpb.Entry) error {
if len(entries) == 0 {
return nil
}
Expand Down
Loading