Skip to content

Commit

Permalink
Fix panic if event is dropped by queue (#5532) (#5536)
Browse files Browse the repository at this point in the history
- Only increase event seq-no if event has been pushed
- Fix invalid ACK list if empty list is given to concat
- Only execute ack handlers if number of events being ACKed > 0
- Add missing debug log when collecting first ACK in ACK list

(cherry picked from commit 470b3d3)
  • Loading branch information
tsg authored and ruflin committed Nov 8, 2017
1 parent a5c9c5e commit ee862c0
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

- Fix http status phrase parsing not allow spaces. {pull}5312[5312]
- Fix missing length check in the PostgreSQL module. {pull}5457[5457]
- Fix panic in ACK handler if event is dropped on blocked queue {issue}5524[5524]

*Winlogbeat*

Expand Down
32 changes: 23 additions & 9 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,20 @@ func (l *ackLoop) run() {
count, events := lst.count()
l.lst.concat(&lst)

// log.Debugf("ackloop: scheduledACKs count=%v events=%v\n", count, events)
// log.Debug("ACK List:")
// for current := l.lst.head; current != nil; current = current.next {
// log.Debugf(" ack entry(seq=%v, start=%v, count=%v",
// current.seq, current.start, current.count)
// }

l.batchesSched += uint64(count)
l.totalSched += uint64(events)

case <-l.sig:
acked += l.handleBatchSig()
acks = l.broker.acks
if acked > 0 {
acks = l.broker.acks
}
}

// log.Debug("ackloop INFO")
Expand Down Expand Up @@ -87,12 +94,14 @@ func (l *ackLoop) handleBatchSig() int {
count += current.count
}

if e := l.broker.eventer; e != nil {
e.OnACK(count)
}
if count > 0 {
if e := l.broker.eventer; e != nil {
e.OnACK(count)
}

// report acks to waiting clients
l.processACK(lst, count)
// report acks to waiting clients
l.processACK(lst, count)
}

for !lst.empty() {
releaseACKChan(lst.pop())
Expand All @@ -110,15 +119,15 @@ func (l *ackLoop) collectAcked() chanList {
lst := chanList{}

acks := l.lst.pop()
l.onACK(acks)
lst.append(acks)

done := false
for !l.lst.empty() && !done {
acks := l.lst.front()
select {
case <-acks.ch:
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
l.batchesACKed++
l.onACK(acks)
lst.append(l.lst.pop())

default:
Expand All @@ -128,3 +137,8 @@ func (l *ackLoop) collectAcked() chanList {

return lst
}

func (l *ackLoop) onACK(acks *ackChan) {
l.batchesACKed++
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
}
4 changes: 4 additions & 0 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ func (l *chanList) prepend(ch *ackChan) {
}

func (l *chanList) concat(other *chanList) {
if other.head == nil {
return
}

if l.head == nil {
*l = *other
return
Expand Down
15 changes: 12 additions & 3 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ackProducer struct {
}

type openState struct {
log logger
isOpen atomic.Bool
done chan struct{}
events chan pushRequest
Expand All @@ -37,6 +38,7 @@ type ackHandler func(count int)

func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
openState := openState{
log: b.logger,
isOpen: atomic.MakeBool(true),
done: make(chan struct{}),
events: b.events,
Expand Down Expand Up @@ -69,11 +71,18 @@ func (p *forgetfullProducer) Cancel() int {
}

func (p *ackProducer) Publish(event publisher.Event) bool {
return p.openState.publish(p.makeRequest(event))
return p.updSeq(p.openState.publish(p.makeRequest(event)))
}

func (p *ackProducer) TryPublish(event publisher.Event) bool {
return p.openState.tryPublish(p.makeRequest(event))
return p.updSeq(p.openState.tryPublish(p.makeRequest(event)))
}

func (p *ackProducer) updSeq(ok bool) bool {
if ok {
p.seq++
}
return ok
}

func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
Expand All @@ -82,7 +91,6 @@ func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
seq: p.seq,
state: &p.state,
}
p.seq++
return req
}

Expand Down Expand Up @@ -126,6 +134,7 @@ func (st *openState) tryPublish(req pushRequest) bool {
st.events = nil
return false
default:
st.log.Debugf("Dropping event, queue is blocked (seq=%v) ", req.seq)
return false
}
}

0 comments on commit ee862c0

Please sign in to comment.