From 4122ab992c1ab6dca5508176ea1af8f34211f32a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Tue, 5 Nov 2024 22:10:31 +0100 Subject: [PATCH] Improve unpinning (#6078) Changes introduced: **Do not set default pinned TTL if it's not a pinned client consumer** In one case, we were not checking if given consumer config is a pinned one when setting pinned TTL. That caused the TTL to be always set, which in turn incremented API Level even if Priority Groups were not used. **Rename advisories** Advisories were following ADR, but ADR names were fixed. This follows up the ADR names fix. **Improve unpinning** It was possible that the same client was picked after `unpin` call. This commit makes sure that different waiting request will be picked. Signed-off-by: Tomasz Pietrek --------- Signed-off-by: Tomasz Pietrek --- server/consumer.go | 26 +++++--- server/jetstream_cluster.go | 2 +- server/jetstream_consumer_test.go | 101 +++++++++++++++++++++++++++++- server/jetstream_events.go | 10 +-- 4 files changed, 123 insertions(+), 16 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 1d739f3bf6f..13b8e97edb4 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1593,9 +1593,9 @@ func (o *consumer) sendDeleteAdvisoryLocked() { } func (o *consumer) sendPinnedAdvisoryLocked(group string) { - e := JSStreamGroupPinnedAdvisory{ + e := JSConsumerGroupPinnedAdvisory{ TypedEvent: TypedEvent{ - Type: JSStreamGroupPinnedAdvisoryType, + Type: JSConsumerGroupPinnedAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, @@ -1617,9 +1617,9 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) { } func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) { - e := JSStreamGroupUnPinnedAdvisory{ + e := JSConsumerGroupUnPinnedAdvisory{ TypedEvent: TypedEvent{ - Type: JSStreamGroupPinnedAdvisoryType, + Type: JSConsumerGroupPinnedAdvisoryType, ID: nuid.Next(), Time: time.Now().UTC(), }, @@ -3562,8 +3562,20 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { if wr.expires.IsZero() || time.Now().Before(wr.expires) { if needNewPin { - o.currentPinId = nuid.Next() - wr.priorityGroup.Id = o.currentPinId + if wr.priorityGroup.Id == _EMPTY_ { + o.currentPinId = nuid.Next() + wr.priorityGroup.Id = o.currentPinId + } else { + // There is pin id set, but not a matching one. Send a notification to the client and remove the request. + // Probably this is the old pin id. + o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, []byte(JSPullRequestWrongPinID), nil, nil, 0)) + o.waiting.removeCurrent() + if o.node != nil { + o.removeClusterPendingRequest(wr.reply) + } + wr.recycle() + continue + } } else if o.currentPinId != _EMPTY_ { // Check if we have a match on the currentNuid if wr.priorityGroup != nil && wr.priorityGroup.Id == o.currentPinId { @@ -3638,8 +3650,6 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest { o.removeClusterPendingRequest(wr.reply) } wr.recycle() - // We did not find any wr, so let's reset the newly set pin. - o.currentPinId = _EMPTY_ } return nil diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 0f47e4e9439..5eb6d9fa97c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7455,7 +7455,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec cfg.MaxAckPending = JsDefaultMaxAckPending } - if cfg.PinnedTTL == 0 { + if cfg.PriorityPolicy == PriorityPinnedClient && cfg.PinnedTTL == 0 { cfg.PinnedTTL = JsDefaultPinnedTTL } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 5452ecd3e17..7d5b2f69863 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1567,7 +1567,7 @@ func TestJetStreamConsumerPinned(t *testing.T) { nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", reply2, reqb) require_NoError(t, err) - // This is the firs Pull Request, so it should becom the pinned one. + // This is the first Pull Request, so it should become the pinned one. msg, err := replies.NextMsg(time.Second) require_NoError(t, err) require_NotNil(t, msg) @@ -1700,7 +1700,7 @@ func TestJetStreamConsumerUnpinNoMessages(t *testing.T) { PriorityGroups: []string{"A"}, PriorityPolicy: PriorityPinnedClient, AckPolicy: AckExplicit, - PinnedTTL: 10 * time.Second, + PinnedTTL: 30 * time.Second, }) require_NoError(t, err) @@ -1761,6 +1761,103 @@ func TestJetStreamConsumerUnpinNoMessages(t *testing.T) { require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), pinId) } +// In some scenarios, if the next waiting request is the same as the old pinned, it could be picked as a new pin. +// This test replicates that behavior and checks if the new pin is different than the old one. +func TestJetStreamConsumerUnpinPickDifferentRequest(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + acc := s.GlobalAccount() + + mset, err := acc.addStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Retention: LimitsPolicy, + }) + require_NoError(t, err) + + _, err = mset.addConsumer(&ConsumerConfig{ + Durable: "C", + FilterSubject: "foo", + PriorityGroups: []string{"A"}, + PriorityPolicy: PriorityPinnedClient, + AckPolicy: AckExplicit, + PinnedTTL: 30 * time.Second, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "foo", "data") + + req := JSApiConsumerGetNextRequest{Batch: 5, Expires: 15 * time.Second, PriorityGroup: PriorityGroup{ + Group: "A", + }} + + reqBytes, err := json.Marshal(req) + require_NoError(t, err) + + firstInbox := "FIRST" + firstReplies, err := nc.SubscribeSync(firstInbox) + require_NoError(t, err) + nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", firstInbox, reqBytes) + + msg, err := firstReplies.NextMsg(1 * time.Second) + require_NoError(t, err) + pinId := msg.Header.Get("Nats-Pin-Id") + require_NotEqual(t, pinId, "") + fmt.Printf("INITIAL PIN: %v\n", msg.Header.Get("Nats-Pin-Id")) + + reqPinned := JSApiConsumerGetNextRequest{Batch: 5, Expires: 15 * time.Second, PriorityGroup: PriorityGroup{ + Group: "A", + Id: pinId, + }} + _, err = json.Marshal(reqPinned) + require_NoError(t, err) + + secondInbox := "SECOND" + secondReplies, err := nc.SubscribeSync(secondInbox) + require_NoError(t, err) + nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", secondInbox, reqBytes) + + _, err = secondReplies.NextMsg(1 * time.Second) + require_Error(t, err) + + unpinRequest := func(t *testing.T, nc *nats.Conn, stream, consumer, group string) *ApiError { + var response JSApiConsumerUnpinResponse + request := JSApiConsumerUnpinRequest{Group: group} + requestData, err := json.Marshal(request) + require_NoError(t, err) + msg, err := nc.Request(fmt.Sprintf("$JS.API.CONSUMER.UNPIN.%s.%s", stream, consumer), requestData, time.Second*1) + require_NoError(t, err) + err = json.Unmarshal(msg.Data, &response) + require_NoError(t, err) + return response.Error + } + + unpinRequest(t, nc, "TEST", "C", "A") + _, err = firstReplies.NextMsg(1 * time.Second) + // If there are no messages in the stream, do not expect unpin message to arrive. + // Advisory will be sent immediately, but messages with headers - only when there is anything to be sent. + require_Error(t, err) + // Send a new message to the stream. + sendStreamMsg(t, nc, "foo", "data") + // Check if the old pinned will get the information about bad pin. + msg, err = firstReplies.NextMsg(1 * time.Second) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "423") + // Make sure that the old pin is cleared. + require_Equal(t, msg.Header.Get("Nats-Pin-Id"), "") + + // Try different wr. + msg, err = secondReplies.NextMsg(1 * time.Second) + require_NoError(t, err) + // Make sure that its pin is different than the old one and not empty. + require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), pinId) + require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), "") +} + func TestJetStreamConsumerUnpin(t *testing.T) { single := RunBasicJetStreamServer(t) defer single.Shutdown() diff --git a/server/jetstream_events.go b/server/jetstream_events.go index da262f72bc1..3480337bdbe 100644 --- a/server/jetstream_events.go +++ b/server/jetstream_events.go @@ -271,10 +271,10 @@ type JSConsumerQuorumLostAdvisory struct { Domain string `json:"domain,omitempty"` } -const JSStreamGroupPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_pinned" +const JSConsumerGroupPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_pinned" -// JSStreamGroupPinnedAdvisory that a group switched to a new pinned client -type JSStreamGroupPinnedAdvisory struct { +// JSConsumerGroupPinnedAdvisory that a group switched to a new pinned client +type JSConsumerGroupPinnedAdvisory struct { TypedEvent Account string `json:"account,omitempty"` Stream string `json:"stream"` @@ -286,8 +286,8 @@ type JSStreamGroupPinnedAdvisory struct { const JSStreamGroupUnPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_unpinned" -// JSStreamGroupUnPinnedAdvisory indicates that a pin was lost -type JSStreamGroupUnPinnedAdvisory struct { +// JSConsumerGroupUnPinnedAdvisory indicates that a pin was lost +type JSConsumerGroupUnPinnedAdvisory struct { TypedEvent Account string `json:"account,omitempty"` Stream string `json:"stream"`