Make ReadIndex work safely. (dgraph-io#2469)
Fix a long-standing bug where we were overwriting raftpb.Message.Context before sending it over the wire. Our understanding was that the Context field was unused by the raft library, but that was not the case. Specifically, MsgReadIndex was being sent as part of the heartbeat using the Context field.

Changed the proto we use for sending Raft message batches so that it carries the RaftContext directly, instead of embedding it in each raftpb.Message.

Turned linearizable reads back on for Zero and for retrieving state from Zero.
More context here: etcd-io/etcd#9893
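
To make the failure mode concrete: etcd raft matches each ReadIndex request to its eventual ReadState by the request-context bytes, which is what RunReadIndexLoop in conn/node.go below relies on. A minimal, self-contained Go sketch of that matching follows; readState here is only a stand-in for raft.ReadState, and the rest mirrors the activeRctx pattern in the diff rather than Dgraph's actual code.

package main

import (
	"bytes"
	"crypto/rand"
	"fmt"
)

// readState stands in for raft.ReadState: the committed read index plus the
// request context that was originally passed to ReadIndex.
type readState struct {
	Index      uint64
	RequestCtx []byte
}

func main() {
	// Propose a random 8-byte request context, as RunReadIndexLoop does.
	var activeRctx [8]byte
	if _, err := rand.Read(activeRctx[:]); err != nil {
		panic(err)
	}

	// Pretend raft echoed the context back alongside a committed index.
	// Overwriting Message.Context on the wire corrupted exactly these bytes,
	// so this match never succeeded and linearizable reads stalled.
	rs := readState{Index: 42, RequestCtx: append([]byte(nil), activeRctx[:]...)}

	if bytes.Equal(activeRctx[:], rs.RequestCtx) {
		fmt.Println("read is linearizable at index", rs.Index)
	} else {
		fmt.Println("ReadState belongs to a different request; keep waiting")
	}
}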
manishrjain authored and dna2github committed Jul 19, 2019
1 parent 765497e commit 75e2e6f
Showing 7 changed files with 436 additions and 228 deletions.
42 changes: 25 additions & 17 deletions conn/node.go
@@ -331,10 +331,14 @@ func (n *Node) doSendMessage(pool *Pool, data []byte) {

c := intern.NewRaftClient(client)
p := &api.Payload{Data: data}
batch := &intern.RaftBatch{
Context: n.RaftContext,
Payload: p,
}

ch := make(chan error, 1)
go func() {
_, err := c.RaftMessage(ctx, p)
_, err := c.RaftMessage(ctx, batch)
if err != nil {
x.Printf("Error while sending message to node with addr: %s, err: %v\n", pool.Addr, err)
}
@@ -464,6 +468,7 @@ var errReadIndex = x.Errorf("cannot get linearized read (time expired or no conf

func (n *Node) WaitLinearizableRead(ctx context.Context) error {
indexCh := make(chan uint64, 1)

select {
case n.requestCh <- linReadReq{indexCh: indexCh}:
case <-ctx.Done():
@@ -489,7 +494,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

activeRctx := make([]byte, 8)
var activeRctx [8]byte
x.Check2(n.Rand.Read(activeRctx[:]))
if err := n.Raft().ReadIndex(ctx, activeRctx[:]); err != nil {
x.Errorf("Error while trying to call ReadIndex: %v\n", err)
@@ -543,6 +548,7 @@ func (n *Node) RunReadIndexLoop(closer *y.Closer, readStateCh <-chan raft.ReadSt
for _, req := range requests {
req.indexCh <- index
}
break
}
requests = requests[:0]
}
@@ -625,17 +631,10 @@ var (
)

func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error {
var rc intern.RaftContext
x.Check(rc.Unmarshal(msg.Context))

node := w.GetNode()
if node == nil || node.Raft() == nil {
return errNoNode
}
if rc.Group != node.RaftContext.Group {
return errNoNode
}
node.Connect(msg.From, rc.Addr)

c := make(chan error, 1)
go func() { c <- node.Raft().Step(ctx, msg) }()
@@ -648,24 +647,33 @@ func (w *RaftServer) applyMessage(ctx context.Context, msg raftpb.Message) error
}
}
func (w *RaftServer) RaftMessage(ctx context.Context,
query *api.Payload) (*api.Payload, error) {
batch *intern.RaftBatch) (*api.Payload, error) {
if ctx.Err() != nil {
return &api.Payload{}, ctx.Err()
}

for idx := 0; idx < len(query.Data); {
x.AssertTruef(len(query.Data[idx:]) >= 4,
"Slice left of size: %v. Expected at least 4.", len(query.Data[idx:]))
rc := batch.GetContext()
if rc != nil {
w.GetNode().Connect(rc.Id, rc.Addr)
}
if batch.GetPayload() == nil {
return &api.Payload{}, nil
}
data := batch.Payload.Data

for idx := 0; idx < len(data); {
x.AssertTruef(len(data[idx:]) >= 4,
"Slice left of size: %v. Expected at least 4.", len(data[idx:]))

sz := int(binary.LittleEndian.Uint32(query.Data[idx : idx+4]))
sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
idx += 4
msg := raftpb.Message{}
if idx+sz > len(query.Data) {
if idx+sz > len(data) {
return &api.Payload{}, x.Errorf(
"Invalid query. Specified size %v overflows slice [%v,%v)\n",
sz, idx, len(query.Data))
sz, idx, len(data))
}
if err := msg.Unmarshal(query.Data[idx : idx+sz]); err != nil {
if err := msg.Unmarshal(data[idx : idx+sz]); err != nil {
x.Check(err)
}
if err := w.applyMessage(ctx, msg); err != nil {
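
As the RaftMessage handler above shows, a batch payload is just a sequence of marshaled raftpb.Message entries, each prefixed with its 4-byte little-endian length. Below is a small, self-contained sketch of that framing; pack and unpack are illustrative names, not functions from the Dgraph codebase.

package main

import (
	"encoding/binary"
	"fmt"
)

// pack prefixes each marshaled message with its 4-byte little-endian length,
// matching the format the RaftMessage handler decodes.
func pack(msgs [][]byte) []byte {
	var out []byte
	for _, m := range msgs {
		var sz [4]byte
		binary.LittleEndian.PutUint32(sz[:], uint32(len(m)))
		out = append(out, sz[:]...)
		out = append(out, m...)
	}
	return out
}

// unpack reverses pack, validating sizes the same way the handler does.
func unpack(data []byte) ([][]byte, error) {
	var msgs [][]byte
	for idx := 0; idx < len(data); {
		if len(data[idx:]) < 4 {
			return nil, fmt.Errorf("slice left of size %d, expected at least 4", len(data[idx:]))
		}
		sz := int(binary.LittleEndian.Uint32(data[idx : idx+4]))
		idx += 4
		if idx+sz > len(data) {
			return nil, fmt.Errorf("size %d overflows slice [%d,%d)", sz, idx, len(data))
		}
		msgs = append(msgs, data[idx:idx+sz])
		idx += sz
	}
	return msgs, nil
}

func main() {
	packed := pack([][]byte{[]byte("first message"), []byte("second message")})
	msgs, err := unpack(packed)
	fmt.Println(len(msgs), err) // 2 <nil>
}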
7 changes: 7 additions & 0 deletions dgraph/cmd/zero/http.go
@@ -138,6 +138,13 @@ func (st *state) getState(w http.ResponseWriter, r *http.Request) {
x.AddCorsHeaders(w)
w.Header().Set("Content-Type", "application/json")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := st.node.WaitLinearizableRead(ctx); err != nil {
w.WriteHeader(http.StatusInternalServerError)
x.SetStatus(w, x.Error, err.Error())
return
}
mstate := st.zero.membershipState()
if mstate == nil {
x.SetStatus(w, x.ErrorNoData, "No membership state found.")
3 changes: 0 additions & 3 deletions dgraph/cmd/zero/raft.go
@@ -526,8 +526,6 @@ func (n *node) Run() {
var leader bool
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
rcBytes, err := n.RaftContext.Marshal()
x.Check(err)

closer := y.NewCloser(4)
// snapshot can cause select loop to block while deleting entries, so run
@@ -592,7 +590,6 @@ func (n *node) Run() {
}

for _, msg := range rd.Messages {
msg.Context = rcBytes
n.Send(msg)
}
// Need to send membership state to dgraph nodes on leader change also.
8 changes: 3 additions & 5 deletions dgraph/cmd/zero/zero.go
@@ -564,10 +564,8 @@ func (s *Server) Update(stream intern.Zero_UpdateServer) error {
}

func (s *Server) latestMembershipState(ctx context.Context) (*intern.MembershipState, error) {
// TODO: Bring lin read for Zero back, once Etcd folks can tell why ReadStates are not being
// populated. NOTE: This is important to fix quickly.
// if err := s.Node.WaitLinearizableRead(ctx); err != nil {
// return nil, err
// }
if err := s.Node.WaitLinearizableRead(ctx); err != nil {
return nil, err
}
return s.membershipState(), nil
}