
Commit 84ae1cd (parent 79e76a1)

raft: support sending MsgApp from a log snapshot

Epic: none
Release note: none

3 files changed: +107 -0 lines


pkg/kv/kvserver/replica_raft.go (+60)
@@ -1431,6 +1431,66 @@ func (r *Replica) tick(
	return true, nil
}

// processMsgApps sends a MsgApp to all peers whose send stream is ready to
// send.
//
// FIXME: find the right placement in the RACv2 code. Potentially this just
// needs to be inlined into the Ready handler.
func (r *Replica) processMsgApps(_ context.Context) error {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()

	// We are the leader at the given term.
	var term uint64 // FIXME: we should know it

	// Grab the snapshot of the log, if we are still the leader of the term. This
	// only locks Replica.mu for reads, and returns quickly. No IO is performed.
	var logSnap raft.LogSnapshot
	if !func() bool {
		r.mu.RLock()
		defer r.mu.RUnlock()
		rg := r.mu.internalRaftGroup
		// We need to be the leader of the given term to be able to send MsgApps.
		if rg.Term() != term || rg.Lead() != raftpb.PeerID(r.replicaID) {
			return false
		}
		logSnap = rg.LogSnapshot()
		return true
	}() {
		return nil
	}

	// We are still holding raftMu, so it is safe to use the log snapshot for
	// constructing MsgApps. The log will not be mutated in storage. This may
	// incur storage reads.
	//
	// FIXME: iterate over all peers to whom we should send a MsgApp.
	slices := make(map[roachpb.ReplicaID]raft.LogSlice, 5)
	for peer := roachpb.ReplicaID(0); peer < 1; peer++ {
		// FIXME: should know the parameters, as instructed by the send streams.
		var after, last, maxSize uint64
		slices[peer] = logSnap.LogSlice(after, last, maxSize)
	}
	if len(slices) == 0 { // nothing to send
		return nil
	}

	// Now grab Replica.mu again (for writes), and send the MsgApp messages. No
	// IO happens here. The messages are stashed in the RawNode message queue,
	// and will be dispatched with the next Ready handling. Make sure to do all
	// this right before the raft scheduler runs the Ready handler, to minimize
	// latency.
	return r.withRaftGroup(func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
		for peer, slice := range slices {
			// NB: the message sending can fail here if we lost leadership in the
			// meantime, or if the Next index is misaligned with the passed-in
			// slice.
			//
			// Potentially we need to update the send stream accordingly from here.
			_ = rn.SendMsgApp(raftpb.PeerID(peer), slice)
		}
		return true, nil
	})
}

func (r *Replica) processRACv2PiggybackedAdmitted(ctx context.Context) {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
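
The comments above describe a three-phase locking pattern: grab the snapshot under a read lock, do the storage reads while holding only raftMu, then briefly take the write lock to enqueue the messages. A condensed sketch of that pattern, with buildSlices and sendAll as hypothetical stand-ins for the loops in processMsgApps:

func (r *Replica) msgAppPattern() error {
	r.raftMu.Lock() // held throughout: the log storage is not mutated under
	defer r.raftMu.Unlock() // raftMu, so the snapshot stays valid for the call

	r.mu.RLock() // phase 1: grab the snapshot (leadership check elided); no IO
	snap := r.mu.internalRaftGroup.LogSnapshot()
	r.mu.RUnlock()

	slices := buildSlices(snap) // phase 2: storage reads; Replica.mu not held

	// Phase 3: brief write-locked section. Messages only enter the RawNode
	// queue here; the next Ready handling dispatches them.
	return r.withRaftGroup(func(rn *raft.RawNode) (bool, error) {
		return true, sendAll(rn, slices)
	})
}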

pkg/raft/raft.go (+24)
@@ -660,6 +660,30 @@ func (r *raft) maybeSendAppend(to pb.PeerID) bool {
	return true
}

// sendMsgApp sends a MsgApp containing the given log slice to the given peer,
// provided that this node is the leader of the slice's term, the peer is in
// StateReplicate, and the slice starts exactly at the peer's Next index.
// Returns true iff the message was sent.
func (r *raft) sendMsgApp(to pb.PeerID, ls logSlice) bool {
	if r.state != StateLeader || r.Term != ls.term {
		return false
	}
	pr := r.trk.Progress(to)
	if pr == nil || pr.State != tracker.StateReplicate || pr.Next != ls.prev.index+1 {
		return false
	}
	commit := r.raftLog.committed
	// Send the MsgApp, and update the progress accordingly.
	r.send(pb.Message{
		To:      to,
		Type:    pb.MsgApp,
		Index:   ls.prev.index,
		LogTerm: ls.prev.term,
		Entries: ls.entries,
		Commit:  commit,
		Match:   pr.Match,
	})
	pr.SentEntries(len(ls.entries), uint64(payloadsSize(ls.entries)))
	pr.MaybeUpdateSentCommit(commit)
	return true
}

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
func (r *raft) maybeSendSnapshot(to pb.PeerID, pr *tracker.Progress) bool {
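
To make the alignment condition concrete, here is a hypothetical slice and the check it must pass (entryID is the assumed name of the index/term pair type behind ls.prev; the values are for illustration only):

ls := logSlice{
	term: 5,                           // leader term the slice was read under
	prev: entryID{index: 11, term: 4}, // entry immediately preceding the slice
	entries: []pb.Entry{{Index: 12, Term: 5}, {Index: 13, Term: 5}},
}
// Sendable only to a peer whose Progress has State == StateReplicate and
// Next == 12 (== ls.prev.index+1), and only while r.Term == 5. The emitted
// MsgApp carries (Index=11, LogTerm=4), which is exactly what the follower
// uses for raft's log-matching consistency check.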

pkg/raft/rawnode.go (+23)
@@ -115,6 +115,29 @@ func (rn *RawNode) Step(m pb.Message) error {
	return rn.raft.Step(m)
}

// LogSnapshot returns the point-in-time state of the raft log.
//
// To use the returned log snapshot correctly (see the SendMsgApp method), the
// caller must ensure that the log storage is not mutated between this call and
// the use of the snapshot.
func (rn *RawNode) LogSnapshot() LogSnapshot {
	return rn.raft.raftLog.snap()
}

// SendMsgApp conditionally sends a MsgApp message containing the given log
// slice to the given peer.
//
// The message can be sent only if all of the following conditions are true:
//   - this node is the leader of the term to which the slice corresponds
//   - the given peer exists
//   - the replication flow to the given peer is in StateReplicate
//   - the first slice index matches the Next index to send to this peer
//
// Returns true iff the message was sent.
func (rn *RawNode) SendMsgApp(to pb.PeerID, slice LogSlice) bool {
	return rn.raft.sendMsgApp(to, slice)
}

// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. The returned Ready() *must* be handled and subsequently
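
Taken together, the two methods support the flow that processMsgApps above is sketching: snapshot once, slice per peer, hand each slice back. A minimal usage sketch, assuming the peer list and the slice bounds (after, last, maxSize) are supplied by the caller (e.g. the RACv2 send streams):

// Sketch only: peers, after, last and maxSize are hypothetical inputs. The
// caller must keep the log storage unmutated (in CockroachDB, by holding
// raftMu) until the sends are done.
snap := rn.LogSnapshot() // point-in-time view of the log; no IO
for _, peer := range peers {
	slice := snap.LogSlice(after, last, maxSize) // may read log storage
	if !rn.SendMsgApp(peer, slice) {
		// Leadership changed, the peer is unknown or not in StateReplicate,
		// or the slice does not start at the peer's Next index.
	}
}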
