diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go
index fbe3bd81943..46bea49b093 100644
--- a/dgraph/cmd/debug/run.go
+++ b/dgraph/cmd/debug/run.go
@@ -497,9 +497,6 @@ func printKeys(db *badger.DB) {
 
 		// Don't use a switch case here. Because multiple of these can be true. In particular,
 		// IsSchema can be true alongside IsData.
-		if pk.IsRaft() {
-			buf.WriteString("{r}")
-		}
 		if pk.IsData() {
 			buf.WriteString("{d}")
 		}
diff --git a/dgraph/cmd/debug/wal.go b/dgraph/cmd/debug/wal.go
index 72fe57b3e99..0c55daddefe 100644
--- a/dgraph/cmd/debug/wal.go
+++ b/dgraph/cmd/debug/wal.go
@@ -91,6 +91,12 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {
 		fmt.Printf("Hardstate: %+v\n", hs)
 	}
 
+	if chk, err := store.Checkpoint(); err != nil {
+		fmt.Printf("Got error while retrieving checkpoint: %v\n", err)
+	} else {
+		fmt.Printf("Checkpoint: %d\n", chk)
+	}
+
 	lastIdx, err := store.LastIndex()
 	if err != nil {
 		fmt.Printf("Got error while retrieving last index: %v\n", err)
diff --git a/raftwal/storage.go b/raftwal/storage.go
index 120bbce8b00..657ca6531e0 100644
--- a/raftwal/storage.go
+++ b/raftwal/storage.go
@@ -27,9 +27,10 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/glog"
 	"go.etcd.io/etcd/raft"
-	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/raftpb"
 	"golang.org/x/net/trace"
 
+	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 )
 
@@ -57,7 +58,7 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
 	_, err = w.FirstIndex()
 	if err == errNotFound {
-		ents := make([]pb.Entry, 1)
+		ents := make([]raftpb.Entry, 1)
 		x.Check(w.reset(ents))
 	} else {
 		x.Check(err)
 	}
@@ -101,6 +102,14 @@ func (w *DiskStorage) HardStateKey() []byte {
 	return b
 }
 
+func (w *DiskStorage) CheckpointKey() []byte {
+	b := make([]byte, 14)
+	binary.BigEndian.PutUint64(b[0:8], w.id)
+	copy(b[8:10], []byte("ck"))
+	binary.BigEndian.PutUint32(b[10:14], w.gid)
+	return b
+}
+
 func (w *DiskStorage) EntryKey(idx uint64) []byte {
 	b := make([]byte, 20)
 	binary.BigEndian.PutUint64(b[0:8], w.id)
@@ -129,6 +138,38 @@ func (w *DiskStorage) StoreRaftId(id uint64) error {
 	})
 }
 
+func (w *DiskStorage) UpdateCheckpoint(snap *pb.Snapshot) error {
+	return w.db.Update(func(txn *badger.Txn) error {
+		data, err := snap.Marshal()
+		if err != nil {
+			return err
+		}
+		return txn.Set(w.CheckpointKey(), data)
+	})
+}
+
+func (w *DiskStorage) Checkpoint() (uint64, error) {
+	var applied uint64
+	err := w.db.View(func(txn *badger.Txn) error {
+		item, err := txn.Get(w.CheckpointKey())
+		if err == badger.ErrKeyNotFound {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		return item.Value(func(val []byte) error {
+			var snap pb.Snapshot
+			if err := snap.Unmarshal(val); err != nil {
+				return err
+			}
+			applied = snap.Index
+			return nil
+		})
+	})
+	return applied, err
+}
+
 // Term returns the term of entry i, which must be in the range
 // [FirstIndex()-1, LastIndex()]. The term of the entry before
 // FirstIndex is retained for matching purposes even though the
@@ -144,7 +185,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {
 		return 0, raft.ErrCompacted
 	}
 
-	var e pb.Entry
+	var e raftpb.Entry
 	if _, err := w.seekEntry(&e, idx, false); err == errNotFound {
 		return 0, raft.ErrUnavailable
 	} else if err != nil {
@@ -158,7 +199,7 @@ func (w *DiskStorage) Term(idx uint64) (uint64, error) {
 
 var errNotFound = errors.New("Unable to find raft entry")
 
-func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint64, error) {
+func (w *DiskStorage) seekEntry(e *raftpb.Entry, seekTo uint64, reverse bool) (uint64, error) {
 	var index uint64
 	err := w.db.View(func(txn *badger.Txn) error {
 		opt := badger.DefaultIteratorOptions
@@ -195,7 +236,7 @@ var (
 // into the latest Snapshot).
 func (w *DiskStorage) FirstIndex() (uint64, error) {
 	if val, ok := w.cache.Load(snapshotKey); ok {
-		snap, ok := val.(*pb.Snapshot)
+		snap, ok := val.(*raftpb.Snapshot)
 		if ok && !raft.IsEmptySnap(*snap) {
 			return snap.Metadata.Index + 1, nil
 		}
@@ -269,9 +310,9 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
 // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
 // so raft state machine could know that Storage needs some time to prepare
 // snapshot and call Snapshot later.
-func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
+func (w *DiskStorage) Snapshot() (snap raftpb.Snapshot, rerr error) {
 	if val, ok := w.cache.Load(snapshotKey); ok {
-		snap, ok := val.(*pb.Snapshot)
+		snap, ok := val.(*raftpb.Snapshot)
 		if ok && !raft.IsEmptySnap(*snap) {
 			return *snap, nil
 		}
@@ -294,7 +335,7 @@ func (w *DiskStorage) Snapshot() (snap pb.Snapshot, rerr error) {
 // setSnapshot would store the snapshot. We can delete all the entries up until the snapshot
 // index. But, keep the raft entry at the snapshot index, to make it easier to build the logic; like
 // the dummy entry in MemoryStorage.
-func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error {
+func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s raftpb.Snapshot) error {
 	if raft.IsEmptySnap(s) {
 		return nil
 	}
@@ -306,7 +347,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
 		return err
 	}
-	e := pb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
+	e := raftpb.Entry{Term: s.Metadata.Term, Index: s.Metadata.Index}
 	data, err = e.Marshal()
 	if err != nil {
 		return err
 	}
@@ -330,7 +371,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
 }
 
 // SetHardState saves the current HardState.
-func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) error {
+func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st raftpb.HardState) error {
 	if raft.IsEmptyHardState(st) {
 		return nil
 	}
@@ -342,7 +383,7 @@ func (w *DiskStorage) setHardState(batch *badger.WriteBatch, st pb.HardState) er
 }
 
 // reset resets the entries. Used for testing.
-func (w *DiskStorage) reset(es []pb.Entry) error {
+func (w *DiskStorage) reset(es []raftpb.Entry) error {
 	w.cache = new(sync.Map) // reset cache.
 
 	// Clean out the state.
@@ -402,7 +443,7 @@ func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
 	return w.deleteKeys(batch, keys)
 }
 
-func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
+func (w *DiskStorage) HardState() (hd raftpb.HardState, rerr error) {
 	w.elog.Printf("HardState")
 	defer w.elog.Printf("Done")
 	err := w.db.View(func(txn *badger.Txn) error {
@@ -421,14 +462,14 @@ func (w *DiskStorage) HardState() (hd pb.HardState, rerr error) {
 }
 
 // InitialState returns the saved HardState and ConfState information.
-func (w *DiskStorage) InitialState() (hs pb.HardState, cs pb.ConfState, err error) {
+func (w *DiskStorage) InitialState() (hs raftpb.HardState, cs raftpb.ConfState, err error) {
 	w.elog.Printf("InitialState")
 	defer w.elog.Printf("Done")
 	hs, err = w.HardState()
 	if err != nil {
 		return
 	}
-	var snap pb.Snapshot
+	var snap raftpb.Snapshot
 	snap, err = w.Snapshot()
 	if err != nil {
 		return
@@ -454,7 +495,7 @@ func (w *DiskStorage) NumEntries() (int, error) {
 	return count, err
 }
 
-func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
+func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
 	err := w.db.View(func(txn *badger.Txn) error {
 		if hi-lo == 1 { // We only need one entry.
 			item, err := txn.Get(w.EntryKey(lo))
@@ -462,7 +503,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
 				return err
 			}
 			return item.Value(func(val []byte) error {
-				var e pb.Entry
+				var e raftpb.Entry
 				if err = e.Unmarshal(val); err != nil {
 					return err
 				}
@@ -483,7 +524,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
 		first := true
 		for itr.Seek(start); itr.Valid(); itr.Next() {
 			item := itr.Item()
-			var e pb.Entry
+			var e raftpb.Entry
 			if err := item.Value(func(val []byte) error {
 				return e.Unmarshal(val)
 			}); err != nil {
@@ -511,7 +552,7 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
 // Entries returns a slice of log entries in the range [lo,hi).
 // MaxSize limits the total size of the log entries returned, but
 // Entries returns at least one entry if any.
-func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
+func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []raftpb.Entry, rerr error) {
 	w.elog.Printf("Entries: [%d, %d) maxSize:%d", lo, hi, maxSize)
 	defer w.elog.Printf("Done")
 	first, err := w.FirstIndex()
@@ -533,7 +574,7 @@ func (w *DiskStorage) Entries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error
 	return w.allEntries(lo, hi, maxSize)
 }
 
-func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) error {
+func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) error {
 	glog.V(2).Infof("CreateSnapshot i=%d, cs=%+v", i, cs)
 	first, err := w.FirstIndex()
 	if err != nil {
@@ -544,7 +585,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
 		return raft.ErrSnapOutOfDate
 	}
 
-	var e pb.Entry
+	var e raftpb.Entry
 	if _, err := w.seekEntry(&e, i, false); err != nil {
 		return err
 	}
@@ -552,7 +593,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
 		return errNotFound
 	}
 
-	var snap pb.Snapshot
+	var snap raftpb.Snapshot
 	snap.Metadata.Index = i
 	snap.Metadata.Term = e.Term
 	x.AssertTrue(cs != nil)
@@ -574,7 +615,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) er
 // first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic
 // writes then all of them can be written together. Note that when writing an Entry with Index i,
 // any previously-persisted entries with Index >= i must be discarded.
-func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) error {
+func (w *DiskStorage) Save(h raftpb.HardState, es []raftpb.Entry, snap raftpb.Snapshot) error {
 	batch := w.db.NewWriteBatch()
 	defer batch.Cancel()
 
@@ -591,7 +632,7 @@ func (w *DiskStorage) Save(h pb.HardState, es []pb.Entry, snap pb.Snapshot) erro
 }
 
 // Append the new entries to storage.
-func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) error {
+func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []raftpb.Entry) error {
 	if len(entries) == 0 {
 		return nil
 	}
diff --git a/worker/draft.go b/worker/draft.go
index e22b3aed249..8c1ecc62463 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -21,7 +21,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"math"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -636,7 +635,7 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {
 }
 
 func (n *node) proposeSnapshot(discardN int) error {
-	snap, err := n.calculateSnapshot(discardN)
+	snap, err := n.calculateSnapshot(0, discardN)
 	if err != nil {
 		glog.Warningf("Got error while calculating snapshot: %v", err)
 		return err
 	}
@@ -670,59 +669,24 @@ func (n *node) rampMeter() {
 	}
 }
 
-func (n *node) findRaftProgress() (uint64, error) {
-	var applied uint64
-	err := pstore.View(func(txn *badger.Txn) error {
-		item, err := txn.Get(x.RaftKey())
-		if err == badger.ErrKeyNotFound {
-			return nil
-		}
-		if err != nil {
-			return err
-		}
-		return item.Value(func(val []byte) error {
-			var snap pb.Snapshot
-			if err := snap.Unmarshal(val); err != nil {
-				return err
-			}
-			applied = snap.Index
-			return nil
-		})
-	})
-	return applied, err
-}
-
 func (n *node) updateRaftProgress() error {
 	// Both leader and followers can independently update their Raft progress. We don't store
 	// this in Raft WAL. Instead, this is used to just skip over log records that this Alpha
 	// has already applied, to speed up things on a restart.
-	snap, err := n.calculateSnapshot(10) // 10 is a randomly chosen small number.
-	if err != nil {
-		return err
-	}
-	if snap == nil {
-		return nil
-	}
-
+	//
 	// Let's check what we already have. And only update if the new snap.Index is ahead of the last
 	// stored applied.
-	applied, err := n.findRaftProgress()
+	applied, err := n.Store.Checkpoint()
 	if err != nil {
 		return err
 	}
-	if snap.Index <= applied {
-		return nil
-	}
-	data, err := snap.Marshal()
-	x.Check(err)
-	txn := pstore.NewTransactionAt(math.MaxUint64, true)
-	defer txn.Discard()
-
-	if err := txn.SetWithMeta(x.RaftKey(), data, x.ByteRaft); err != nil {
+	snap, err := n.calculateSnapshot(applied, 3) // 3 is a randomly chosen small number.
+	if err != nil || snap == nil || snap.Index <= applied {
 		return err
 	}
-	if err := txn.CommitAt(1, nil); err != nil {
+
+	if err := n.Store.UpdateCheckpoint(snap); err != nil {
 		return err
 	}
 	glog.V(2).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
@@ -745,6 +709,14 @@ func (n *node) checkpointAndClose(done chan struct{}) {
 			}
 
 			if n.AmLeader() {
+				var calculate bool
+				if chk, err := n.Store.Checkpoint(); err == nil {
+					if first, err := n.Store.FirstIndex(); err == nil {
+						// Save some cycles by only calculating snapshot if the checkpoint has gone
+						// quite a bit further than the first index.
+						calculate = chk-first >= uint64(x.WorkerConfig.SnapshotAfter)
+					}
+				}
 				// We keep track of the applied index in the p directory. Even if we don't take
 				// snapshot for a while and let the Raft logs grow and restart, we would not have to
 				// run all the log entries, because we can tell Raft.Config to set Applied to that
@@ -756,8 +728,10 @@ func (n *node) checkpointAndClose(done chan struct{}) {
 				// We use disk based storage for Raft. So, we're not too concerned about
 				// snapshotting. We just need to do enough, so that we don't have a huge backlog of
 				// entries to process on a restart.
-				if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
-					x.Errorf("While calculating and proposing snapshot: %v", err)
+				if calculate {
+					if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
+						glog.Errorf("While calculating and proposing snapshot: %v", err)
+					}
 				}
 				go n.abortOldTransactions()
 			}
@@ -788,11 +762,11 @@ func (n *node) Run() {
 	go n.checkpointAndClose(done)
 	go n.ReportRaftComms()
 
-	applied, err := n.findRaftProgress()
+	applied, err := n.Store.Checkpoint()
 	if err != nil {
 		glog.Errorf("While trying to find raft progress: %v", err)
 	} else {
-		glog.Infof("Found Raft progress in p directory: %d", applied)
+		glog.Infof("Found Raft progress: %d", applied)
 	}
 
 	var timer x.Timer
@@ -1040,7 +1014,7 @@ func (n *node) rollupLists(readTs uint64) error {
 		case posting.BitSchemaPosting, posting.BitCompletePosting, posting.BitEmptyPosting:
 			addTo(item.Key(), item.EstimatedSize())
 			return false
-		case x.ByteRaft:
+		case x.ByteUnused:
 			return false
 		default:
 			return true
 		}
@@ -1180,6 +1154,7 @@ func (n *node) abortOldTransactions() {
 // aborted. This way, we still keep all the mutations corresponding to this
 // start ts in the Raft logs. This is important, because we don't persist
 // pre-writes to disk in pstore.
+// - In simple terms, this means we MUST keep all pending transactions in the Raft logs.
 // - Find the maximum commit timestamp that we have seen.
 // That would tell us about the maximum timestamp used to do any commits. This
 // ts is what we can use for future reads of this snapshot.
@@ -1195,8 +1170,13 @@ func (n *node) abortOldTransactions() {
 //
 // At i7, min pending start ts = S3, therefore snapshotIdx = i5 - 1 = i4.
 // At i7, max commit ts = C1, therefore readTs = C1.
-func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
-	_, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot")
+//
+// This function also takes a startIdx, which can be used as an optimization to skip over Raft entries.
+// This is useful when we already have a previous snapshot checkpoint (all txns have concluded up
+// until that last checkpoint) that we can use as a new start point for the snapshot calculation.
+func (n *node) calculateSnapshot(startIdx uint64, discardN int) (*pb.Snapshot, error) {
+	_, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot",
+		otrace.WithSampler(otrace.AlwaysSample()))
 	defer span.End()
 
 	// We do not need to block snapshot calculation because of a pending stream. Badger would have
@@ -1212,6 +1192,11 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
 		return nil, err
 	}
 	span.Annotatef(nil, "First index: %d", first)
+	if startIdx > first {
+		// If we're starting from a higher index, set first to that.
+		first = startIdx
+		span.Annotatef(nil, "Setting first to: %d", startIdx)
+	}
 
 	rsnap, err := n.Store.Snapshot()
 	if err != nil {
diff --git a/worker/draft_test.go b/worker/draft_test.go
index 59669961dc6..b313c7e08ca 100644
--- a/worker/draft_test.go
+++ b/worker/draft_test.go
@@ -76,7 +76,7 @@ func TestCalculateSnapshot(t *testing.T) {
 	require.NoError(t, n.Store.Save(raftpb.HardState{}, entries, raftpb.Snapshot{}))
 	n.Applied.SetDoneUntil(5)
 	posting.Oracle().RegisterStartTs(2)
-	snap, err := n.calculateSnapshot(1)
+	snap, err := n.calculateSnapshot(0, 1)
 	require.NoError(t, err)
 	require.Equal(t, uint64(5), snap.ReadTs)
 	require.Equal(t, uint64(1), snap.Index)
@@ -104,7 +104,7 @@ func TestCalculateSnapshot(t *testing.T) {
 	require.NoError(t, n.Store.Save(raftpb.HardState{}, entries, raftpb.Snapshot{}))
 	n.Applied.SetDoneUntil(8)
 	posting.Oracle().ResetTxns()
-	snap, err = n.calculateSnapshot(1)
+	snap, err = n.calculateSnapshot(0, 1)
 	require.NoError(t, err)
 	require.Equal(t, uint64(9), snap.ReadTs)
 	require.Equal(t, uint64(8), snap.Index)
@@ -120,7 +120,7 @@ func TestCalculateSnapshot(t *testing.T) {
 	entries = append(entries, getEntryForMutation(9, 11))
 	require.NoError(t, n.Store.Save(raftpb.HardState{}, entries, raftpb.Snapshot{}))
 	n.Applied.SetDoneUntil(9)
-	snap, err = n.calculateSnapshot(0)
+	snap, err = n.calculateSnapshot(0, 0)
 	require.NoError(t, err)
 	require.Nil(t, snap)
 }
diff --git a/x/keys.go b/x/keys.go
index c72ed4d640d..46cb6f7fdeb 100644
--- a/x/keys.go
+++ b/x/keys.go
@@ -45,8 +45,8 @@ const (
 	// ByteSplit is a constant to specify a given key corresponds to a posting list split
 	// into multiple parts.
 	ByteSplit = byte(0x01)
-	// ByteRaft is a constant to specify a given key stores RAFT information.
-	ByteRaft = byte(0xff)
+	// ByteUnused is a constant to specify keys which need to be discarded.
+	ByteUnused = byte(0xff)
 )
 
 func writeAttr(buf []byte, attr string) []byte {
@@ -73,13 +73,6 @@ func generateKey(typeByte byte, attr string, totalLen int) []byte {
 	return buf
 }
 
-func RaftKey() []byte {
-	buf := make([]byte, 5)
-	buf[0] = ByteRaft
-	AssertTrue(4 == copy(buf[1:5], []byte("raft")))
-	return buf
-}
-
 // SchemaKey returns schema key for given attribute. Schema keys are stored
 // separately with unique prefix, since we need to iterate over all schema keys.
 // The structure of a schema key is as follows:
@@ -233,11 +226,6 @@ type ParsedKey struct {
 	bytePrefix byte
 }
 
-// IsRaft returns whether the key is a RAFT key.
-func (p ParsedKey) IsRaft() bool {
-	return p.bytePrefix == ByteRaft
-}
-
 // IsData returns whether the key is a data key.
 func (p ParsedKey) IsData() bool {
 	return p.bytePrefix == DefaultPrefix && p.byteType == ByteData
@@ -408,7 +396,7 @@ func Parse(key []byte) *ParsedKey {
 	p := &ParsedKey{}
 	p.bytePrefix = key[0]
-	if p.bytePrefix == ByteRaft {
+	if p.bytePrefix == ByteUnused {
 		return p
 	}
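For reviewers, a short usage sketch of the checkpoint API added in raftwal/storage.go (CheckpointKey, UpdateCheckpoint, Checkpoint) follows. It is illustrative only and sits outside the patch; the /tmp directory and the field-style Badger options are assumptions that depend on the Badger version vendored at this point.

// A minimal, hypothetical sketch of how the new checkpoint API is meant to be
// used. Not part of the patch.
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger"

	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/raftwal"
)

func main() {
	// Open a throwaway Badger instance to back the Raft WAL. Field-style options
	// assume a Badger v1.x-era API; adjust for the vendored version.
	opts := badger.DefaultOptions
	opts.Dir = "/tmp/raftwal-demo"
	opts.ValueDir = "/tmp/raftwal-demo"
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// DiskStorage is keyed by (raft id, group id). CheckpointKey() derives a
	// 14-byte key: <8-byte id> + "ck" + <4-byte gid>, stored next to the entry keys.
	store := raftwal.Init(db, 1 /* raft id */, 1 /* group id */)

	// The checkpoint is a marshalled pb.Snapshot kept in the Raft WAL,
	// replacing the old x.RaftKey()/ByteRaft entry in the p directory.
	if err := store.UpdateCheckpoint(&pb.Snapshot{Index: 42, ReadTs: 7}); err != nil {
		log.Fatal(err)
	}

	// Checkpoint() reads it back and returns only the applied index.
	applied, err := store.Checkpoint()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("checkpoint index:", applied) // 42
}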