Skip to content

Commit

Permalink
Improve unpinning (#6078)
Browse files Browse the repository at this point in the history
Changes introduced:

**Do not set default pinned TTL if it's not a pinned client consumer**

In one case, we were not checking if given consumer config is a pinned
one when setting pinned TTL. That caused the TTL to be always set, which
in turn incremented API Level even if Priority Groups were not used.


**Rename advisories**

Advisories were following ADR, but ADR names were fixed.
This follows up the ADR names fix.


**Improve unpinning**

It was possible that the same client was picked after `unpin` call.
This commit makes sure that different waiting request will be picked.


Signed-off-by: Tomasz Pietrek <tomasz@nats.io>

---------

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema authored Nov 5, 2024
1 parent ec877ee commit 4122ab9
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 16 deletions.
26 changes: 18 additions & 8 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1593,9 +1593,9 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
}

func (o *consumer) sendPinnedAdvisoryLocked(group string) {
e := JSStreamGroupPinnedAdvisory{
e := JSConsumerGroupPinnedAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamGroupPinnedAdvisoryType,
Type: JSConsumerGroupPinnedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Expand All @@ -1617,9 +1617,9 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) {

}
func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
e := JSStreamGroupUnPinnedAdvisory{
e := JSConsumerGroupUnPinnedAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamGroupPinnedAdvisoryType,
Type: JSConsumerGroupPinnedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Expand Down Expand Up @@ -3562,8 +3562,20 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {

if wr.expires.IsZero() || time.Now().Before(wr.expires) {
if needNewPin {
o.currentPinId = nuid.Next()
wr.priorityGroup.Id = o.currentPinId
if wr.priorityGroup.Id == _EMPTY_ {
o.currentPinId = nuid.Next()
wr.priorityGroup.Id = o.currentPinId
} else {
// There is pin id set, but not a matching one. Send a notification to the client and remove the request.
// Probably this is the old pin id.
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, []byte(JSPullRequestWrongPinID), nil, nil, 0))
o.waiting.removeCurrent()
if o.node != nil {
o.removeClusterPendingRequest(wr.reply)
}
wr.recycle()
continue
}
} else if o.currentPinId != _EMPTY_ {
// Check if we have a match on the currentNuid
if wr.priorityGroup != nil && wr.priorityGroup.Id == o.currentPinId {
Expand Down Expand Up @@ -3638,8 +3650,6 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
o.removeClusterPendingRequest(wr.reply)
}
wr.recycle()
// We did not find any wr, so let's reset the newly set pin.
o.currentPinId = _EMPTY_
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7455,7 +7455,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
cfg.MaxAckPending = JsDefaultMaxAckPending
}

if cfg.PinnedTTL == 0 {
if cfg.PriorityPolicy == PriorityPinnedClient && cfg.PinnedTTL == 0 {
cfg.PinnedTTL = JsDefaultPinnedTTL
}

Expand Down
101 changes: 99 additions & 2 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,7 @@ func TestJetStreamConsumerPinned(t *testing.T) {
nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", reply2, reqb)
require_NoError(t, err)

// This is the firs Pull Request, so it should becom the pinned one.
// This is the first Pull Request, so it should become the pinned one.
msg, err := replies.NextMsg(time.Second)
require_NoError(t, err)
require_NotNil(t, msg)
Expand Down Expand Up @@ -1700,7 +1700,7 @@ func TestJetStreamConsumerUnpinNoMessages(t *testing.T) {
PriorityGroups: []string{"A"},
PriorityPolicy: PriorityPinnedClient,
AckPolicy: AckExplicit,
PinnedTTL: 10 * time.Second,
PinnedTTL: 30 * time.Second,
})
require_NoError(t, err)

Expand Down Expand Up @@ -1761,6 +1761,103 @@ func TestJetStreamConsumerUnpinNoMessages(t *testing.T) {
require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), pinId)
}

// In some scenarios, if the next waiting request is the same as the old pinned, it could be picked as a new pin.
// This test replicates that behavior and checks if the new pin is different than the old one.
func TestJetStreamConsumerUnpinPickDifferentRequest(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

acc := s.GlobalAccount()

mset, err := acc.addStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Retention: LimitsPolicy,
})
require_NoError(t, err)

_, err = mset.addConsumer(&ConsumerConfig{
Durable: "C",
FilterSubject: "foo",
PriorityGroups: []string{"A"},
PriorityPolicy: PriorityPinnedClient,
AckPolicy: AckExplicit,
PinnedTTL: 30 * time.Second,
})
require_NoError(t, err)

sendStreamMsg(t, nc, "foo", "data")

req := JSApiConsumerGetNextRequest{Batch: 5, Expires: 15 * time.Second, PriorityGroup: PriorityGroup{
Group: "A",
}}

reqBytes, err := json.Marshal(req)
require_NoError(t, err)

firstInbox := "FIRST"
firstReplies, err := nc.SubscribeSync(firstInbox)
require_NoError(t, err)
nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", firstInbox, reqBytes)

msg, err := firstReplies.NextMsg(1 * time.Second)
require_NoError(t, err)
pinId := msg.Header.Get("Nats-Pin-Id")
require_NotEqual(t, pinId, "")
fmt.Printf("INITIAL PIN: %v\n", msg.Header.Get("Nats-Pin-Id"))

reqPinned := JSApiConsumerGetNextRequest{Batch: 5, Expires: 15 * time.Second, PriorityGroup: PriorityGroup{
Group: "A",
Id: pinId,
}}
_, err = json.Marshal(reqPinned)
require_NoError(t, err)

secondInbox := "SECOND"
secondReplies, err := nc.SubscribeSync(secondInbox)
require_NoError(t, err)
nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.C", secondInbox, reqBytes)

_, err = secondReplies.NextMsg(1 * time.Second)
require_Error(t, err)

unpinRequest := func(t *testing.T, nc *nats.Conn, stream, consumer, group string) *ApiError {
var response JSApiConsumerUnpinResponse
request := JSApiConsumerUnpinRequest{Group: group}
requestData, err := json.Marshal(request)
require_NoError(t, err)
msg, err := nc.Request(fmt.Sprintf("$JS.API.CONSUMER.UNPIN.%s.%s", stream, consumer), requestData, time.Second*1)
require_NoError(t, err)
err = json.Unmarshal(msg.Data, &response)
require_NoError(t, err)
return response.Error
}

unpinRequest(t, nc, "TEST", "C", "A")
_, err = firstReplies.NextMsg(1 * time.Second)
// If there are no messages in the stream, do not expect unpin message to arrive.
// Advisory will be sent immediately, but messages with headers - only when there is anything to be sent.
require_Error(t, err)
// Send a new message to the stream.
sendStreamMsg(t, nc, "foo", "data")
// Check if the old pinned will get the information about bad pin.
msg, err = firstReplies.NextMsg(1 * time.Second)
require_NoError(t, err)
require_Equal(t, msg.Header.Get("Status"), "423")
// Make sure that the old pin is cleared.
require_Equal(t, msg.Header.Get("Nats-Pin-Id"), "")

// Try different wr.
msg, err = secondReplies.NextMsg(1 * time.Second)
require_NoError(t, err)
// Make sure that its pin is different than the old one and not empty.
require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), pinId)
require_NotEqual(t, msg.Header.Get("Nats-Pin-Id"), "")
}

func TestJetStreamConsumerUnpin(t *testing.T) {
single := RunBasicJetStreamServer(t)
defer single.Shutdown()
Expand Down
10 changes: 5 additions & 5 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ type JSConsumerQuorumLostAdvisory struct {
Domain string `json:"domain,omitempty"`
}

const JSStreamGroupPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_pinned"
const JSConsumerGroupPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_pinned"

// JSStreamGroupPinnedAdvisory that a group switched to a new pinned client
type JSStreamGroupPinnedAdvisory struct {
// JSConsumerGroupPinnedAdvisory that a group switched to a new pinned client
type JSConsumerGroupPinnedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Expand All @@ -286,8 +286,8 @@ type JSStreamGroupPinnedAdvisory struct {

const JSStreamGroupUnPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_unpinned"

// JSStreamGroupUnPinnedAdvisory indicates that a pin was lost
type JSStreamGroupUnPinnedAdvisory struct {
// JSConsumerGroupUnPinnedAdvisory indicates that a pin was lost
type JSConsumerGroupUnPinnedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Expand Down

0 comments on commit 4122ab9

Please sign in to comment.