From eab45b404a891d941eb87069101c16b843b875d8 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 22 Sep 2021 10:35:56 -0700 Subject: [PATCH] Fix for deadlock with stream mirrors or sources where origin is interest or workqueue policy. Signed-off-by: Derek Collison --- server/const.go | 2 +- server/consumer.go | 2 +- server/norace_test.go | 48 +++++++++++++++++++++++++++++++ server/stream.go | 66 +++++++++++++++++++++++++++++++++++++++---- 4 files changed, 111 insertions(+), 7 deletions(-) diff --git a/server/const.go b/server/const.go index 79d6ab07759..99854ae3904 100644 --- a/server/const.go +++ b/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.6.0" + VERSION = "2.6.1" // PROTO is the currently supported protocol. // 0 was the original diff --git a/server/consumer.go b/server/consumer.go index fcf5b1730fc..bd49e033d16 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2360,7 +2360,7 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6 // If we are ack none and mset is interest only we should make sure stream removes interest. if ap == AckNone && mset.cfg.Retention != LimitsPolicy { if o.node == nil || o.cfg.Direct { - mset.amch <- seq + mset.ackq.push(seq) } else { o.updateAcks(dseq, seq) } diff --git a/server/norace_test.go b/server/norace_test.go index 14d17a6bb77..b131ff4a0aa 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3456,3 +3456,51 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) { fetchMsgs(t, sub, 100, 5*time.Second) checkConsumerWith(300, 50, 175) } + +func TestNoRaceJetStreamClusterInterestRetentionDeadlock(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + // Client based API + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + // This can trigger deadlock with current architecture. 
+ // If stream is !limitsRetention and consumer is DIRECT and ack none we will try to place the msg seq + // onto a chan for the stream to consider removing. All conditions above must hold to trigger. + + // We will attempt to trigger here with a stream mirror setup which uses an R=1 DIRECT consumer to replicate msgs. + _, err := js.AddStream(&nats.StreamConfig{Name: "S", Retention: nats.InterestPolicy, Storage: nats.MemoryStorage}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create a mirror which will create the consumer profile to trigger. + _, err = js.AddStream(&nats.StreamConfig{Name: "M", Mirror: &nats.StreamSource{Name: "S"}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Queue up a lot of messages. + numRequests := 20_000 + for i := 0; i < numRequests; i++ { + js.PublishAsync("S", []byte("Q")) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + checkFor(t, 5*time.Second, 100*time.Millisecond, func() error { + si, err := js.StreamInfo("S") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si.State.Msgs != 0 { + return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State) + } + return nil + }) +} diff --git a/server/stream.go b/server/stream.go index 97efcdd0075..9f6378db51c 100644 --- a/server/stream.go +++ b/server/stream.go @@ -154,7 +154,7 @@ type stream struct { outq *jsOutQ msgs *inbound store StreamStore - amch chan uint64 + ackq *ackMsgQueue lseq uint64 lmsgId string consumers map[string]*consumer @@ -353,7 +353,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // For no-ack consumers when we are interest retention. 
if cfg.Retention != LimitsPolicy { - mset.amch = make(chan uint64, 1024) + mset.ackq = &ackMsgQueue{mch: make(chan struct{}, 1)} } jsa.streams[cfg.Name] = mset @@ -3040,14 +3040,66 @@ func (mset *stream) subjects() []string { return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...) } +// Queue for async ack of messages. +// Used when we have a consumer to a stream that is interest based and the +// consumer is R=1 and acknone. This is how mirrors and sources replicate. +type ackMsgQueue struct { + sync.Mutex + mch chan struct{} + seqs []uint64 + back []uint64 +} + +// Push onto the queue. +func (q *ackMsgQueue) push(seq uint64) { + q.Lock() + notify := len(q.seqs) == 0 + q.seqs = append(q.seqs, seq) + q.Unlock() + if notify { + select { + case q.mch <- struct{}{}: + default: + } + } +} + +// Pop all pending off. +func (q *ackMsgQueue) pop() []uint64 { + q.Lock() + seqs := q.seqs + q.seqs, q.back = q.back, nil + q.Unlock() + return seqs +} + +func (q *ackMsgQueue) recycle(seqs []uint64) { + const maxAckQueueReuse = 8 * 1024 + if cap(seqs) > maxAckQueueReuse { + return + } + q.Lock() + q.back = seqs[:0] + q.Unlock() +} + func (mset *stream) internalLoop() { mset.mu.RLock() s := mset.srv c := s.createInternalJetStreamClient() c.registerWithAccount(mset.acc) defer c.closeConnection(ClientClosed) - outq, qch, mch, amch := mset.outq, mset.qch, mset.msgs.mch, mset.amch + outq, qch, mch := mset.outq, mset.qch, mset.msgs.mch isClustered := mset.cfg.Replicas > 1 + + // For the ack msgs queue for interest retention. + var ( + amch chan struct{} + ackq *ackMsgQueue + ) + if mset.ackq != nil { + ackq, amch = mset.ackq, mset.ackq.mch + } mset.mu.RUnlock() // Raw scratch buffer. 
@@ -3097,8 +3149,12 @@ func (mset *stream) internalLoop() { mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0) } } - case seq := <-amch: - mset.ackMsg(nil, seq) + case <-amch: + seqs := ackq.pop() + for _, seq := range seqs { + mset.ackMsg(nil, seq) + } + ackq.recycle(seqs) case <-qch: return case <-s.quitCh: