[FIXED] Consistently report AckFloor when replicated #6232

Merged · 1 commit · Dec 9, 2024
17 changes: 7 additions & 10 deletions server/consumer.go
```diff
@@ -2948,23 +2948,20 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
 		}
 	}
 
-	// If we are replicated and we are not the leader or we are filtered, we need to pull certain data from our store.
-	isLeader := o.isLeader()
-	if rg != nil && rg.node != nil && o.store != nil && (!isLeader || o.isFiltered()) {
+	// If we are replicated, we need to pull certain data from our store.
+	if rg != nil && rg.node != nil && o.store != nil {
 		state, err := o.store.BorrowState()
 		if err != nil {
 			o.mu.Unlock()
 			return nil
 		}
-		if !isLeader {
-			info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
-			info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
+		// If we are the leader we could have o.sseq that is skipped ahead.
+		// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
+		info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
+		info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
+		if !o.isLeader() {
 			info.NumAckPending = len(state.Pending)
 			info.NumRedelivered = len(state.Redelivered)
-		} else {
-			// Since we are filtered and we are the leader we could have o.sseq that is skipped ahead.
-			// To maintain consistency in reporting (e.g. jsz) we take the state for our delivered stream sequence.
-			info.Delivered.Stream = state.Delivered.Stream
-		}
 		}
 	}
 
```
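The net effect on infoWithSnapAndReply: for any replicated consumer, Delivered and AckFloor are now always taken from the replicated store state, regardless of leadership or filtering, so every replica (and monitoring via jsz) reports the same sequences. Only NumAckPending/NumRedelivered still fall back to the store exclusively for non-leaders, since the leader's in-memory counts are authoritative. A minimal standalone sketch of that rule, using illustrative stand-in types rather than the server's internals:

```go
package main

import "fmt"

// SequencePair mirrors the consumer/stream sequence pair in ConsumerInfo.
type SequencePair struct{ Consumer, Stream uint64 }

// replicatedState is a stand-in for what o.store.BorrowState() returns.
type replicatedState struct {
	Delivered, AckFloor  SequencePair
	Pending, Redelivered map[uint64]struct{}
}

// report applies the rule introduced by this change: Delivered and AckFloor
// always come from the replicated state, so every replica reports the same
// values; pending/redelivered counts come from the replicated state only
// when we are not the leader.
func report(isLeader bool, leaderAckPending, leaderRedelivered int, st replicatedState) (delivered, ackFloor SequencePair, ackPending, redelivered int) {
	delivered, ackFloor = st.Delivered, st.AckFloor
	if isLeader {
		return delivered, ackFloor, leaderAckPending, leaderRedelivered
	}
	return delivered, ackFloor, len(st.Pending), len(st.Redelivered)
}

func main() {
	st := replicatedState{
		Delivered: SequencePair{Consumer: 10, Stream: 10},
		AckFloor:  SequencePair{Consumer: 5, Stream: 5},
	}
	// Leader and follower now agree on Delivered and AckFloor, even when
	// the leader's in-memory o.sseq has skipped ahead.
	fmt.Println(report(true, 3, 1, st))  // {10 10} {5 5} 3 1
	fmt.Println(report(false, 0, 0, st)) // {10 10} {5 5} 0 0
}
```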
62 changes: 38 additions & 24 deletions server/jetstream_cluster_3_test.go
```diff
@@ -3575,7 +3575,7 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
 	sub, err := js.PullSubscribe("foo", "C")
 	require_NoError(t, err)
 
-	// Publish as many messages as the ack floor check threshold +5.
+	// Publish as many messages as the ack floor check threshold +5 (what we set ackfloor to later).
 	totalMessages := 55
 	for i := 0; i < totalMessages; i++ {
 		sendStreamMsg(t, nc, "foo", "HELLO")
@@ -3585,19 +3585,9 @@
 	_, err = sub.Fetch(10)
 	require_NoError(t, err)
 
-	// We will grab the state with delivered being 10 and ackfloor being 0 directly.
-	cl := c.consumerLeader(globalAccountName, "TEST", "C")
-	require_NotNil(t, cl)
-
-	mset, err := cl.GlobalAccount().lookupStream("TEST")
-	require_NoError(t, err)
-	o := mset.lookupConsumer("C")
-	require_NotNil(t, o)
-	o.mu.RLock()
-	state, err := o.store.State()
-	o.mu.RUnlock()
-	require_NoError(t, err)
-	require_NotNil(t, state)
+	// We will initialize the state with delivered being 10 and ackfloor being 0 directly.
+	// Fetch will asynchronously propagate this state, so can't reliably request this from the leader immediately.
+	state := &ConsumerState{Delivered: SequencePair{Consumer: 10, Stream: 10}}
 
 	// Now let messages expire.
 	checkFor(t, 5*time.Second, time.Second, func() error {
@@ -3634,17 +3624,35 @@
 		require_NoError(t, o.raftNode().InstallSnapshot(snap))
 	}
 
-	cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
+	cl := c.consumerLeader(globalAccountName, "TEST", "C")
+	require_NotNil(t, cl)
+	err = cl.JetStreamStepdownConsumer(globalAccountName, "TEST", "C")
+	require_NoError(t, err)
 	c.waitOnConsumerLeader(globalAccountName, "TEST", "C")
 
 	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
 		ci, err := js.ConsumerInfo("TEST", "C")
 		if err != nil {
 			return err
 		}
+		// Replicated state should stay the same.
+		if ci.AckFloor.Stream != 5 && ci.AckFloor.Consumer != 5 {
+			return fmt.Errorf("replicated AckFloor not correct, expected %d, got %+v", 5, ci.AckFloor)
+		}
+
+		cl = c.consumerLeader(globalAccountName, "TEST", "C")
+		mset, err := cl.GlobalAccount().lookupStream("TEST")
+		require_NoError(t, err)
+		o := mset.lookupConsumer("C")
+		require_NotNil(t, o)
+		o.mu.RLock()
+		defer o.mu.RUnlock()
+
 		// Make sure we catch this and adjust.
-		if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 {
-			return nil
+		if o.asflr != uint64(totalMessages) && o.adflr != 10 {
+			return fmt.Errorf("leader AckFloor not correct, expected %d, got %+v", 10, ci.AckFloor)
 		}
-		return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor)
+		return nil
 	})
 }
```
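The reworked assertions split what used to be a single ConsumerInfo check in two: the replicated AckFloor reported through js.ConsumerInfo must stay at the snapshot value of 5, while the new leader's in-memory floors (o.asflr and o.adflr, read under the consumer lock) must be adjusted up once the messages expire. For readers outside the repo, checkFor is the codebase's polling test helper; a rough stand-in, with semantics assumed from how the tests use it:

```go
package server_test

import (
	"testing"
	"time"
)

// checkForSketch approximates the repo's checkFor helper: poll fn every
// interval until it returns nil, or fail the test with the last error once
// the timeout elapses. (Assumed semantics, not the actual helper.)
func checkForSketch(t *testing.T, timeout, interval time.Duration, fn func() error) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	var err error
	for time.Now().Before(deadline) {
		if err = fn(); err == nil {
			return
		}
		time.Sleep(interval)
	}
	t.Fatalf("timed out waiting on condition: %v", err)
}
```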

```diff
@@ -5458,12 +5466,18 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 
 	// Want to compare sans cluster details which we know will change due to leader change.
 	// Also last activity for delivered can be slightly off so nil out as well.
-	checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
+	checkConsumerInfo := func(a, b *nats.ConsumerInfo, replicated bool) {
 		t.Helper()
 		require_Equal(t, a.Delivered.Consumer, 10)
 		require_Equal(t, a.Delivered.Stream, 10)
-		require_Equal(t, a.AckFloor.Consumer, 10)
-		require_Equal(t, a.AckFloor.Stream, 10)
+		// If replicated, agreed upon state is used. Otherwise, o.asflr and o.adflr would be skipped ahead for R1.
+		if replicated {
+			require_Equal(t, a.AckFloor.Consumer, 0)
+			require_Equal(t, a.AckFloor.Stream, 0)
+		} else {
+			require_Equal(t, a.AckFloor.Consumer, 10)
+			require_Equal(t, a.AckFloor.Stream, 10)
+		}
 		require_Equal(t, a.NumPending, 40)
 		require_Equal(t, a.NumRedelivered, 0)
 		a.Cluster, b.Cluster = nil, nil
@@ -5473,7 +5487,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 		}
 	}
 
-	checkConsumerInfo(cia, cib)
+	checkConsumerInfo(cia, cib, true)
 
 	// Memory based.
 	sub, err = js.PullSubscribe("foo", "mem",
@@ -5503,7 +5517,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	cib, err = js.ConsumerInfo("TEST", "mem")
 	require_NoError(t, err)
 
-	checkConsumerInfo(cia, cib)
+	checkConsumerInfo(cia, cib, true)
 
 	// Now file based but R1 and server restart.
 	sub, err = js.PullSubscribe("foo", "r1",
@@ -5537,7 +5551,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
 	// Created can skew a small bit due to server restart, this is expected.
 	now := time.Now()
 	cia.Created, cib.Created = now, now
-	checkConsumerInfo(cia, cib)
+	checkConsumerInfo(cia, cib, false)
 }
 
 func TestJetStreamClusterConsumerDefaultsFromStream(t *testing.T) {
```
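The new replicated flag encodes the asymmetry introduced above: a replicated (R>1) consumer now reports the agreed-upon store state, whose ack floor never moved because nothing was acked, while an R1 consumer, which presumably has no raft node and so never takes the store path guarded by rg.node != nil in consumer.go, still reports its in-memory o.asflr/o.adflr that MaxDeliver redeliveries skipped ahead. Restated as a toy rule, with values copied from the assertions rather than derived independently:

```go
package main

import "fmt"

// ackFloorForMode restates the updated assertions as a rule of thumb:
// replicated consumers report the agreed-upon floor, an R1 consumer
// reports its in-memory floor. (Values taken from the test above.)
func ackFloorForMode(replicated bool) uint64 {
	if replicated {
		return 0 // nothing acked; the agreed-upon state never moved
	}
	return 10 // o.asflr/o.adflr skipped ahead by MaxDeliver redeliveries
}

func main() {
	fmt.Println(ackFloorForMode(true), ackFloorForMode(false)) // 0 10
}
```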
16 changes: 11 additions & 5 deletions server/jetstream_super_cluster_test.go
```diff
@@ -1665,14 +1665,20 @@ func TestJetStreamSuperClusterConsumerDeliverNewBug(t *testing.T) {
 	}
 
 	c.waitOnConsumerLeader("$G", "T", "d")
-	ci, err = js.ConsumerInfo("T", "d")
+
+	cl := c.consumerLeader(globalAccountName, "T", "d")
+	mset, err := cl.GlobalAccount().lookupStream("T")
 	require_NoError(t, err)
+	o := mset.lookupConsumer("d")
+	require_NotNil(t, o)
+	o.mu.RLock()
+	defer o.mu.RUnlock()
 
-	if ci.Delivered.Consumer != 0 || ci.Delivered.Stream != 100 {
-		t.Fatalf("Incorrect consumer delivered info: %+v", ci.Delivered)
+	if o.dseq-1 != 0 || o.sseq-1 != 100 {
+		t.Fatalf("Incorrect consumer delivered info: dseq=%d, sseq=%d", o.dseq-1, o.sseq-1)
 	}
-	if ci.NumPending != 0 {
-		t.Fatalf("Did not expect NumPending, got %d", ci.NumPending)
+	if np := o.checkNumPending(); np != 0 {
+		t.Fatalf("Did not expect NumPending, got %d", np)
 	}
 }
```
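The super-cluster test now bypasses ConsumerInfo, which after this change reflects replicated store state, and instead reads the leader's in-memory cursors under the read lock. The -1 arithmetic suggests o.dseq and o.sseq hold the next consumer/stream sequence to deliver, so the last delivered sequence is the counter minus one. A toy model of that convention (an assumption drawn from the assertions, not from the server internals):

```go
package main

import "fmt"

// nextSeq models a "next sequence to deliver" counter; the test's o.dseq-1 /
// o.sseq-1 arithmetic implies the consumer's internal sequences work this way.
type nextSeq struct{ next uint64 }

func newNextSeq() nextSeq               { return nextSeq{next: 1} } // sequences start at 1
func (s *nextSeq) advance()             { s.next++ }
func (s nextSeq) lastDelivered() uint64 { return s.next - 1 }

func main() {
	dseq, sseq := newNextSeq(), newNextSeq()
	// A deliver-new consumer created after 100 published messages: the
	// stream-side cursor skips past all 100, but nothing is delivered.
	for i := 0; i < 100; i++ {
		sseq.advance()
	}
	fmt.Println(dseq.lastDelivered(), sseq.lastDelivered()) // 0 100, the test's expectation
}
```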
