From b3fb60ad699cc18d2e292a35badaad3416734a3a Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 21 Nov 2024 10:53:16 +0000 Subject: [PATCH] NRG: Support for replying to forwarded proposals This adds support for reply subjects on forwarded proposals, so we can know whether or not a leader has acted upon those proposals yet. The `ForwardProposal` function does NOT yet use this functionality as we cannot know in a mixed-version or upgrade scenario yet if the remote side will be able to respond. Signed-off-by: Neil Twigg --- server/raft.go | 55 ++++++++++++++++++++++++++++++++++++--------- server/raft_test.go | 27 ++++++++++++++++++++-- 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/server/raft.go b/server/raft.go index cbcf9b9e4d..edcd423f51 100644 --- a/server/raft.go +++ b/server/raft.go @@ -201,7 +201,7 @@ type raft struct { hcommit uint64 // The commit at the time that applies were paused pobserver bool // Whether we were an observer at the time that applies were paused - prop *ipQueue[*Entry] // Proposals + prop *ipQueue[*proposedEntry] // Proposals entry *ipQueue[*appendEntry] // Append entries resp *ipQueue[*appendEntryResponse] // Append entries responses apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer) @@ -211,6 +211,11 @@ type raft struct { quit chan struct{} // Raft group shutdown } +type proposedEntry struct { + *Entry + reply string // Optional, to respond once proposal handled +} + // cacthupState structure that holds our subscription, and catchup term and index // as well as starting term and index and how many updates we have seen. 
type catchupState struct { @@ -385,7 +390,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), - prop: newIPQueue[*Entry](s, qpfx+"entry"), + prop: newIPQueue[*proposedEntry](s, qpfx+"entry"), entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"), resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"), apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"), @@ -811,7 +816,7 @@ func (n *raft) Propose(data []byte) error { if werr := n.werr; werr != nil { return werr } - n.prop.push(newEntry(EntryNormal, data)) + n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_)) return nil } @@ -830,20 +835,21 @@ func (n *raft) ProposeMulti(entries []*Entry) error { return werr } for _, e := range entries { - n.prop.push(e) + n.prop.push(newProposedEntry(e, _EMPTY_)) } return nil } // ForwardProposal will forward the proposal to the leader if known. // If we are the leader this is the same as calling propose. -// FIXME(dlc) - We could have a reply subject and wait for a response -// for retries, but would need to not block and be in separate Go routine. func (n *raft) ForwardProposal(entry []byte) error { if n.Leader() { return n.Propose(entry) } + // TODO: Currently we do not set a reply subject, even though we are + // now capable of responding. Do this once enough time has passed, + // i.e. maybe in 2.12. n.sendRPC(n.psubj, _EMPTY_, entry) return nil } @@ -862,7 +868,7 @@ func (n *raft) ProposeAddPeer(peer string) error { prop := n.prop n.RUnlock() - prop.push(newEntry(EntryAddPeer, []byte(peer))) + prop.push(newProposedEntry(newEntry(EntryAddPeer, []byte(peer)), _EMPTY_)) return nil } @@ -898,7 +904,7 @@ func (n *raft) ProposeRemovePeer(peer string) error { // peer remove and then notifying the rest of the group that the // peer was removed. 
if isLeader { - prop.push(newEntry(EntryRemovePeer, []byte(peer))) + prop.push(newProposedEntry(newEntry(EntryRemovePeer, []byte(peer)), _EMPTY_)) n.doRemovePeerAsLeader(peer) return nil } @@ -2177,6 +2183,26 @@ func (ae *appendEntry) returnToPool() { aePool.Put(ae) } +// Pool for proposedEntry re-use. +var pePool = sync.Pool{ + New: func() any { + return &proposedEntry{} + }, +} + +// Create a new proposedEntry. +func newProposedEntry(entry *Entry, reply string) *proposedEntry { + pe := pePool.Get().(*proposedEntry) + pe.Entry, pe.reply = entry, reply + return pe +} + +// Will return this proposed entry. +func (pe *proposedEntry) returnToPool() { + pe.Entry, pe.reply = nil, _EMPTY_ + pePool.Put(pe) +} + type EntryType uint8 const ( @@ -2386,7 +2412,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _ // Need to copy since this is underlying client/route buffer. peer := copyBytes(msg) - prop.push(newEntry(EntryRemovePeer, peer)) + prop.push(newProposedEntry(newEntry(EntryRemovePeer, peer), reply)) } // Called when a peer has forwarded a proposal. @@ -2407,7 +2433,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account, return } - prop.push(newEntry(EntryNormal, msg)) + prop.push(newProposedEntry(newEntry(EntryNormal, msg), reply)) } func (n *raft) runAsLeader() { @@ -2475,7 +2501,7 @@ func (n *raft) runAsLeader() { if b.Type == EntryRemovePeer { n.doRemovePeerAsLeader(string(b.Data)) } - entries = append(entries, b) + entries = append(entries, b.Entry) // Increment size. sz += len(b.Data) + 1 // If below thresholds go ahead and send. @@ -2491,6 +2517,13 @@ func (n *raft) runAsLeader() { if len(entries) > 0 { n.sendAppendEntry(entries) } + // Respond to any proposals waiting for a confirmation. 
+ for _, pe := range es { + if pe.reply != _EMPTY_ { + n.sendReply(pe.reply, nil) + } + pe.returnToPool() + } n.prop.recycle(&es) case <-hb.C: diff --git a/server/raft_test.go b/server/raft_test.go index edeacdbe14..41a130a460 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -15,6 +15,7 @@ package server import ( "bytes" + "encoding/binary" "errors" "fmt" "math" @@ -372,7 +373,7 @@ func TestNRGSwitchStateClearsQueues(t *testing.T) { s := c.servers[0] // RunBasicJetStreamServer not available n := &raft{ - prop: newIPQueue[*Entry](s, "prop"), + prop: newIPQueue[*proposedEntry](s, "prop"), resp: newIPQueue[*appendEntryResponse](s, "resp"), leadc: make(chan bool, 1), // for switchState } @@ -380,7 +381,7 @@ func TestNRGSwitchStateClearsQueues(t *testing.T) { require_Equal(t, n.prop.len(), 0) require_Equal(t, n.resp.len(), 0) - n.prop.push(&Entry{}) + n.prop.push(&proposedEntry{&Entry{}, _EMPTY_}) n.resp.push(&appendEntryResponse{}) require_Equal(t, n.prop.len(), 1) require_Equal(t, n.resp.len(), 1) @@ -1631,3 +1632,25 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { n.processAppendEntry(aeHeartbeat, n.aesub) require_Equal(t, n.commit, 2) } + +func TestNRGForwardProposalResponse(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!")) + defer nc.Close() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + n := rg.nonLeader().node().(*raft) + psubj := n.psubj + + data := make([]byte, binary.MaxVarintLen64) + dn := binary.PutVarint(data, int64(123)) + + _, err := nc.Request(psubj, data[:dn], time.Second*5) + require_NoError(t, err) + + rg.waitOnTotal(t, 123) +}