Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NRG: Support for replying to forwarded proposals #6157

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ type raft struct {
hcommit uint64 // The commit at the time that applies were paused
pobserver bool // Whether we were an observer at the time that applies were paused

prop *ipQueue[*Entry] // Proposals
prop *ipQueue[*proposedEntry] // Proposals
entry *ipQueue[*appendEntry] // Append entries
resp *ipQueue[*appendEntryResponse] // Append entries responses
apply *ipQueue[*CommittedEntry] // Apply queue (committed entries to be passed to upper layer)
Expand All @@ -211,6 +211,11 @@ type raft struct {
quit chan struct{} // Raft group shutdown
}

type proposedEntry struct {
*Entry
reply string // Optional, to respond once proposal handled
}

// cacthupState structure that holds our subscription, and catchup term and index
// as well as starting term and index and how many updates we have seen.
type catchupState struct {
Expand Down Expand Up @@ -385,7 +390,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
quit: make(chan struct{}),
reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"),
votes: newIPQueue[*voteResponse](s, qpfx+"vresp"),
prop: newIPQueue[*Entry](s, qpfx+"entry"),
prop: newIPQueue[*proposedEntry](s, qpfx+"entry"),
entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"),
resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"),
apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"),
Expand Down Expand Up @@ -811,7 +816,7 @@ func (n *raft) Propose(data []byte) error {
if werr := n.werr; werr != nil {
return werr
}
n.prop.push(newEntry(EntryNormal, data))
n.prop.push(newProposedEntry(newEntry(EntryNormal, data), _EMPTY_))
return nil
}

Expand All @@ -830,20 +835,21 @@ func (n *raft) ProposeMulti(entries []*Entry) error {
return werr
}
for _, e := range entries {
n.prop.push(e)
n.prop.push(newProposedEntry(e, _EMPTY_))
}
return nil
}

// ForwardProposal will forward the proposal to the leader if known.
// If we are the leader this is the same as calling propose.
// FIXME(dlc) - We could have a reply subject and wait for a response
// for retries, but would need to not block and be in separate Go routine.
func (n *raft) ForwardProposal(entry []byte) error {
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
if n.Leader() {
return n.Propose(entry)
}

// TODO: Currently we do not set a reply subject, even though we are
// now capable of responding. Do this once enough time has passed,
// i.e. maybe in 2.12.
n.sendRPC(n.psubj, _EMPTY_, entry)
return nil
}
Expand All @@ -862,7 +868,7 @@ func (n *raft) ProposeAddPeer(peer string) error {
prop := n.prop
n.RUnlock()

prop.push(newEntry(EntryAddPeer, []byte(peer)))
prop.push(newProposedEntry(newEntry(EntryAddPeer, []byte(peer)), _EMPTY_))
return nil
}

Expand Down Expand Up @@ -898,7 +904,7 @@ func (n *raft) ProposeRemovePeer(peer string) error {
// peer remove and then notifying the rest of the group that the
// peer was removed.
if isLeader {
prop.push(newEntry(EntryRemovePeer, []byte(peer)))
prop.push(newProposedEntry(newEntry(EntryRemovePeer, []byte(peer)), _EMPTY_))
n.doRemovePeerAsLeader(peer)
return nil
}
Expand Down Expand Up @@ -2177,6 +2183,26 @@ func (ae *appendEntry) returnToPool() {
aePool.Put(ae)
}

// Pool for proposedEntry re-use.
var pePool = sync.Pool{
New: func() any {
return &proposedEntry{}
},
}

// Create a new proposedEntry.
func newProposedEntry(entry *Entry, reply string) *proposedEntry {
pe := pePool.Get().(*proposedEntry)
pe.Entry, pe.reply = entry, reply
return pe
}

// Will return this proosed entry.
func (pe *proposedEntry) returnToPool() {
pe.Entry, pe.reply = nil, _EMPTY_
pePool.Put(pe)
}

type EntryType uint8

const (
Expand Down Expand Up @@ -2386,7 +2412,7 @@ func (n *raft) handleForwardedRemovePeerProposal(sub *subscription, c *client, _

// Need to copy since this is underlying client/route buffer.
peer := copyBytes(msg)
prop.push(newEntry(EntryRemovePeer, peer))
prop.push(newProposedEntry(newEntry(EntryRemovePeer, peer), reply))
}

// Called when a peer has forwarded a proposal.
Expand All @@ -2407,7 +2433,7 @@ func (n *raft) handleForwardedProposal(sub *subscription, c *client, _ *Account,
return
}

prop.push(newEntry(EntryNormal, msg))
prop.push(newProposedEntry(newEntry(EntryNormal, msg), reply))
}

func (n *raft) runAsLeader() {
Expand Down Expand Up @@ -2475,7 +2501,7 @@ func (n *raft) runAsLeader() {
if b.Type == EntryRemovePeer {
n.doRemovePeerAsLeader(string(b.Data))
}
entries = append(entries, b)
entries = append(entries, b.Entry)
// Increment size.
sz += len(b.Data) + 1
// If below thresholds go ahead and send.
Expand All @@ -2491,6 +2517,13 @@ func (n *raft) runAsLeader() {
if len(entries) > 0 {
n.sendAppendEntry(entries)
}
// Respond to any proposals waiting for a confirmation.
for _, pe := range es {
if pe.reply != _EMPTY_ {
n.sendReply(pe.reply, nil)
}
pe.returnToPool()
}
n.prop.recycle(&es)

case <-hb.C:
Expand Down
27 changes: 25 additions & 2 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -372,15 +373,15 @@ func TestNRGSwitchStateClearsQueues(t *testing.T) {
s := c.servers[0] // RunBasicJetStreamServer not available

n := &raft{
prop: newIPQueue[*Entry](s, "prop"),
prop: newIPQueue[*proposedEntry](s, "prop"),
resp: newIPQueue[*appendEntryResponse](s, "resp"),
leadc: make(chan bool, 1), // for switchState
}
n.state.Store(int32(Leader))
require_Equal(t, n.prop.len(), 0)
require_Equal(t, n.resp.len(), 0)

n.prop.push(&Entry{})
n.prop.push(&proposedEntry{&Entry{}, _EMPTY_})
n.resp.push(&appendEntryResponse{})
require_Equal(t, n.prop.len(), 1)
require_Equal(t, n.resp.len(), 1)
Expand Down Expand Up @@ -1631,3 +1632,25 @@ func TestNRGTruncateDownToCommitted(t *testing.T) {
n.processAppendEntry(aeHeartbeat, n.aesub)
require_Equal(t, n.commit, 2)
}

func TestNRGForwardProposalResponse(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

n := rg.nonLeader().node().(*raft)
psubj := n.psubj

data := make([]byte, binary.MaxVarintLen64)
dn := binary.PutVarint(data, int64(123))

_, err := nc.Request(psubj, data[:dn], time.Second*5)
require_NoError(t, err)

rg.waitOnTotal(t, 123)
}