Commit

Fix up predicate snapshot (dgraph-io#2487)
Fix up both the way we create a snapshot and the way we stream it. Make the logic robust, and add tests to ensure that the snapshot calculation is correct.

Streaming no longer needs to worry about `min_ts`. We no longer send a snapshot upfront. Instead, Raft requests one as needed: the follower sends the leader the details of the snapshot it wants, and the leader in turn checks whether its own snapshot matches what the follower asked for. This logic is better, but not entirely foolproof (a comment in predicate.go explains why).
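
To make the handshake concrete, here is a minimal self-contained sketch of the leader-side check. All names (Snapshot, serveSnapshot, stream) are illustrative, not the actual Dgraph types or API:

package snapshot // illustrative sketch, not the Dgraph implementation

import "fmt"

// Snapshot mirrors the new proto message: the Raft index and the read
// timestamp at which the snapshot's data is valid.
type Snapshot struct {
	Index  uint64
	ReadTs uint64
}

// serveSnapshot: the follower sends the details of the snapshot it needs, and
// the leader streams data only if its own snapshot matches. This is also where
// the "not entirely foolproof" caveat bites: the leader's snapshot can advance
// while the request is in flight, in which case the follower must retry.
func serveSnapshot(requested, ours Snapshot, stream func(readTs uint64) error) error {
	if requested != ours {
		return fmt.Errorf("snapshot mismatch: leader has %+v, follower wants %+v", ours, requested)
	}
	return stream(requested.ReadTs) // send all keys valid at ReadTs
}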

Bring in Badger changes that add a `DropAll` method, which is called before applying the snapshot.
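
As a rough sketch of the follower's apply path under these changes, assuming Badger's managed API; the kv type is made up here, and batching and the KVS stream plumbing are elided:

package apply // illustrative sketch, not the Dgraph implementation

import "github.com/dgraph-io/badger"

type kv struct {
	key, val []byte
	version  uint64 // commit timestamp of the entry
}

// applySnapshot wipes local state with the new DropAll, then writes the
// streamed key-values back at their original versions via Badger's
// managed-mode transactions.
func applySnapshot(db *badger.DB, kvs []kv) error {
	if err := db.DropAll(); err != nil { // drop stale data before applying
		return err
	}
	for _, e := range kvs {
		txn := db.NewTransactionAt(e.version, true) // managed-mode write txn
		if err := txn.Set(e.key, e.val); err != nil {
			txn.Discard()
			return err
		}
		if err := txn.CommitAt(e.version, nil); err != nil {
			return err
		}
	}
	return nil
}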
manishrjain authored and dna2github committed Jul 19, 2019
1 parent e399f52 commit 9bf71d3
Showing 16 changed files with 731 additions and 437 deletions.
3 changes: 2 additions & 1 deletion conn/node.go
@@ -28,6 +28,7 @@ import (
 
 var (
 	ErrDuplicateRaftId = x.Errorf("Node is already part of group")
+	ErrNoNode          = fmt.Errorf("No node has been set up yet")
 )
 
 type Node struct {
@@ -418,7 +419,7 @@ func (n *Node) AddToCluster(ctx context.Context, pid uint64) error {
 
 func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
 	if n.Raft() == nil {
-		return errNoNode
+		return ErrNoNode
 	}
 	if _, ok := n.Peer(id); !ok && id != n.RaftContext.Id {
 		return x.Errorf("Node %d not part of group", id)
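
Exporting the sentinel as ErrNoNode lets callers outside the conn package test for it directly; a hypothetical call site:

// Hypothetical usage; the exact caller is illustrative.
if err := node.ProposePeerRemoval(ctx, id); err == conn.ErrNoNode {
	// Raft isn't set up on this server yet; retry or route elsewhere.
}
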
13 changes: 5 additions & 8 deletions conn/raft_server.go
@@ -10,7 +10,6 @@ package conn
 import (
 	"context"
 	"encoding/binary"
-	"fmt"
 	"math/rand"
 	"sync"
 
@@ -115,7 +114,7 @@ func (w *RaftServer) IsPeer(ctx context.Context, rc *intern.RaftContext) (*inter
 	error) {
 	node := w.GetNode()
 	if node == nil || node.Raft() == nil {
-		return &intern.PeerResponse{}, errNoNode
+		return &intern.PeerResponse{}, ErrNoNode
 	}
 
 	if node._confState == nil {
@@ -139,7 +138,7 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
 	// TODO: Uncomment this after groups is removed.
 	node := w.GetNode()
 	if node == nil || node.Raft() == nil {
-		return nil, errNoNode
+		return nil, ErrNoNode
 	}
 	// Only process one JoinCluster request at a time.
 	node.joinLock.Lock()
@@ -168,14 +167,12 @@ func (w *RaftServer) JoinCluster(ctx context.Context,
 	return &api.Payload{}, err
 }
 
-var (
-	errNoNode = fmt.Errorf("No node has been set up yet")
-)
+var ()
 
 func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error {
 	node := w.GetNode()
 	if node == nil || node.Raft() == nil {
-		return errNoNode
+		return ErrNoNode
 	}
 
 	c := make(chan error, 1)
@@ -198,7 +195,7 @@ func (w *RaftServer) RaftMessage(ctx context.Context,
 	if rc != nil {
 		n := w.GetNode()
 		if n == nil {
-			return &api.Payload{}, errNoNode
+			return &api.Payload{}, ErrNoNode
 		}
 		n.Connect(rc.Id, rc.Addr)
 	}
3 changes: 2 additions & 1 deletion posting/lists.go
@@ -269,7 +269,8 @@ func GetNoStore(key []byte) (*List, error) {
 	return getNew(key, pstore) // This retrieves a new *List and sets refcount to 1.
 }
 
-// This doesn't sync, so call this only when you don't care about dirty posting lists in // memory(for example before populating snapshot) or after calling syncAllMarks
+// This doesn't sync, so call this only when you don't care about dirty posting lists in
+// memory(for example before populating snapshot) or after calling syncAllMarks
 func EvictLRU() {
 	lcache.Reset()
 }
4 changes: 4 additions & 0 deletions posting/mvcc.go
@@ -74,6 +74,10 @@ func (tx *Txn) CommitMutations(ctx context.Context, commitTs uint64) error {
 	sort.SliceStable(tx.deltas, func(i, j int) bool {
 		return bytes.Compare(tx.deltas[i].key, tx.deltas[j].key) < 0
 	})
+	// TODO: Simplify this. All we need to do is to get the PL for the key, and if it has the
+	// postings for the startTs, we commit them. Otherwise, we skip.
+	// Also, if the snapshot read ts is above the commit ts, then we just delete the postings from
+	// memory, instead of writing them back again.
 	var prevKey []byte
 	var pl *intern.PostingList
 	var plist *List
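
A hypothetical sketch of the simplification the TODO above describes; getPostingList, HasPostings, and the other helpers are illustrative names, not real Dgraph functions:

func commitDelta(key []byte, startTs, commitTs, snapshotReadTs uint64) error {
	pl := getPostingList(key) // fetch the posting list for this key
	if !pl.HasPostings(startTs) {
		return nil // nothing staged at startTs; skip this key
	}
	if snapshotReadTs >= commitTs {
		// The snapshot already covers this commit, so the data is durable;
		// drop the in-memory postings instead of writing them back.
		pl.DropPostings(startTs)
		return nil
	}
	return pl.CommitPostings(startTs, commitTs) // write the delta back
}
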
6 changes: 3 additions & 3 deletions posting/oracle.go
@@ -88,8 +88,8 @@ func (o *oracle) RegisterStartTs(ts uint64) *Txn {
 	return txn
 }
 
-// MinPendingStartTs returns the min start ts which is currently pending a commit or abort decision.
-func (o *oracle) MinPendingStartTs() uint64 {
+// minPendingStartTs returns the min start ts which is currently pending a commit or abort decision.
+func (o *oracle) minPendingStartTs() uint64 {
 	o.RLock()
 	defer o.RUnlock()
 	min := uint64(math.MaxUint64)
@@ -107,7 +107,7 @@ func (o *oracle) PurgeTs() uint64 {
 	// o.MinPendingStartTs can be inf, but we don't want Zero to delete new
 	// records that haven't yet reached us. So, we also consider MaxAssigned
 	// that we have received so far, so only records below MaxAssigned are purged.
-	return x.Min(o.MinPendingStartTs()-1, o.MaxAssigned())
+	return x.Min(o.minPendingStartTs()-1, o.MaxAssigned())
 }
 
 func (o *oracle) TxnOlderThan(dur time.Duration) (res []uint64) {
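
A small standalone illustration, with made-up numbers, of why PurgeTs takes the minimum: with nothing pending, minPendingStartTs is MaxUint64, and MaxAssigned keeps the purge point from running past timestamps this server has actually seen.

package main

import (
	"fmt"
	"math"
)

func min(a, b uint64) uint64 {
	if a < b {
		return a
	}
	return b
}

func main() {
	minPendingStartTs := uint64(math.MaxUint64) // no txn pending a decision
	maxAssigned := uint64(80)                   // highest ts received so far
	// Without the cap, we would purge up to MaxUint64-1 and could delete
	// records for transactions that simply haven't reached us yet.
	fmt.Println(min(minPendingStartTs-1, maxAssigned)) // prints 80
}
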
443 changes: 221 additions & 222 deletions protos/intern/internal.pb.go

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions protos/internal.proto
@@ -194,9 +194,9 @@ message KeyValues {
 }
 
 message Snapshot {
-	RaftContext context = 1;
-	uint64 index = 2;
-	uint64 min_pending_start_ts = 3;
+	RaftContext context = 1;
+	uint64 index = 2;
+	uint64 read_ts = 3;
 }
 
 message Proposal {
@@ -401,12 +401,12 @@ service Zero {
 
 service Worker {
 	// Data serving RPCs.
-	rpc Mutate (Mutations) returns (api.TxnContext) {}
-	rpc ServeTask (Query) returns (Result) {}
-	rpc PredicateAndSchemaData (SnapshotMeta) returns (stream KVS) {}
-	rpc Sort (SortMessage) returns (SortResult) {}
-	rpc Schema (SchemaRequest) returns (SchemaResult) {}
-	rpc PurgeTs (api.Payload) returns (Num) {}
+	rpc Mutate (Mutations) returns (api.TxnContext) {}
+	rpc ServeTask (Query) returns (Result) {}
+	rpc StreamSnapshot (Snapshot) returns (stream KVS) {}
+	rpc Sort (SortMessage) returns (SortResult) {}
+	rpc Schema (SchemaRequest) returns (SchemaResult) {}
+	rpc PurgeTs (api.Payload) returns (Num) {}
 
 	rpc Export (ExportPayload) returns (ExportPayload) {}
 	rpc ReceivePredicate(stream KVS) returns (api.Payload) {}
13 changes: 12 additions & 1 deletion vendor/github.com/dgraph-io/badger/db.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions vendor/github.com/dgraph-io/badger/errors.go

Some generated files are not rendered by default.

56 changes: 56 additions & 0 deletions vendor/github.com/dgraph-io/badger/levels.go

Some generated files are not rendered by default.

107 changes: 107 additions & 0 deletions vendor/github.com/dgraph-io/badger/managed_db.go

Some generated files are not rendered by default.
