Skip to content

Commit

Permalink
[FIXED] JetStream catchup may return/delete too much (#6213)
Browse files Browse the repository at this point in the history
JetStream catchup could have a follower delete too much because it could
send too large delete ranges.

This was due to `LoadNextMsg` being used and not checking if the
requested `last` sequence was exceeded. This could happen if
`LoadNextMsg` was used AND `last < state.LastSeq` AND there's at least
one deleted message at sequence `last+1`.

Given you have a stream with a message at sequence 1 and 100, with a gap
in the middle, there were two issues when requesting only to receive
what message is stored at sequence 5 (for example):
- A delete range would be sent with sequence 5 and number of deletes of
95, this should be 1 delete.
- If the message at sequence 100 was deleted it would also return the
same as above, this should also be only 1 delete.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 4, 2024
2 parents d7239ac + 74ffe24 commit 7f99c52
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 3 deletions.
12 changes: 9 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9197,20 +9197,26 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
var sm *StoreMsg
var err error
// Is we should use load next do so here.
// If we should use load next do so here.
if useLoadNext {
var nseq uint64
sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv)
if err == nil && nseq > seq {
// If we jumped over the requested last sequence, clamp it down.
// Otherwise, we would send too much to the follower.
if nseq > last {
nseq = last
sm = nil
}
dr.First, dr.Num = seq, nseq-seq
// Jump ahead
seq = nseq
} else if err == ErrStoreEOF {
dr.First, dr.Num = seq, state.LastSeq-seq
dr.First, dr.Num = seq, last-seq
// Clear EOF here for normal processing.
err = nil
// Jump ahead
seq = state.LastSeq
seq = last
}
} else {
sm, err = mset.store.LoadMsg(seq, &smv)
Expand Down
90 changes: 90 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6725,6 +6725,96 @@ func TestJetStreamConsumerAckOutOfBounds(t *testing.T) {
require_Equal(t, ci.AckFloor.Stream, 1)
}

func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
tests := []struct {
title string
catchupRequest *streamSyncRequest
setup func(js nats.JetStreamContext)
assert func(sub *nats.Subscription)
}{
{
title: "within-delete-gap",
setup: func(js nats.JetStreamContext) {},
},
{
title: "EOF",
setup: func(js nats.JetStreamContext) {
err := js.DeleteMsg("TEST", 100)
require_NoError(t, err)
},
},
}

for _, test := range tests {
t.Run(test.title, func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
require_NoError(t, err)

// Starts and ends with subject "foo", we'll purge so there's a large gap of deletes in the middle.
// This should force runCatchup to use LoadNextMsg instead of LoadMsg.
for i := 0; i < 100; i++ {
subject := "bar"
if i == 0 || i == 99 {
subject = "foo"
}
_, err = js.Publish(subject, nil)
require_NoError(t, err)
}
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "bar"})
require_NoError(t, err)

// Optionally run some extra setup.
test.setup(js)

// Reconnect to stream leader.
l := c.streamLeader(globalAccountName, "TEST")
nc.Close()
nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

// Setup wiretap and grab stream.
sendSubject := "test-wiretap"
sub, err := nc.SubscribeSync(sendSubject)
require_NoError(t, err)
err = nc.Flush() // Must flush, otherwise our subscription could be too late.
require_NoError(t, err)
acc, err := l.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)

// Run custom catchup request and the test's asserts.
sreq := &streamSyncRequest{Peer: "peer", FirstSeq: 5, LastSeq: 5, DeleteRangesOk: true}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq) }))

// Our first message should be a skip msg.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, ts, err := decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 5)
require_Equal(t, subj, _EMPTY_)
require_Equal(t, ts, 0)

// And end with EOF.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Len(t, len(msg.Data), 0)
})
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
Expand Down

0 comments on commit 7f99c52

Please sign in to comment.