Skip to content

Commit

Permalink
NRG: Support for replying to forwarded proposals (#6157)
Browse files Browse the repository at this point in the history
This adds support for reply subjects on forwarded proposals, so we can
know whether or not a leader has acted upon those proposals yet.

The `ForwardProposal` function does NOT yet use this functionality as we
cannot yet know in a mixed-version or upgrade scenario yet if the remote
side will be able to respond.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Nov 22, 2024
2 parents 4ecfb58 + b3fb60a commit 972646f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 13 deletions.
55 changes: 44 additions & 11 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
prop *ipQueue[*proposedEntry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
Expand All @@ -211,6 +211,11 @@ type raft struct {
quit chan struct{} // Raft group shutdown
}

type proposedEntry struct {
*Entry
reply string // Optional, to respond once proposal handled
}

// cacthupState structure that holds our subscription, and catchup term and index
// as well as starting term and index and how many updates we have seen.
type catchupState struct {
Expand Down Expand Up @@ -385,7 +390,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
quit: make(chan struct{}),
reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"),
votes: newIPQueue[*voteResponse](s, qpfx+"vresp"),
prop: newIPQueue[*Entry](s, qpfx+"entry"),
prop: newIPQueue[*proposedEntry](s, qpfx+"entry"),
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
Expand Down Expand Up @@ -811,7 +816,7 @@ func (n *raft) Propose(data []byte) error {
if werr := n.werr; werr != nil {
return werr
}
n.prop.push(newEntry(EntryNormal, data))
n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
return nil
}

Expand All @@ -830,20 +835,21 @@ func (n *raft) ProposeMulti(entries []*Entry) error {
return werr
}
for _, e := range entries {
n.prop.push(e)
n.prop.push(newProposedEntry(e, _EMPTY_))
}
return nil
}

// ForwardProposal will forward the proposal to the leader if known.
// If we are the leader this is the same as calling propose.
// FIXME(dlc) - We could have a reply subject and wait for a response
// for retries, but would need to not block and be in separate Go routine.
func (n *raft) ForwardProposal(entry []byte) error {
if n.Leader() {
return n.Propose(entry)
}

// TODO: Currently we do not set a reply subject, even though we are
// now capable of responding. Do this once enough time has passed,
// i.e. maybe in 2.12.
n.sendRPC(n.psubj, _EMPTY_, entry)
return nil
}
Expand All @@ -862,7 +868,7 @@ func (n *raft) ProposeAddPeer(peer string) error {
prop := n.prop
n.RUnlock()

prop.push(newEntry(EntryAddPeer, []byte(peer)))
prop.push(newProposedEntry(newEntry(EntryAddPeer, []byte(peer)), _EMPTY_))
return nil
}

Expand Down Expand Up @@ -898,7 +904,7 @@ func (n *raft) ProposeRemovePeer(peer string) error {
// peer remove and then notifying the rest of the group that the
// peer was removed.
if isLeader {
prop.push(newEntry(EntryRemovePeer, []byte(peer)))
prop.push(newProposedEntry(newEntry(EntryRemovePeer, []byte(peer)), _EMPTY_))
n.doRemovePeerAsLeader(peer)
return nil
}
Expand Down Expand Up @@ -2177,6 +2183,26 @@ func (ae *appendEntry) returnToPool() {
aePool.Put(ae)
}

// Pool for proposedEntry re-use.
var pePool = sync.Pool{
New: func() any {
return &proposedEntry{}
},
}

// Create a new proposedEntry.
func newProposedEntry(entry *Entry, reply string) *proposedEntry {
pe := pePool.Get().(*proposedEntry)
pe.Entry, pe.reply = entry, reply
return pe
}

// Will return this proosed entry.
func (pe *proposedEntry) returnToPool() {
pe.Entry, pe.reply = nil, _EMPTY_
pePool.Put(pe)
}

type EntryType uint8

const (
Expand Down Expand Up @@ -2386,7 +2412,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _

// Need to copy since this is underlying client/route buffer.
peer := copyBytes(msg)
prop.push(newEntry(EntryRemovePeer, peer))
prop.push(newProposedEntry(newEntry(EntryRemovePeer, peer), reply))
}

// Called when a peer has forwarded a proposal.
Expand All @@ -2407,7 +2433,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account,
return
}

prop.push(newEntry(EntryNormal, msg))
prop.push(newProposedEntry(newEntry(EntryNormal, msg), reply))
}

func (n *raft) runAsLeader() {
Expand Down Expand Up @@ -2475,7 +2501,7 @@ func (n *raft) runAsLeader() {
if b.Type == EntryRemovePeer {
n.doRemovePeerAsLeader(string(b.Data))
}
entries = append(entries, b)
entries = append(entries, b.Entry)
// Increment size.
sz += len(b.Data) + 1
// If below thresholds go ahead and send.
Expand All @@ -2491,6 +2517,13 @@ func (n *raft) runAsLeader() {
if len(entries) > 0 {
n.sendAppendEntry(entries)
}
// Respond to any proposals waiting for a confirmation.
for _, pe := range es {
if pe.reply != _EMPTY_ {
n.sendReply(pe.reply, nil)
}
pe.returnToPool()
}
n.prop.recycle(&es)

case <-hb.C:
Expand Down
27 changes: 25 additions & 2 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -372,15 +373,15 @@ func TestNRGSwitchStateClearsQueues(t *testing.T) {
s := c.servers[0] // RunBasicJetStreamServer not available

n := &raft{
prop: newIPQueue[*Entry](s, "prop"),
prop: newIPQueue[*proposedEntry](s, "prop"),
resp: newIPQueue[*appendEntryResponse](s, "resp"),
leadc: make(chan bool, 1), // for switchState
}
n.state.Store(int32(Leader))
require_Equal(t, n.prop.len(), 0)
require_Equal(t, n.resp.len(), 0)

n.prop.push(&Entry{})
n.prop.push(&proposedEntry{&Entry{}, _EMPTY_})
n.resp.push(&appendEntryResponse{})
require_Equal(t, n.prop.len(), 1)
require_Equal(t, n.resp.len(), 1)
Expand Down Expand Up @@ -1631,3 +1632,25 @@ func TestNRGTruncateDownToCommitted(t *testing.T) {
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 2)
}

func TestNRGForwardProposalResponse(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

n := rg.nonLeader().node().(*raft)
psubj := n.psubj

data := make([]byte, binary.MaxVarintLen64)
dn := binary.PutVarint(data, int64(123))

_, err := nc.Request(psubj, data[:dn], time.Second*5)
require_NoError(t, err)

rg.waitOnTotal(t, 123)
}

0 comments on commit 972646f

Please sign in to comment.