[FIXED] JetStream catchup may return/delete too much #6213

Merged: 1 commit, Dec 4, 2024
12 changes: 9 additions & 3 deletions server/jetstream_cluster.go
@@ -9197,20 +9197,26 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
 			var sm *StoreMsg
 			var err error
-			// Is we should use load next do so here.
+			// If we should use load next do so here.
 			if useLoadNext {
 				var nseq uint64
 				sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv)
 				if err == nil && nseq > seq {
+					// If we jumped over the requested last sequence, clamp it down.
+					// Otherwise, we would send too much to the follower.
+					if nseq > last {
+						nseq = last
+						sm = nil
+					}
 					dr.First, dr.Num = seq, nseq-seq
 					// Jump ahead
 					seq = nseq
 				} else if err == ErrStoreEOF {
-					dr.First, dr.Num = seq, state.LastSeq-seq
+					dr.First, dr.Num = seq, last-seq
 					// Clear EOF here for normal processing.
 					err = nil
 					// Jump ahead
-					seq = state.LastSeq
+					seq = last
 				}
 			} else {
 				sm, err = mset.store.LoadMsg(seq, &smv)
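For readers skimming the hunk above, the following is a minimal, self-contained sketch of the clamping rule it introduces, under the assumption that LoadNextMsg may jump past the follower's requested last sequence when it skips over a run of deletes. The deleteRange type, the clampJump helper, and the literal values are hypothetical stand-ins for dr and the sync request bounds in runCatchup, not the server's actual API.

package main

import "fmt"

// deleteRange mirrors the dr.First/dr.Num bookkeeping in the hunk above:
// a run of Num missing sequences starting at First.
type deleteRange struct {
	First, Num uint64
}

// clampJump models the fix: if the next loadable message sits beyond the
// requested last sequence, drop it and only report deletes up to last.
func clampJump(seq, nseq, last uint64, haveMsg bool) (deleteRange, uint64, bool) {
	if nseq > last {
		nseq = last
		haveMsg = false // never send a message past the requested range
	}
	return deleteRange{First: seq, Num: nseq - seq}, nseq, haveMsg
}

func main() {
	// Follower asked for sequences up to 5, but the next stored message is at 100.
	dr, seq, send := clampJump(5, 100, 5, true)
	fmt.Println(dr, seq, send) // {5 0} 5 false
}

Without the clamp, this sketch would report a delete range of 95 sequences and hand the follower a message it never asked for, which is the over-delivery the PR title describes.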
90 changes: 90 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6725,6 +6725,96 @@ func TestJetStreamConsumerAckOutOfBounds(t *testing.T) {
require_Equal(t, ci.AckFloor.Stream, 1)
}

func TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes(t *testing.T) {
tests := []struct {
title string
catchupRequest *streamSyncRequest
setup func(js nats.JetStreamContext)
assert func(sub *nats.Subscription)
}{
{
title: "within-delete-gap",
setup: func(js nats.JetStreamContext) {},
},
{
title: "EOF",
setup: func(js nats.JetStreamContext) {
err := js.DeleteMsg("TEST", 100)
require_NoError(t, err)
},
},
}

for _, test := range tests {
t.Run(test.title, func(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo", "bar"},
Replicas: 3,
})
require_NoError(t, err)

// Starts and ends with subject "foo"; we'll purge "bar" so there's a large gap of deletes in the middle.
// This should force runCatchup to use LoadNextMsg instead of LoadMsg.
for i := 0; i < 100; i++ {
subject := "bar"
if i == 0 || i == 99 {
subject = "foo"
}
_, err = js.Publish(subject, nil)
require_NoError(t, err)
}
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Subject: "bar"})
require_NoError(t, err)

// Optionally run some extra setup.
test.setup(js)

// Reconnect to stream leader.
l := c.streamLeader(globalAccountName, "TEST")
nc.Close()
nc, _ = jsClientConnect(t, l, nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

// Setup wiretap and grab stream.
sendSubject := "test-wiretap"
sub, err := nc.SubscribeSync(sendSubject)
require_NoError(t, err)
err = nc.Flush() // Must flush, otherwise our subscription could be too late.
require_NoError(t, err)
acc, err := l.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)

// Run custom catchup request and the test's asserts.
sreq := &streamSyncRequest{Peer: "peer", FirstSeq: 5, LastSeq: 5, DeleteRangesOk: true}
require_True(t, mset.srv.startGoRoutine(func() { mset.runCatchup(sendSubject, sreq) }))

// Our first message should be a skip msg.
msg, err := sub.NextMsg(time.Second)
require_NoError(t, err)
require_Equal(t, entryOp(msg.Data[0]), streamMsgOp)
subj, _, _, _, seq, ts, err := decodeStreamMsg(msg.Data[1:])
require_NoError(t, err)
require_Equal(t, seq, 5)
require_Equal(t, subj, _EMPTY_)
require_Equal(t, ts, 0)

// And end with EOF.
msg, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_Len(t, len(msg.Data), 0)
})
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
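As a reading aid for the assertions in the test above, here is a small illustrative sketch of the two catchup frames it expects: a skip message carrying the requested sequence with an empty subject and a zero timestamp, followed by an empty payload marking EOF. The catchupFrame struct and classify helper below are assumptions for illustration only, not part of the server or test code.

package main

import "fmt"

// catchupFrame holds fields a follower would have after decoding a catchup
// message; the decoding itself (decodeStreamMsg in the test) is assumed.
type catchupFrame struct {
	payloadLen int    // raw payload length of the received message
	subject    string // decoded subject, empty for a skip msg
	seq        uint64 // decoded stream sequence
	ts         int64  // decoded timestamp, zero for a skip msg
}

func classify(f catchupFrame) string {
	switch {
	case f.payloadLen == 0:
		return "EOF" // an empty payload ends the catchup
	case f.subject == "" && f.ts == 0:
		return fmt.Sprintf("skip to seq %d", f.seq) // placeholder for deleted/clamped sequences
	default:
		return fmt.Sprintf("stored msg on %q at seq %d", f.subject, f.seq)
	}
}

func main() {
	fmt.Println(classify(catchupFrame{payloadLen: 32, seq: 5})) // skip to seq 5
	fmt.Println(classify(catchupFrame{}))                       // EOF
}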