Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
NRG: Signal on initial applied messages, ensure 'expected per subject' consistency
Browse files Browse the repository at this point in the history

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
MauriceVanVeen committed Dec 2, 2024

Unverified

The committer email address is not verified.
1 parent 211af6a commit 9861c16
Showing 6 changed files with 243 additions and 33 deletions.
20 changes: 20 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
@@ -1608,5 +1608,25 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamExpectedLastSeqPerSubjectNotReady",
"code": 503,
"error_code": 10163,
"description": "expected last sequence per subject temporarily unavailable",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamExpectedLastSeqPerSubjectConflict",
"code": 409,
"error_code": 10164,
"description": "subject for expected last sequence is in process",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
142 changes: 109 additions & 33 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
@@ -1278,7 +1278,7 @@ func (s *Server) checkForNRGOrphans() {

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
qch, rqch, lch, aq, aflrch := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ(), n.AppliedFloorC()

defer s.grWG.Done()

@@ -1424,6 +1424,9 @@ func (js *jetStream) monitorCluster() {
}
aq.recycle(&ces)

case <-aflrch:
// ignore

case isLeader = <-lch:
// Process the change.
js.processLeaderChange(isLeader)
@@ -2388,7 +2391,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
}()

qch, mqch, lch, aq, uch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), meta.ID()
qch, mqch, lch, aq, uch, aflrch, ourPeerId := n.QuitC(), mset.monitorQuitC(), n.LeadChangeC(), n.ApplyQ(), mset.updateC(), n.AppliedFloorC(), meta.ID()

s.Debugf("Starting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
defer s.Debugf("Exiting stream monitor for '%s > %s' [%s]", sa.Client.serviceAccount(), sa.Config.Name, n.Group())
@@ -2610,6 +2613,16 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
doSnapshot()
}

case <-aflrch:
// Once leader is initially up-to-date, mark ready to process 'expected per subject' messages.
// This ensures consistency, since we could otherwise let multiple updates per subject through if one update
// was part of our log, and another got added shortly after leader change.
if mset != nil {
mset.clMu.Lock()
mset.expectedPerSubjectReady = true
mset.clMu.Unlock()
}

case isLeader = <-lch:
if isLeader {
if mset != nil && n != nil && sendSnapshot && !isRecovering {
@@ -3112,6 +3125,16 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
mset.clMu.Unlock()
}

// Clear expected per subject state after processing.
if mset.expectedPerSubjectSequence != nil {
mset.clMu.Lock()
if subj, found := mset.expectedPerSubjectSequence[lseq]; found {
delete(mset.expectedPerSubjectSequence, lseq)
delete(mset.expectedPerSubjectInProcess, subj)
}
mset.clMu.Unlock()
}

if err != nil {
if err == errLastSeqMismatch {

@@ -3349,9 +3372,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
return
}

// Clear inflight if we have it.
mset.clMu.Lock()
// Clear inflight if we have it.
mset.inflight = nil
// Clear expected per subject state.
mset.expectedPerSubjectReady = false
mset.expectedPerSubjectSequence = nil
mset.expectedPerSubjectInProcess = nil
mset.clMu.Unlock()

js.mu.Lock()
@@ -4874,7 +4901,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
// from underneath the one that is running since it will be the same raft node.
defer n.Stop()

qch, lch, aq, uch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), cc.meta.ID()
qch, lch, aq, uch, aflrch, ourPeerId := n.QuitC(), n.LeadChangeC(), n.ApplyQ(), o.updateC(), n.AppliedFloorC(), cc.meta.ID()

s.Debugf("Starting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
defer s.Debugf("Exiting consumer monitor for '%s > %s > %s' [%s]", o.acc.Name, ca.Stream, ca.Name, n.Group())
@@ -4988,6 +5015,10 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
}
}
aq.recycle(&ces)

case <-aflrch:
// ignore

case isLeader = <-lch:
if recovering && !isLeader {
js.setConsumerAssignmentRecovering(ca)
@@ -8072,35 +8103,6 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
return err
}
// Expected last sequence per subject.
// We can check for last sequence per subject but only if the expected seq <= lseq.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq <= lseq {
// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
seqSubj = optSubj
}

var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(seqSubj, &smv)
if sm != nil {
fseq = sm.seq
}
if err == ErrStoreMsgNotFound && seq == 0 {
fseq, err = 0, nil
}
if err != nil || fseq != seq {
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
}
}
// Expected stream name can also be pre-checked.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
if canRespond {
@@ -8198,6 +8200,80 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
}

if len(hdr) > 0 {
// Expected last sequence per subject.
if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil {
// Wait for initial stored but not applied messages to be applied, ensuring consistency before allowing updates.
if !mset.expectedPerSubjectReady {
// Could have set inflight above, cleanup here.
delete(mset.inflight, mset.clseq)
mset.clMu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamExpectedLastSeqPerSubjectNotReadyError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("expected last sequence per subject temporarily unavailable")
}

// Allow override of the subject used for the check.
seqSubj := subject
if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ {
seqSubj = optSubj
}

// If subject is already in process, block as otherwise we could have multiple messages inflight with same subject.
if _, found := mset.expectedPerSubjectInProcess[seqSubj]; found {
// Could have set inflight above, cleanup here.
delete(mset.inflight, mset.clseq)
mset.clMu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamExpectedLastSeqPerSubjectConflictError()
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("subject for expected last sequence is in process")
}

var smv StoreMsg
var fseq uint64
sm, err := store.LoadLastMsg(seqSubj, &smv)
if sm != nil {
fseq = sm.seq
}
if err == ErrStoreMsgNotFound && seq == 0 {
fseq, err = 0, nil
}
if err != nil || fseq != seq {
// Could have set inflight above, cleanup here.
delete(mset.inflight, mset.clseq)
mset.clMu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
b, _ := json.Marshal(resp)
outq.sendMsg(reply, b)
}
return fmt.Errorf("last sequence by subject mismatch: %d vs %d", seq, fseq)
}

// Track sequence and subject.
if mset.expectedPerSubjectSequence == nil {
mset.expectedPerSubjectSequence = make(map[uint64]string)
}
if mset.expectedPerSubjectInProcess == nil {
mset.expectedPerSubjectInProcess = make(map[string]struct{})
}
mset.expectedPerSubjectSequence[mset.clseq] = seqSubj
mset.expectedPerSubjectInProcess[seqSubj] = struct{}{}
}
}

esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), compressOK)
var mtKey uint64
if mt != nil {
54 changes: 54 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
@@ -5087,3 +5087,57 @@ func TestJetStreamClusterStreamAckMsgR3SignalsRemovedMsg(t *testing.T) {
return nil
})
}

// TestJetStreamClusterExpectedPerSubjectConsistency exercises the clustered
// 'expected last sequence per subject' gating on an R3 stream by manipulating
// the stream leader's tracking state directly:
//
//  1. While expectedPerSubjectReady is false, a publish carrying the header is
//     rejected with the "not ready" error.
//  2. While the subject is recorded as in process, a publish is rejected with
//     the "conflict" error.
//  3. With the state ready and empty, the publish succeeds at sequence 1 and
//     both tracking maps are empty afterwards.
func TestJetStreamClusterExpectedPerSubjectConsistency(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	streamCfg := &nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.LimitsPolicy,
		Replicas:  3,
	}
	_, err := js.AddStream(streamCfg)
	require_NoError(t, err)

	// Grab the stream as seen by its current leader so its state can be poked.
	leader := c.streamLeader(globalAccountName, "TEST")
	account, err := leader.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := account.lookupStream("TEST")
	require_NoError(t, err)

	// withLock runs fn while holding the stream's clMu.
	withLock := func(fn func()) {
		mset.clMu.Lock()
		defer mset.clMu.Unlock()
		fn()
	}
	// publishExpectingNone publishes to "foo" expecting no prior message on the subject.
	publishExpectingNone := func() (*nats.PubAck, error) {
		return js.Publish("foo", nil, nats.ExpectLastSequencePerSubject(0))
	}

	// Phase 1: not ready yet, so updates must be blocked.
	withLock(func() {
		mset.expectedPerSubjectReady = false
	})
	_, err = publishExpectingNone()
	require_Error(t, err, NewJSStreamExpectedLastSeqPerSubjectNotReadyError())

	// Phase 2: ready, but the subject is already marked as in process.
	withLock(func() {
		mset.expectedPerSubjectReady = true
		mset.expectedPerSubjectSequence = map[uint64]string{0: "foo"}
		mset.expectedPerSubjectInProcess = map[string]struct{}{"foo": {}}
	})
	_, err = publishExpectingNone()
	require_Error(t, err, NewJSStreamExpectedLastSeqPerSubjectConflictError())

	// Phase 3: ready and nothing in process, so the update goes through.
	withLock(func() {
		mset.expectedPerSubjectReady = true
		mset.expectedPerSubjectSequence = nil
		mset.expectedPerSubjectInProcess = nil
	})
	ack, err := publishExpectingNone()
	require_NoError(t, err)
	require_Equal(t, ack.Sequence, 1)

	// The tracking state must have been cleared again after the publish.
	withLock(func() {
		require_Len(t, len(mset.expectedPerSubjectSequence), 0)
		require_Len(t, len(mset.expectedPerSubjectInProcess), 0)
	})
}
28 changes: 28 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
@@ -356,6 +356,12 @@ const (
// JSStreamDuplicateMessageConflict duplicate message id is in process
JSStreamDuplicateMessageConflict ErrorIdentifier = 10158

// JSStreamExpectedLastSeqPerSubjectConflict subject for expected last sequence is in process
JSStreamExpectedLastSeqPerSubjectConflict ErrorIdentifier = 10164

// JSStreamExpectedLastSeqPerSubjectNotReady expected last sequence per subject temporarily unavailable
JSStreamExpectedLastSeqPerSubjectNotReady ErrorIdentifier = 10163

// JSStreamExternalApiOverlapErrF stream external api prefix {prefix} must not overlap with {subject}
JSStreamExternalApiOverlapErrF ErrorIdentifier = 10021

@@ -608,6 +614,8 @@ var (
JSStreamCreateErrF: {Code: 500, ErrCode: 10049, Description: "{err}"},
JSStreamDeleteErrF: {Code: 500, ErrCode: 10050, Description: "{err}"},
JSStreamDuplicateMessageConflict: {Code: 409, ErrCode: 10158, Description: "duplicate message id is in process"},
JSStreamExpectedLastSeqPerSubjectConflict: {Code: 409, ErrCode: 10164, Description: "subject for expected last sequence is in process"},
JSStreamExpectedLastSeqPerSubjectNotReady: {Code: 503, ErrCode: 10163, Description: "expected last sequence per subject temporarily unavailable"},
JSStreamExternalApiOverlapErrF: {Code: 400, ErrCode: 10021, Description: "stream external api prefix {prefix} must not overlap with {subject}"},
JSStreamExternalDelPrefixOverlapsErrF: {Code: 400, ErrCode: 10022, Description: "stream external delivery prefix {prefix} overlaps with stream subject {subject}"},
JSStreamGeneralErrorF: {Code: 500, ErrCode: 10051, Description: "{err}"},
@@ -1997,6 +2005,26 @@ func NewJSStreamDuplicateMessageConflictError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamDuplicateMessageConflict]
}

// NewJSStreamExpectedLastSeqPerSubjectConflictError returns the API error for
// JSStreamExpectedLastSeqPerSubjectConflict ("subject for expected last sequence is in process").
//
// If the parsed options carry an error that already is an *ApiError, that
// error is returned unchanged; otherwise the entry registered in ApiErrors
// under JSStreamExpectedLastSeqPerSubjectConflict is returned.
func NewJSStreamExpectedLastSeqPerSubjectConflictError(opts ...ErrorOption) *ApiError {
	// An *ApiError passed in through the options wins over the registered one.
	if override, isAPIErr := parseOpts(opts).err.(*ApiError); isAPIErr {
		return override
	}
	return ApiErrors[JSStreamExpectedLastSeqPerSubjectConflict]
}

// NewJSStreamExpectedLastSeqPerSubjectNotReadyError returns the API error for
// JSStreamExpectedLastSeqPerSubjectNotReady ("expected last sequence per subject temporarily unavailable").
//
// If the parsed options carry an error that already is an *ApiError, that
// error is returned unchanged; otherwise the entry registered in ApiErrors
// under JSStreamExpectedLastSeqPerSubjectNotReady is returned.
func NewJSStreamExpectedLastSeqPerSubjectNotReadyError(opts ...ErrorOption) *ApiError {
	// An *ApiError passed in through the options wins over the registered one.
	if override, isAPIErr := parseOpts(opts).err.(*ApiError); isAPIErr {
		return override
	}
	return ApiErrors[JSStreamExpectedLastSeqPerSubjectNotReady]
}

// NewJSStreamExternalApiOverlapError creates a new JSStreamExternalApiOverlapErrF error: "stream external api prefix {prefix} must not overlap with {subject}"
func NewJSStreamExternalApiOverlapError(prefix interface{}, subject interface{}, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Loading

0 comments on commit 9861c16

Please sign in to comment.