Skip to content

Commit

Permalink
[FIXED] Hold JS lock when accessing stream assignment (#6173)
Browse files Browse the repository at this point in the history
Fixes this data race:
```
WARNING: DATA RACE
Read at 0x00c0034a4720 by goroutine 19950:
  runtime.mapiterinit()
      /home/travis/sdk/go1.23.3/src/runtime/map.go:877 +0x0
  github.com/nats-io/nats-server/v2/server.TestJetStreamClusterMaxConsumersMultipleConcurrentRequests()
      /home/travis/build/nats-io/nats-server/server/jetstream_cluster_2_test.go:2080 +0xcc6
  testing.tRunner()
      /home/travis/sdk/go1.23.3/src/testing/testing.go:1690 +0x226
  testing.(*T).Run.gowrap1()
      /home/travis/sdk/go1.23.3/src/testing/testing.go:1743 +0x44
Previous write at 0x00c0034a4720 by goroutine 20101:
  runtime.mapassign_faststr()
      /home/travis/sdk/go1.23.3/src/runtime/map_faststr.go:223 +0x0
  github.com/nats-io/nats-server/v2/server.(*jetStream).processConsumerAssignment()
      /home/travis/build/nats-io/nats-server/server/jetstream_cluster.go:4281 +0x834
  github.com/nats-io/nats-server/v2/server.(*jetStream).applyMetaEntries()
      /home/travis/build/nats-io/nats-server/server/jetstream_cluster.go:2015 +0x78a
  github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster()
      /home/travis/build/nats-io/nats-server/server/jetstream_cluster.go:1413 +0x1194
  github.com/nats-io/nats-server/v2/server.(*jetStream).monitorCluster-fm()
      <autogenerated>:1 +0x33
  github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1()
      /home/travis/build/nats-io/nats-server/server/server.go:3885 +0x59
```

`js.streamAssignment` mentions the lock should be held. Also found some
other places that didn't lock.
```go
// Will lookup a stream assignment.
// Lock should be held.
func (js *jetStream) streamAssignment(account, stream string) (sa *streamAssignment) {
```

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Nov 25, 2024
2 parents c909acb + 499686a commit 4978da5
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 1 deletion.
8 changes: 8 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3437,19 +3437,23 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
}

// First check if the stream and consumer is there.
js.mu.RLock()
sa := js.streamAssignment(acc.Name, stream)
if sa == nil {
js.mu.RUnlock()
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

ca, ok := sa.consumers[consumer]
if !ok || ca == nil {
js.mu.RUnlock()
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
js.mu.RUnlock()

// Then check if we are the leader.
mset, err := acc.lookupStream(stream)
Expand Down Expand Up @@ -4886,21 +4890,25 @@ func (s *Server) jsConsumerPauseRequest(sub *subscription, c *client, _ *Account
consumer := consumerNameFromSubject(subject)

if isClustered {
js.mu.RLock()
sa := js.streamAssignment(acc.Name, stream)
if sa == nil {
js.mu.RUnlock()
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

ca, ok := sa.consumers[consumer]
if !ok || ca == nil {
js.mu.RUnlock()
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

nca := *ca
js.mu.RUnlock()
pauseUTC := req.PauseUntil.UTC()
if !pauseUTC.IsZero() {
nca.Config.PauseUntil = &pauseUTC
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4246,8 +4246,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
return
}

js.mu.Lock()
sa := js.streamAssignment(accName, stream)
if sa == nil {
js.mu.Unlock()
s.Debugf("Consumer create failed, could not locate stream '%s > %s'", accName, stream)
return
}
Expand All @@ -4259,7 +4261,6 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
var wasExisting bool

// Check if we have an existing consumer assignment.
js.mu.Lock()
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
} else if oca := sa.consumers[ca.Name]; oca != nil {
Expand Down
2 changes: 2 additions & 0 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2075,11 +2075,13 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {

metaLeader := c.leader()
mjs := metaLeader.getJetStream()
mjs.mu.RLock()
sa := mjs.streamAssignment(globalAccountName, "MAXCC")
require_NotNil(t, sa)
for _, ca := range sa.consumers {
require_False(t, ca.pending)
}
mjs.mu.RUnlock()
}

func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) {
Expand Down

0 comments on commit 4978da5

Please sign in to comment.