Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnose Raft WAL via debug tool #3319

Merged
merged 9 commits into from
Apr 26, 2019
Merged
227 changes: 223 additions & 4 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ package debug

import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"log"
"math"
"sort"
"strconv"
"strings"

Expand All @@ -31,9 +33,12 @@ import (
"github.com/dgraph-io/dgraph/codec"
"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/raftwal"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"go.etcd.io/etcd/raft/raftpb"
)

var (
Expand All @@ -48,6 +53,7 @@ type flagOptions struct {
predicate string
readOnly bool
pdir string
wdir string
itemMeta bool
jepsen string
readTs uint64
Expand Down Expand Up @@ -76,6 +82,7 @@ func init() {
flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.")
flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.BoolVar(&opt.sizeHistogram, "histogram", false,
"Show a histogram of the key and value sizes.")
}
Expand Down Expand Up @@ -657,20 +664,232 @@ func sizeHistogram(db *badger.DB) {
valueSizeHistogram.PrintHistogram()
}

func printAlphaProposal(buf *bytes.Buffer, pr pb.Proposal, pending map[uint64]bool) {
switch {
case pr.Mutations != nil:
fmt.Fprintf(buf, " Mutation . StartTs: %d . Edges: %d .",
pr.Mutations.StartTs, len(pr.Mutations.Edges))
if len(pr.Mutations.Edges) > 0 {
pending[pr.Mutations.StartTs] = true
} else {
fmt.Fprintf(buf, " Mutation: %+v .", pr.Mutations)
}
fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
case len(pr.Kv) > 0:
fmt.Fprintf(buf, " KV . Size: %d ", len(pr.Kv))
case pr.State != nil:
fmt.Fprintf(buf, " State . %+v ", pr.State)
case pr.Delta != nil:
fmt.Fprintf(buf, " Delta .")
sort.Slice(pr.Delta.Txns, func(i, j int) bool {
ti := pr.Delta.Txns[i]
tj := pr.Delta.Txns[j]
return ti.StartTs < tj.StartTs
})
fmt.Fprintf(buf, " Max: %d .", pr.Delta.GetMaxAssigned())
for _, txn := range pr.Delta.Txns {
fmt.Fprintf(buf, " %d → %d .", txn.StartTs, txn.CommitTs)
delete(pending, txn.StartTs)
}
fmt.Fprintf(buf, " Pending txns: %d .", len(pending))
case pr.Snapshot != nil:
fmt.Fprintf(buf, " Snapshot . %+v ", pr.Snapshot)
}
}

func printZeroProposal(buf *bytes.Buffer, zpr pb.ZeroProposal) {
switch {
case len(zpr.SnapshotTs) > 0:
fmt.Fprintf(buf, " Snapshot: %+v .", zpr.SnapshotTs)
case zpr.Member != nil:
fmt.Fprintf(buf, " Member: %+v .", zpr.Member)
case zpr.Tablet != nil:
fmt.Fprintf(buf, " Tablet: %+v .", zpr.Tablet)
case zpr.MaxLeaseId > 0:
fmt.Fprintf(buf, " MaxLeaseId: %d .", zpr.MaxLeaseId)
case zpr.MaxRaftId > 0:
fmt.Fprintf(buf, " MaxRaftId: %d .", zpr.MaxRaftId)
case zpr.MaxTxnTs > 0:
fmt.Fprintf(buf, " MaxTxnTs: %d .", zpr.MaxTxnTs)
case zpr.Txn != nil:
txn := zpr.Txn
fmt.Fprintf(buf, " Txn %d → %d .", txn.StartTs, txn.CommitTs)
default:
fmt.Fprintf(buf, " Proposal: %+v .", zpr)
}
}

func parseWal(db *badger.DB) error {
rids := make(map[uint64]bool)
gids := make(map[uint32]bool)

parseIds := func(item *badger.Item) {
key := item.Key()
switch {
case len(key) == 14:
// hard state and snapshot key.
rid := binary.BigEndian.Uint64(key[0:8])
rids[rid] = true

gid := binary.BigEndian.Uint32(key[10:14])
gids[gid] = true
case len(key) == 20:
// entry key.
rid := binary.BigEndian.Uint64(key[0:8])
rids[rid] = true

gid := binary.BigEndian.Uint32(key[8:12])
gids[gid] = true
default:
// Ignore other keys.
}
}

err := db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
itr := txn.NewIterator(opt)
defer itr.Close()

for itr.Rewind(); itr.Valid(); itr.Next() {
parseIds(itr.Item())
}
return nil
})
if err != nil {
return err
}
fmt.Printf("rids: %v\n", rids)
fmt.Printf("gids: %v\n", gids)

pending := make(map[uint64]bool)
printEntry := func(es raftpb.Entry) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%d . %d . %v . %-6s .", es.Term, es.Index, es.Type,
humanize.Bytes(uint64(es.Size())))
if es.Type == raftpb.EntryConfChange {
fmt.Printf("%s\n", buf.Bytes())
return
}
var pr pb.Proposal
var zpr pb.ZeroProposal
if err := pr.Unmarshal(es.Data); err == nil {
printAlphaProposal(&buf, pr, pending)
} else if err := zpr.Unmarshal(es.Data); err == nil {
printZeroProposal(&buf, zpr)
} else {
fmt.Printf("%s Unable to parse Proposal: %v\n", buf.Bytes(), err)
return
}
fmt.Printf("%s\n", buf.Bytes())
}

printRaft := func(store *raftwal.DiskStorage) {
fmt.Println()
snap, err := store.Snapshot()
if err != nil {
fmt.Printf("Got error while retrieving snapshot: %v\n", err)
} else {
fmt.Printf("Snapshot Metadata: %+v\n", snap.Metadata)
var ds pb.Snapshot
var ms pb.MembershipState
if err := ds.Unmarshal(snap.Data); err == nil {
fmt.Printf("Snapshot Alpha: %+v\n", ds)
} else if err := ms.Unmarshal(snap.Data); err == nil {
for gid, group := range ms.GetGroups() {
fmt.Printf("\nGROUP: %d\n", gid)
for _, member := range group.GetMembers() {
fmt.Printf("Member: %+v .\n", member)
}
for _, tablet := range group.GetTablets() {
fmt.Printf("Tablet: %+v .\n", tablet)
}
group.Members = nil
group.Tablets = nil
fmt.Printf("Group: %d %+v .\n", gid, group)
}
ms.Groups = nil
fmt.Printf("\nSnapshot Zero: %+v\n", ms)
} else {
fmt.Printf("Unable to unmarshal Dgraph snapshot: %v", err)
}
}
fmt.Println()

if hs, err := store.HardState(); err != nil {
fmt.Printf("Got error while retrieving hardstate: %v\n", err)
} else {
fmt.Printf("Hardstate: %+v\n", hs)
}

lastIdx, err := store.LastIndex()
if err != nil {
fmt.Printf("Got error while retrieving last index: %v\n", err)
return
}
startIdx := snap.Metadata.Index + 1
fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx)

pending = make(map[uint64]bool)
for startIdx < lastIdx-1 {
entries, err := store.Entries(startIdx, lastIdx, 64<<20 /* 64 MB Max Size */)
if err != nil {
fmt.Printf("Got error while retrieving entries: %v\n", err)
return
}
for _, ent := range entries {
printEntry(ent)
startIdx = x.Max(startIdx, ent.Index)
}
}
}

for rid, _ := range rids {
for gid, _ := range gids {
fmt.Printf("Iterating with Raft Id = %d Groupd Id = %d\n", rid, gid)
store := raftwal.Init(db, rid, gid)
printRaft(store)
}
}
return nil
}

func run() {
dir := opt.pdir
isWal := false
if len(dir) == 0 {
dir = opt.wdir
isWal = true
}
bopts := badger.DefaultOptions
bopts.Dir = opt.pdir
bopts.ValueDir = opt.pdir
bopts.Dir = dir
bopts.ValueDir = dir
bopts.TableLoadingMode = options.MemoryMap
bopts.ReadOnly = opt.readOnly

x.AssertTruef(len(bopts.Dir) > 0, "No posting dir specified.")
x.AssertTruef(len(bopts.Dir) > 0, "No posting or wal dir specified.")
fmt.Printf("Opening DB: %s\n", bopts.Dir)

db, err := badger.OpenManaged(bopts)
var db *badger.DB
var err error
if isWal {
db, err = badger.Open(bopts)
} else {
db, err = badger.OpenManaged(bopts)
}
x.Check(err)
defer db.Close()

if isWal {
if err := parseWal(db); err != nil {
fmt.Printf("\nGot error while parsing WAL: %v\n", err)
}
fmt.Println("Done")
// WAL can't execute the getMinMax function, so we need to deal with it
// here, instead of in the select case below.
return
}

min, max := getMinMax(db, opt.readTs)
fmt.Printf("Min commit: %d. Max commit: %d, w.r.t %d\n", min, max, opt.readTs)

Expand Down
4 changes: 3 additions & 1 deletion raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ type DiskStorage struct {

func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
w := &DiskStorage{db: db, id: id, gid: gid, cache: new(sync.Map)}
x.Check(w.StoreRaftId(id))
if prev, err := RaftId(db); err != nil || prev != id {
x.Check(w.StoreRaftId(id))
}
w.elog = trace.NewEventLog("Badger", "RaftStorage")

snap, err := w.Snapshot()
Expand Down