Skip to content

Commit

Permalink
Diagnose Raft WAL via debug tool (#3319)
Browse files Browse the repository at this point in the history
This PR adds a way to print out the Raft WAL for both Alpha and Zero, so we can get a better understanding of how data is flowing through the system and diagnose issues.
  • Loading branch information
manishrjain authored Apr 26, 2019
1 parent eb0a4c4 commit 562a385
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 5 deletions.
227 changes: 223 additions & 4 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package debug

import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"log"
"math"
"sort"
"strconv"
"strings"

Expand All @@ -31,9 +33,12 @@ import (
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"go.etcd.io/etcd/raft/raftpb"
)

var (
Expand All @@ -48,6 +53,7 @@ type flagOptions struct {
predicate string
readOnly bool
pdir string
wdir string
itemMeta bool
jepsen string
readTs uint64
Expand Down Expand Up @@ -76,6 +82,7 @@ func init() {
flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.")
flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.BoolVar(&opt.sizeHistogram, "histogram", false,
"Show a histogram of the key and value sizes.")
}
Expand Down Expand Up @@ -657,20 +664,232 @@ func sizeHistogram(db *badger.DB) {
valueSizeHistogram.PrintHistogram()
}

func printAlphaProposal(buf *bytes.Buffer, pr pb.Proposal, pending map[uint64]bool) {
switch {
case pr.Mutations != nil:
fmt.Fprintf(buf, " Mutation . StartTs: %d . Edges: %d .",
pr.Mutations.StartTs, len(pr.Mutations.Edges))
if len(pr.Mutations.Edges) > 0 {
pending[pr.Mutations.StartTs] = true
} else {
fmt.Fprintf(buf, " Mutation: %+v .", pr.Mutations)
}
fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
case len(pr.Kv) > 0:
fmt.Fprintf(buf, " KV . Size: %d ", len(pr.Kv))
case pr.State != nil:
fmt.Fprintf(buf, " State . %+v ", pr.State)
case pr.Delta != nil:
fmt.Fprintf(buf, " Delta .")
sort.Slice(pr.Delta.Txns, func(i, j int) bool {
ti := pr.Delta.Txns[i]
tj := pr.Delta.Txns[j]
return ti.StartTs < tj.StartTs
})
fmt.Fprintf(buf, " Max: %d .", pr.Delta.GetMaxAssigned())
for _, txn := range pr.Delta.Txns {
fmt.Fprintf(buf, " %d → %d .", txn.StartTs, txn.CommitTs)
delete(pending, txn.StartTs)
}
fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
case pr.Snapshot != nil:
fmt.Fprintf(buf, " Snapshot . %+v ", pr.Snapshot)
}
}

func printZeroProposal(buf *bytes.Buffer, zpr pb.ZeroProposal) {
switch {
case len(zpr.SnapshotTs) > 0:
fmt.Fprintf(buf, " Snapshot: %+v .", zpr.SnapshotTs)
case zpr.Member != nil:
fmt.Fprintf(buf, " Member: %+v .", zpr.Member)
case zpr.Tablet != nil:
fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet)
case zpr.MaxLeaseId > 0:
fmt.Fprintf(buf, " MaxLeaseId: %d .", zpr.MaxLeaseId)
case zpr.MaxRaftId > 0:
fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId)
case zpr.MaxTxnTs > 0:
fmt.Fprintf(buf, " MaxTxnTs: %d .", zpr.MaxTxnTs)
case zpr.Txn != nil:
txn := zpr.Txn
fmt.Fprintf(buf, " Txn %d → %d .", txn.StartTs, txn.CommitTs)
default:
fmt.Fprintf(buf, " Proposal: %+v .", zpr)
}
}

func parseWal(db *badger.DB) error {
rids := make(map[uint64]bool)
gids := make(map[uint32]bool)

parseIds := func(item *badger.Item) {
key := item.Key()
switch {
case len(key) == 14:
// hard state and snapshot key.
rid := binary.BigEndian.Uint64(key[0:8])
rids[rid] = true

gid := binary.BigEndian.Uint32(key[10:14])
gids[gid] = true
case len(key) == 20:
// entry key.
rid := binary.BigEndian.Uint64(key[0:8])
rids[rid] = true

gid := binary.BigEndian.Uint32(key[8:12])
gids[gid] = true
default:
// Ignore other keys.
}
}

err := db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
itr := txn.NewIterator(opt)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
parseIds(itr.Item())
}
return nil
})
if err != nil {
return err
}
fmt.Printf("rids: %v\n", rids)
fmt.Printf("gids: %v\n", gids)

pending := make(map[uint64]bool)
printEntry := func(es raftpb.Entry) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%d . %d . %v . %-6s .", es.Term, es.Index, es.Type,
humanize.Bytes(uint64(es.Size())))
if es.Type == raftpb.EntryConfChange {
fmt.Printf("%s\n", buf.Bytes())
return
}
var pr pb.Proposal
var zpr pb.ZeroProposal
if err := pr.Unmarshal(es.Data); err == nil {
printAlphaProposal(&buf, pr, pending)
} else if err := zpr.Unmarshal(es.Data); err == nil {
printZeroProposal(&buf, zpr)
} else {
fmt.Printf("%s Unable to parse Proposal: %v\n", buf.Bytes(), err)
return
}
fmt.Printf("%s\n", buf.Bytes())
}

printRaft := func(store *raftwal.DiskStorage) {
fmt.Println()
snap, err := store.Snapshot()
if err != nil {
fmt.Printf("Got error while retrieving snapshot: %v\n", err)
} else {
fmt.Printf("Snapshot Metadata: %+v\n", snap.Metadata)
var ds pb.Snapshot
var ms pb.MembershipState
if err := ds.Unmarshal(snap.Data); err == nil {
fmt.Printf("Snapshot Alpha: %+v\n", ds)
} else if err := ms.Unmarshal(snap.Data); err == nil {
for gid, group := range ms.GetGroups() {
fmt.Printf("\nGROUP: %d\n", gid)
for _, member := range group.GetMembers() {
fmt.Printf("Member: %+v .\n", member)
}
for _, tablet := range group.GetTablets() {
fmt.Printf("Tablet: %+v .\n", tablet)
}
group.Members = nil
group.Tablets = nil
fmt.Printf("Group: %d %+v .\n", gid, group)
}
ms.Groups = nil
fmt.Printf("\nSnapshot Zero: %+v\n", ms)
} else {
fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err)
}
}
fmt.Println()

if hs, err := store.HardState(); err != nil {
fmt.Printf("Got error while retrieving hardstate: %v\n", err)
} else {
fmt.Printf("Hardstate: %+v\n", hs)
}

lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
return
}
startIdx := snap.Metadata.Index + 1
fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx)

pending = make(map[uint64]bool)
for startIdx < lastIdx-1 {
entries, err := store.Entries(startIdx, lastIdx, 64<<20 /* 64 MB Max Size */)
if err != nil {
fmt.Printf("Got error while retrieving entries: %v\n", err)
return
}
for _, ent := range entries {
printEntry(ent)
startIdx = x.Max(startIdx, ent.Index)
}
}
}

for rid, _ := range rids {
for gid, _ := range gids {
fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid)
store := raftwal.Init(db, rid, gid)
printRaft(store)
}
}
return nil
}

func run() {
dir := opt.pdir
isWal := false
if len(dir) == 0 {
dir = opt.wdir
isWal = true
}
bopts := badger.DefaultOptions
bopts.Dir = opt.pdir
bopts.ValueDir = opt.pdir
bopts.Dir = dir
bopts.ValueDir = dir
bopts.TableLoadingMode = options.MemoryMap
bopts.ReadOnly = opt.readOnly

x.AssertTruef(len(bopts.Dir) > 0, "No posting dir specified.")
x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
fmt.Printf("Opening DB: %s\n", bopts.Dir)

db, err := badger.OpenManaged(bopts)
var db *badger.DB
var err error
if isWal {
db, err = badger.Open(bopts)
} else {
db, err = badger.OpenManaged(bopts)
}
x.Check(err)
defer db.Close()

if isWal {
if err := parseWal(db); err != nil {
fmt.Printf("\nGot error while parsing WAL: %v\n", err)
}
fmt.Println("Done")
// WAL can't execute the getMinMax function, so we need to deal with it
// here, instead of in the select case below.
return
}

min, max := getMinMax(db, opt.readTs)
fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs)

Expand Down
4 changes: 3 additions & 1 deletion raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ type DiskStorage struct {

func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
w := &DiskStorage{db: db, id: id, gid: gid, cache: new(sync.Map)}
x.Check(w.StoreRaftId(id))
if prev, err := RaftId(db); err != nil || prev != id {
x.Check(w.StoreRaftId(id))
}
w.elog = trace.NewEventLog("Badger", "RaftStorage")

snap, err := w.Snapshot()
Expand Down

0 comments on commit 562a385

Please sign in to comment.