Skip to content

Commit

Permalink
Fix for deadlock with stream mirrors or sources where origin is interest or workqueue policy.
Browse files Browse the repository at this point in the history

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 22, 2021
1 parent 3a4fd2b commit eab45b4
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 7 deletions.
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "2.6.0"
VERSION = "2.6.1"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2360,7 +2360,7 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention != LimitsPolicy {
if o.node == nil || o.cfg.Direct {
mset.amch <- seq
mset.ackq.push(seq)
} else {
o.updateAcks(dseq, seq)
}
Expand Down
48 changes: 48 additions & 0 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3456,3 +3456,51 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
fetchMsgs(t, sub, 100, 5*time.Second)
checkConsumerWith(300, 50, 175)
}

// TestNoRaceJetStreamClusterInterestRetentionDeadlock is a regression test for a
// deadlock that could occur when a stream with interest (or workqueue) retention
// had a DIRECT, ack-none, R=1 consumer — the profile mirrors and sources use to
// replicate messages. In that configuration the consumer would push the message
// sequence onto a channel for the stream's internal loop to consider for removal,
// which could block and deadlock the server.
func TestNoRaceJetStreamClusterInterestRetentionDeadlock(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Client based API
	s := c.randomServer()
	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	// This can trigger deadlock with current architecture.
	// If stream is !limitsRetention and consumer is DIRECT and ack none we will try to place the msg seq
	// onto a chan for the stream to consider removing. All conditions above must hold to trigger.

	// We will attempt to trigger here with a stream mirror setup which uses an R=1 DIRECT consumer to replicate msgs.
	_, err := js.AddStream(&nats.StreamConfig{Name: "S", Retention: nats.InterestPolicy, Storage: nats.MemoryStorage})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Create a mirror which will create the consumer profile to trigger.
	_, err = js.AddStream(&nats.StreamConfig{Name: "M", Mirror: &nats.StreamSource{Name: "S"}})
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Queue up a lot of messages.
	numRequests := 20_000
	for i := 0; i < numRequests; i++ {
		// Check the async publish error so a failed send is not silently dropped.
		if _, err := js.PublishAsync("S", []byte("Q")); err != nil {
			t.Fatalf("Unexpected publish error: %v", err)
		}
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive completion signal")
	}

	// The mirror's ack-none consumer should drive interest-based removal, so the
	// origin stream is expected to drain to zero messages.
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		si, err := js.StreamInfo("S")
		if err != nil {
			// Return the error (instead of failing immediately) so checkFor can
			// retry — cluster lookups can transiently fail during this test.
			return fmt.Errorf("could not get stream info: %v", err)
		}
		if si.State.Msgs != 0 {
			return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State)
		}
		return nil
	})
}
66 changes: 61 additions & 5 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type stream struct {
outq *jsOutQ
msgs *inbound
store StreamStore
amch chan uint64
ackq *ackMsgQueue
lseq uint64
lmsgId string
consumers map[string]*consumer
Expand Down Expand Up @@ -353,7 +353,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// For no-ack consumers when we are interest retention.
if cfg.Retention != LimitsPolicy {
mset.amch = make(chan uint64, 1024)
mset.ackq = &ackMsgQueue{mch: make(chan struct{}, 1)}
}

jsa.streams[cfg.Name] = mset
Expand Down Expand Up @@ -3040,14 +3040,66 @@ func (mset *stream) subjects() []string {
return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...)
}

// ackMsgQueue is a mutex-guarded queue of message sequences awaiting async
// ack processing by the stream's internal loop. It exists for consumers on an
// interest-based stream that are R=1 and ack-none — the profile mirrors and
// sources use to replicate messages. Producers call push; the single consumer
// waits on mch, drains with pop, and returns the slice via recycle.
type ackMsgQueue struct {
	sync.Mutex
	mch     chan struct{} // Signal channel for the internal loop (buffered, capacity 1).
	pending []uint64      // Sequences waiting to be processed.
	spare   []uint64      // Recycled backing slice reused by the next pop cycle.
}

// push appends seq to the queue and, on an empty->non-empty transition,
// signals the consumer. The send is non-blocking: with a one-slot buffered
// channel a pending signal is sufficient for the consumer to drain everything.
func (q *ackMsgQueue) push(seq uint64) {
	q.Lock()
	wasEmpty := len(q.pending) == 0
	q.pending = append(q.pending, seq)
	q.Unlock()

	if !wasEmpty {
		// A signal was already issued for this batch; the consumer will
		// pick this entry up when it drains.
		return
	}
	select {
	case q.mch <- struct{}{}:
	default:
	}
}

// pop takes ownership of all pending sequences, swapping in the recycled
// backing slice (if any) for subsequent pushes.
func (q *ackMsgQueue) pop() []uint64 {
	q.Lock()
	defer q.Unlock()
	out := q.pending
	q.pending, q.spare = q.spare, nil
	return out
}

// recycle hands a drained slice back for reuse, dropping oversized backing
// arrays so a burst does not pin memory indefinitely.
func (q *ackMsgQueue) recycle(seqs []uint64) {
	const maxAckQueueReuse = 8 * 1024
	if cap(seqs) > maxAckQueueReuse {
		return
	}
	q.Lock()
	defer q.Unlock()
	q.spare = seqs[:0]
}

func (mset *stream) internalLoop() {
mset.mu.RLock()
s := mset.srv
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, mch, amch := mset.outq, mset.qch, mset.msgs.mch, mset.amch
outq, qch, mch := mset.outq, mset.qch, mset.msgs.mch
isClustered := mset.cfg.Replicas > 1

// For the ack msgs queue for interest retention.
var (
amch chan struct{}
ackq *ackMsgQueue
)
if mset.ackq != nil {
ackq, amch = mset.ackq, mset.ackq.mch
}
mset.mu.RUnlock()

// Raw scratch buffer.
Expand Down Expand Up @@ -3097,8 +3149,12 @@ func (mset *stream) internalLoop() {
mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
}
}
case seq := <-amch:
mset.ackMsg(nil, seq)
case <-amch:
seqs := ackq.pop()
for _, seq := range seqs {
mset.ackMsg(nil, seq)
}
ackq.recycle(seqs)
case <-qch:
return
case <-s.quitCh:
Expand Down

0 comments on commit eab45b4

Please sign in to comment.