From 19bcf6c7892de8ef8908311891344f7409b8a825 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 22 Apr 2019 16:13:35 -0700 Subject: [PATCH 1/8] Start work on parsing WAL. --- dgraph/cmd/debug/run.go | 51 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 7af32335ed2..57019bb7165 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -18,6 +18,7 @@ package debug import ( "bytes" + "encoding/binary" "encoding/hex" "fmt" "io" @@ -48,6 +49,7 @@ type flagOptions struct { predicate string readOnly bool pdir string + wdir string itemMeta bool jepsen string readTs uint64 @@ -76,6 +78,7 @@ func init() { flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.") flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.") flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.") + flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.") flag.BoolVar(&opt.sizeHistogram, "histogram", false, "Show a histogram of the key and value sizes.") } @@ -649,17 +652,55 @@ func sizeHistogram(db *badger.DB) { valueSizeHistogram.PrintHistogram() } +func parseWal(db *badger.DB) error { + printKey := func(buf *bytes.Buffer, key []byte) { + if len(key) < 14 { + fmt.Fprintf(buf, "key = %s\n", hex.Dump(key)) + } + id := binary.BigEndian.Uint64(key[0:8]) + fmt.Fprintf(buf, " id: %d ", id) + switch len(key) { + case 14: + fmt.Fprintf(buf, " %s %d", key[8:10], binary.BigEndian.U + if len(key) < 10 { + } + } + + return db.View(func(txn *badger.Txn) error { + itr := txn.NewIterator(badger.DefaultIteratorOptions) + defer itr.Close() + + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + var buf bytes.Buffer + + } + }) +} + func run() { + dir := opt.pdir + isWal := false + if len(dir) == 0 { + dir = opt.wdir + isWal = true + } bopts := badger.DefaultOptions - bopts.Dir = opt.pdir - bopts.ValueDir = opt.pdir + bopts.Dir = dir + bopts.ValueDir = dir bopts.TableLoadingMode = options.MemoryMap bopts.ReadOnly = opt.readOnly - x.AssertTruef(len(bopts.Dir) > 0, "No posting dir specified.") + x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.") fmt.Printf("Opening DB: %s\n", bopts.Dir) - db, err := badger.OpenManaged(bopts) + var db *badger.DB + var err error + if isWal { + db, err = badger.Open(bopts) + } else { + db, err := badger.OpenManaged(bopts) + } x.Check(err) defer db.Close() @@ -667,6 +708,8 @@ func run() { fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs) switch { + case isWal: + parseWal(db) case len(opt.keyLookup) > 0: lookup(db) case len(opt.jepsen) > 0: From 5d2dd0d373ab0386818115d28f798ec75accd701 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 24 Apr 2019 18:31:06 -0700 Subject: [PATCH 2/8] Working code to debug the WAL log --- dgraph/cmd/debug/run.go | 161 +++++++++++++++++++++++++++++++++++----- raftwal/storage.go | 4 +- 2 files changed, 146 insertions(+), 19 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 57019bb7165..428626d0bd7 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -24,6 +24,7 @@ import ( "io" "log" "math" + "sort" "strconv" "strings" @@ -32,9 +33,12 @@ import ( "github.com/dgraph-io/dgraph/codec" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/raftwal" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + humanize "github.com/dustin/go-humanize" "github.com/spf13/cobra" + "go.etcd.io/etcd/raft/raftpb" ) var ( @@ -653,29 +657,143 @@ func sizeHistogram(db *badger.DB) { } func parseWal(db *badger.DB) error { - printKey := func(buf *bytes.Buffer, key []byte) { - if len(key) < 14 { - fmt.Fprintf(buf, "key = %s\n", hex.Dump(key)) - } - id := binary.BigEndian.Uint64(key[0:8]) - fmt.Fprintf(buf, " id: %d ", id) - switch len(key) { - case 14: - fmt.Fprintf(buf, " %s %d", key[8:10], binary.BigEndian.U - if len(key) < 10 { + rids := make(map[uint64]bool) + gids := make(map[uint32]bool) + + parseIds := func(item *badger.Item) { + key := item.Key() + switch { + case len(key) == 14: + // hard state and snapshot key. + rid := binary.BigEndian.Uint64(key[0:8]) + rids[rid] = true + + gid := binary.BigEndian.Uint32(key[10:14]) + gids[gid] = true + case len(key) == 20: + // entry key. + rid := binary.BigEndian.Uint64(key[0:8]) + rids[rid] = true + + gid := binary.BigEndian.Uint32(key[8:12]) + gids[gid] = true } } - return db.View(func(txn *badger.Txn) error { - itr := txn.NewIterator(badger.DefaultIteratorOptions) + err := db.View(func(txn *badger.Txn) error { + opt := badger.DefaultIteratorOptions + opt.PrefetchValues = false + itr := txn.NewIterator(opt) defer itr.Close() for itr.Rewind(); itr.Valid(); itr.Next() { - item := itr.Item() - var buf bytes.Buffer - + parseIds(itr.Item()) } + return nil }) + if err != nil { + return err + } + fmt.Printf("rids: %v\n", rids) + fmt.Printf("gids: %v\n", gids) + + pending := make(map[uint64]bool) + printEntry := func(es raftpb.Entry) { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%d . %d . %v . %-6s . ", es.Term, es.Index, es.Type, + humanize.Bytes(uint64(es.Size()))) + if es.Type == raftpb.EntryConfChange { + fmt.Printf("%s\n", buf.Bytes()) + return + } + var pr pb.Proposal + if err := pr.Unmarshal(es.Data); err != nil { + fmt.Printf("%s Unable to parse Proposal: %v\n", buf.Bytes(), err) + return + } + switch { + case pr.Mutations != nil: + fmt.Fprintf(&buf, "Mutation . StartTs: %d . Edges: %d .", + pr.Mutations.StartTs, len(pr.Mutations.Edges)) + if len(pr.Mutations.Edges) > 0 { + pending[pr.Mutations.StartTs] = true + } + fmt.Fprintf(&buf, " Pending txns: %d .", len(pending)) + case len(pr.Kv) > 0: + fmt.Fprintf(&buf, "KV . Size: %d ", len(pr.Kv)) + case pr.State != nil: + fmt.Fprintf(&buf, "State . %+v ", pr.State) + case pr.Delta != nil: + fmt.Fprintf(&buf, "Delta .") + sort.Slice(pr.Delta.Txns, func(i, j int) bool { + ti := pr.Delta.Txns[i] + tj := pr.Delta.Txns[j] + return ti.StartTs < tj.StartTs + }) + fmt.Fprintf(&buf, " Max: %d .", pr.Delta.GetMaxAssigned()) + for _, txn := range pr.Delta.Txns { + fmt.Fprintf(&buf, " %d → %d .", txn.StartTs, txn.CommitTs) + delete(pending, txn.StartTs) + } + fmt.Fprintf(&buf, " Pending txns: %d .", len(pending)) + case pr.Snapshot != nil: + fmt.Fprintf(&buf, "Snapshot . %+v ", pr.Snapshot) + } + fmt.Printf("%s\n", buf.Bytes()) + } + + printRaft := func(store *raftwal.DiskStorage) { + fmt.Println() + snap, err := store.Snapshot() + if err != nil { + fmt.Printf("Got error while retrieving snapshot: %v\n", err) + } else { + fmt.Printf("Snapshot Metadata: %+v\n", snap.Metadata) + var ds pb.Snapshot + if err := ds.Unmarshal(snap.Data); err != nil { + fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err) + } else { + fmt.Printf("Snapshot Dgraph: %+v\n", ds) + } + } + fmt.Println() + + if hs, err := store.HardState(); err != nil { + fmt.Printf("Got error while retrieving hardstate: %v\n", err) + } else { + fmt.Printf("Hardstate: %+v\n", hs) + } + + lastIdx, err := store.LastIndex() + if err != nil { + fmt.Printf("Got error while retrieving last index: %v\n", err) + return + } + fmt.Printf("Last Index: %d\n\n", lastIdx) + + startIdx := snap.Metadata.Index + 1 + pending = make(map[uint64]bool) + for startIdx < lastIdx-1 { + entries, err := store.Entries(startIdx, lastIdx, 64<<20) + if err != nil { + fmt.Printf("Got error while retrieving entries: %v\n", err) + return + } + for _, ent := range entries { + printEntry(ent) + startIdx = x.Max(startIdx, ent.Index) + } + } + } + + for rid, _ := range rids { + for gid, _ := range gids { + fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid) + store := raftwal.Init(db, rid, gid) + printRaft(store) + } + } + return nil } func run() { @@ -699,17 +817,24 @@ func run() { if isWal { db, err = badger.Open(bopts) } else { - db, err := badger.OpenManaged(bopts) + db, err = badger.OpenManaged(bopts) } x.Check(err) defer db.Close() + if isWal { + if err := parseWal(db); err != nil { + fmt.Printf("\nGot error while parsing WAL: %v\n", err) + } + fmt.Println("Done") + return + } + + // WAL can't execute this following function. min, max := getMinMax(db, opt.readTs) fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs) switch { - case isWal: - parseWal(db) case len(opt.keyLookup) > 0: lookup(db) case len(opt.jepsen) > 0: diff --git a/raftwal/storage.go b/raftwal/storage.go index f2ad80f85b3..6a3995da06f 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -44,7 +44,9 @@ type DiskStorage struct { func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage { w := &DiskStorage{db: db, id: id, gid: gid, cache: new(sync.Map)} - x.Check(w.StoreRaftId(id)) + if prev, err := RaftId(db); err != nil || prev != id { + x.Check(w.StoreRaftId(id)) + } w.elog = trace.NewEventLog("Badger", "RaftStorage") snap, err := w.Snapshot() From e123059ef30bca9a553b6d8c615d999d15fef882 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 24 Apr 2019 18:57:10 -0700 Subject: [PATCH 3/8] Also print out number of entries. --- dgraph/cmd/debug/run.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 428626d0bd7..0a09f64ea23 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -769,9 +769,9 @@ func parseWal(db *badger.DB) error { fmt.Printf("Got error while retrieving last index: %v\n", err) return } - fmt.Printf("Last Index: %d\n\n", lastIdx) - startIdx := snap.Metadata.Index + 1 + fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx) + pending = make(map[uint64]bool) for startIdx < lastIdx-1 { entries, err := store.Entries(startIdx, lastIdx, 64<<20) From 12de53c7c600efa51ba69ad5b0006e4b909fc049 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 24 Apr 2019 19:01:54 -0700 Subject: [PATCH 4/8] Print out mutation proto if there are no edges. --- dgraph/cmd/debug/run.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 0a09f64ea23..4acc190e724 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -717,6 +717,8 @@ func parseWal(db *badger.DB) error { pr.Mutations.StartTs, len(pr.Mutations.Edges)) if len(pr.Mutations.Edges) > 0 { pending[pr.Mutations.StartTs] = true + } else { + fmt.Fprintf(&buf, " Mutation: %+v .", pr.Mutations) } fmt.Fprintf(&buf, " Pending txns: %d .", len(pending)) case len(pr.Kv) > 0: From 2734c13fad4887261ab5501ef2ddee9f595240a5 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 24 Apr 2019 19:14:36 -0700 Subject: [PATCH 5/8] Fix sentence --- dgraph/cmd/debug/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 4acc190e724..6b0f2edce10 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -832,7 +832,7 @@ func run() { return } - // WAL can't execute this following function. + // WAL can't execute the following function. min, max := getMinMax(db, opt.readTs) fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs) From 558cf6778fc48da041f44c49b720fec6a1330f8f Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 25 Apr 2019 15:03:04 -0700 Subject: [PATCH 6/8] Ability to parse Zero proposals as well. --- dgraph/cmd/debug/run.go | 103 ++++++++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 35 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 6b0f2edce10..6a76a040e4e 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -656,6 +656,61 @@ func sizeHistogram(db *badger.DB) { valueSizeHistogram.PrintHistogram() } +func printAlphaProposal(buf *bytes.Buffer, pr pb.Proposal, pending map[uint64]bool) { + switch { + case pr.Mutations != nil: + fmt.Fprintf(buf, " Mutation . StartTs: %d . Edges: %d .", + pr.Mutations.StartTs, len(pr.Mutations.Edges)) + if len(pr.Mutations.Edges) > 0 { + pending[pr.Mutations.StartTs] = true + } else { + fmt.Fprintf(buf, " Mutation: %+v .", pr.Mutations) + } + fmt.Fprintf(buf, " Pending txns: %d .", len(pending)) + case len(pr.Kv) > 0: + fmt.Fprintf(buf, " KV . Size: %d ", len(pr.Kv)) + case pr.State != nil: + fmt.Fprintf(buf, " State . %+v ", pr.State) + case pr.Delta != nil: + fmt.Fprintf(buf, " Delta .") + sort.Slice(pr.Delta.Txns, func(i, j int) bool { + ti := pr.Delta.Txns[i] + tj := pr.Delta.Txns[j] + return ti.StartTs < tj.StartTs + }) + fmt.Fprintf(buf, " Max: %d .", pr.Delta.GetMaxAssigned()) + for _, txn := range pr.Delta.Txns { + fmt.Fprintf(buf, " %d → %d .", txn.StartTs, txn.CommitTs) + delete(pending, txn.StartTs) + } + fmt.Fprintf(buf, " Pending txns: %d .", len(pending)) + case pr.Snapshot != nil: + fmt.Fprintf(buf, " Snapshot . %+v ", pr.Snapshot) + } +} + +func printZeroProposal(buf *bytes.Buffer, zpr pb.ZeroProposal) { + switch { + case len(zpr.SnapshotTs) > 0: + fmt.Fprintf(buf, " Snapshot: %+v .", zpr.SnapshotTs) + case zpr.Member != nil: + fmt.Fprintf(buf, " Member: %+v .", zpr.Member) + case zpr.Tablet != nil: + fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet) + case zpr.MaxLeaseId > 0: + fmt.Fprintf(buf, " MaxLeaseId: %d .", zpr.MaxLeaseId) + case zpr.MaxRaftId > 0: + fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId) + case zpr.MaxTxnTs > 0: + fmt.Fprintf(buf, " MaxTxnTs: %d .", zpr.MaxTxnTs) + case zpr.Txn != nil: + txn := zpr.Txn + fmt.Fprintf(buf, " Txn %d → %d .", txn.StartTs, txn.CommitTs) + default: + fmt.Fprintf(buf, " Proposal: %+v .", zpr) + } +} + func parseWal(db *badger.DB) error { rids := make(map[uint64]bool) gids := make(map[uint32]bool) @@ -700,47 +755,22 @@ func parseWal(db *badger.DB) error { pending := make(map[uint64]bool) printEntry := func(es raftpb.Entry) { var buf bytes.Buffer - fmt.Fprintf(&buf, "%d . %d . %v . %-6s . ", es.Term, es.Index, es.Type, + fmt.Fprintf(&buf, "%d . %d . %v . %-6s .", es.Term, es.Index, es.Type, humanize.Bytes(uint64(es.Size()))) if es.Type == raftpb.EntryConfChange { fmt.Printf("%s\n", buf.Bytes()) return } var pr pb.Proposal - if err := pr.Unmarshal(es.Data); err != nil { + var zpr pb.ZeroProposal + if err := pr.Unmarshal(es.Data); err == nil { + printAlphaProposal(&buf, pr, pending) + } else if err := zpr.Unmarshal(es.Data); err == nil { + printZeroProposal(&buf, zpr) + } else { fmt.Printf("%s Unable to parse Proposal: %v\n", buf.Bytes(), err) return } - switch { - case pr.Mutations != nil: - fmt.Fprintf(&buf, "Mutation . StartTs: %d . Edges: %d .", - pr.Mutations.StartTs, len(pr.Mutations.Edges)) - if len(pr.Mutations.Edges) > 0 { - pending[pr.Mutations.StartTs] = true - } else { - fmt.Fprintf(&buf, " Mutation: %+v .", pr.Mutations) - } - fmt.Fprintf(&buf, " Pending txns: %d .", len(pending)) - case len(pr.Kv) > 0: - fmt.Fprintf(&buf, "KV . Size: %d ", len(pr.Kv)) - case pr.State != nil: - fmt.Fprintf(&buf, "State . %+v ", pr.State) - case pr.Delta != nil: - fmt.Fprintf(&buf, "Delta .") - sort.Slice(pr.Delta.Txns, func(i, j int) bool { - ti := pr.Delta.Txns[i] - tj := pr.Delta.Txns[j] - return ti.StartTs < tj.StartTs - }) - fmt.Fprintf(&buf, " Max: %d .", pr.Delta.GetMaxAssigned()) - for _, txn := range pr.Delta.Txns { - fmt.Fprintf(&buf, " %d → %d .", txn.StartTs, txn.CommitTs) - delete(pending, txn.StartTs) - } - fmt.Fprintf(&buf, " Pending txns: %d .", len(pending)) - case pr.Snapshot != nil: - fmt.Fprintf(&buf, "Snapshot . %+v ", pr.Snapshot) - } fmt.Printf("%s\n", buf.Bytes()) } @@ -752,10 +782,13 @@ func parseWal(db *badger.DB) error { } else { fmt.Printf("Snapshot Metadata: %+v\n", snap.Metadata) var ds pb.Snapshot - if err := ds.Unmarshal(snap.Data); err != nil { - fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err) + var ms pb.MembershipState + if err := ds.Unmarshal(snap.Data); err == nil { + fmt.Printf("Snapshot Alpha: %+v\n", ds) + } else if err := ms.Unmarshal(snap.Data); err == nil { + fmt.Printf("Snapshot Zero: %+v\n", ms) } else { - fmt.Printf("Snapshot Dgraph: %+v\n", ds) + fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err) } } fmt.Println() From e84a814a9404645295d0590ab6fd4263fc38e4f3 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 25 Apr 2019 15:21:36 -0700 Subject: [PATCH 7/8] Address Martin's comments --- dgraph/cmd/debug/run.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 6a76a040e4e..443405f9d79 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -732,6 +732,8 @@ func parseWal(db *badger.DB) error { gid := binary.BigEndian.Uint32(key[8:12]) gids[gid] = true + default: + // Ignore other keys. } } @@ -809,7 +811,7 @@ func parseWal(db *badger.DB) error { pending = make(map[uint64]bool) for startIdx < lastIdx-1 { - entries, err := store.Entries(startIdx, lastIdx, 64<<20) + entries, err := store.Entries(startIdx, lastIdx, 64<<20 /* 64 MB Max Size */) if err != nil { fmt.Printf("Got error while retrieving entries: %v\n", err) return @@ -862,10 +864,11 @@ func run() { fmt.Printf("\nGot error while parsing WAL: %v\n", err) } fmt.Println("Done") + // WAL can't execute the getMinMax function, so we need to deal with it + // here, instead of in the select case below. return } - // WAL can't execute the following function. min, max := getMinMax(db, opt.readTs) fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs) From c155dda0da4d465ee2bfb2000a3fddd4f29b1714 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Thu, 25 Apr 2019 15:31:46 -0700 Subject: [PATCH 8/8] Print membership state better. --- dgraph/cmd/debug/run.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 443405f9d79..34983cc3dab 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -788,7 +788,20 @@ func parseWal(db *badger.DB) error { if err := ds.Unmarshal(snap.Data); err == nil { fmt.Printf("Snapshot Alpha: %+v\n", ds) } else if err := ms.Unmarshal(snap.Data); err == nil { - fmt.Printf("Snapshot Zero: %+v\n", ms) + for gid, group := range ms.GetGroups() { + fmt.Printf("\nGROUP: %d\n", gid) + for _, member := range group.GetMembers() { + fmt.Printf("Member: %+v .\n", member) + } + for _, tablet := range group.GetTablets() { + fmt.Printf("Tablet: %+v .\n", tablet) + } + group.Members = nil + group.Tablets = nil + fmt.Printf("Group: %d %+v .\n", gid, group) + } + ms.Groups = nil + fmt.Printf("\nSnapshot Zero: %+v\n", ms) } else { fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err) }