Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Deadlock with stream mirrors #2561

Merged
merged 1 commit into from
Sep 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "2.6.0"
VERSION = "2.6.1"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2360,7 +2360,7 @@ func (o *consumer) deliverMsg(dsubj, subj string, hdr, msg []byte, seq, dc uint6
// If we are ack none and mset is interest only we should make sure stream removes interest.
if ap == AckNone && mset.cfg.Retention != LimitsPolicy {
if o.node == nil || o.cfg.Direct {
mset.amch <- seq
mset.ackq.push(seq)
} else {
o.updateAcks(dseq, seq)
}
Expand Down
48 changes: 48 additions & 0 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3456,3 +3456,51 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) {
fetchMsgs(t, sub, 100, 5*time.Second)
checkConsumerWith(300, 50, 175)
}

func TestNoRaceJetStreamClusterInterestRetentionDeadlock(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

// This can trigger deadlock with current architecture.
// If stream is !limitsRetention and consumer is DIRECT and ack none we will try to place the msg seq
// onto a chan for the stream to consider removing. All conditions above must hold to trigger.

// We will attempt to trigger here with a stream mirror setup which uses and R=1 DIRECT consumer to replicate msgs.
_, err := js.AddStream(&nats.StreamConfig{Name: "S", Retention: nats.InterestPolicy, Storage: nats.MemoryStorage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create a mirror which will create the consumer profile to trigger.
_, err = js.AddStream(&nats.StreamConfig{Name: "M", Mirror: &nats.StreamSource{Name: "S"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Queue up alot of messages.
numRequests := 20_000
for i := 0; i < numRequests; i++ {
js.PublishAsync("S", []byte("Q"))
}
select {
case <-js.PublishAsyncComplete():
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}

checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("S")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si.State.Msgs != 0 {
return fmt.Errorf("Expected 0 msgs, got state: %+v", si.State)
}
return nil
})
}
66 changes: 61 additions & 5 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type stream struct {
outq *jsOutQ
msgs *inbound
store StreamStore
amch chan uint64
ackq *ackMsgQueue
lseq uint64
lmsgId string
consumers map[string]*consumer
Expand Down Expand Up @@ -353,7 +353,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt

// For no-ack consumers when we are interest retention.
if cfg.Retention != LimitsPolicy {
mset.amch = make(chan uint64, 1024)
mset.ackq = &ackMsgQueue{mch: make(chan struct{}, 1)}
}

jsa.streams[cfg.Name] = mset
Expand Down Expand Up @@ -3040,14 +3040,66 @@ func (mset *stream) subjects() []string {
return append(mset.cfg.Subjects[:0:0], mset.cfg.Subjects...)
}

// Linked list for async ack of messages.
// When we have a consumer to a stream that is interest based and the
// consumer is R=1 and acknone. This is how mirrors and sources replicate.
type ackMsgQueue struct {
sync.Mutex
mch chan struct{}
seqs []uint64
back []uint64
}

// Push onto the queue.
func (q *ackMsgQueue) push(seq uint64) {
q.Lock()
notify := len(q.seqs) == 0
q.seqs = append(q.seqs, seq)
q.Unlock()
if notify {
select {
case q.mch <- struct{}{}:
default:
}
}
}

// Pop all pending off.
func (q *ackMsgQueue) pop() []uint64 {
q.Lock()
seqs := q.seqs
q.seqs, q.back = q.back, nil
q.Unlock()
return seqs
}

func (q *ackMsgQueue) recycle(seqs []uint64) {
const maxAckQueueReuse = 8 * 1024
if cap(seqs) > maxAckQueueReuse {
return
}
q.Lock()
q.back = seqs[:0]
q.Unlock()
}

func (mset *stream) internalLoop() {
mset.mu.RLock()
s := mset.srv
c := s.createInternalJetStreamClient()
c.registerWithAccount(mset.acc)
defer c.closeConnection(ClientClosed)
outq, qch, mch, amch := mset.outq, mset.qch, mset.msgs.mch, mset.amch
outq, qch, mch := mset.outq, mset.qch, mset.msgs.mch
isClustered := mset.cfg.Replicas > 1

// For the ack msgs queue for interest retention.
var (
amch chan struct{}
ackq *ackMsgQueue
)
if mset.ackq != nil {
ackq, amch = mset.ackq, mset.ackq.mch
}
mset.mu.RUnlock()

// Raw scratch buffer.
Expand Down Expand Up @@ -3097,8 +3149,12 @@ func (mset *stream) internalLoop() {
mset.processJetStreamMsg(im.subj, im.rply, im.hdr, im.msg, 0, 0)
}
}
case seq := <-amch:
mset.ackMsg(nil, seq)
case <-amch:
seqs := ackq.pop()
for _, seq := range seqs {
mset.ackMsg(nil, seq)
}
ackq.recycle(seqs)
case <-qch:
return
case <-s.quitCh:
Expand Down