diff --git a/libbeat/publisher/queue/memqueue/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go
index d6a78ae4ed7..0c6c40e5d79 100644
--- a/libbeat/publisher/queue/memqueue/ackloop.go
+++ b/libbeat/publisher/queue/memqueue/ackloop.go
@@ -24,32 +24,29 @@ package memqueue
 // Producer ACKs are run in the ackLoop go-routine.
 type ackLoop struct {
 	broker *broker
-	sig    chan batchAckMsg
-	lst    chanList
+
+	// A list of ACK channels given to queue consumers,
+	// used to maintain sequencing of event acknowledgements.
+	ackChans chanList
 
 	totalACK uint64
 
 	processACK func(chanList, int)
 }
 
-func newACKLoop(b *broker, processACK func(chanList, int)) *ackLoop {
-	l := &ackLoop{broker: b}
-	l.processACK = processACK
-	return l
-}
-
 func (l *ackLoop) run() {
 	var (
 		// log = l.broker.logger
 
-		// Buffer up acked event counter in acked. If acked > 0, acks will be set to
-		// the broker.acks channel for sending the ACKs while potentially receiving
-		// new batches from the broker event loop.
-		// This concurrent bidirectionally communication pattern requiring 'select'
-		// ensures we can not have any deadlock between the event loop and the ack
-		// loop, as the ack loop will not block on any channel
-		acked int
-		acks  chan int
+		// Buffer up the ACKed event count in ackCount. If ackCount > 0, ackChan
+		// is set to the broker's ackChan so the count can be sent while new
+		// batches are potentially still being received from the broker event loop.
+		// This concurrent bidirectional communication pattern requires 'select'
+		// and guarantees the event loop and the ack loop cannot deadlock,
+		// because the ack loop never blocks on a channel.
+		ackCount int
+		ackChan  chan int
+		sig      chan batchAckMsg
 	)
 
 	for {
@@ -57,16 +54,16 @@
 		case <-l.broker.done:
 			return
 
-		case acks <- acked:
-			acks, acked = nil, 0
+		case ackChan <- ackCount:
+			ackChan, ackCount = nil, 0
 
-		case lst := <-l.broker.scheduledACKs:
-			l.lst.concat(&lst)
+		case chanList := <-l.broker.scheduledACKs:
+			l.ackChans.concat(&chanList)
 
-		case <-l.sig:
-			acked += l.handleBatchSig()
-			if acked > 0 {
-				acks = l.broker.acks
+		case <-sig:
+			ackCount += l.handleBatchSig()
+			if ackCount > 0 {
+				ackChan = l.broker.ackChan
 			}
 		}
 
@@ -76,7 +73,7 @@
 		// log.Debug("ackloop: total batches scheduled = ", l.batchesSched)
 		// log.Debug("ackloop: total batches ack = ", l.batchesACKed)
 
-		l.sig = l.lst.channel()
+		sig = l.ackChans.channel()
 		// if l.sig == nil {
 		//	log.Debug("ackloop: no ack scheduled")
 		// } else {
@@ -119,17 +116,15 @@ func (l *ackLoop) handleBatchSig() int {
 
 func (l *ackLoop) collectAcked() chanList {
 	lst := chanList{}
 
-	acks := l.lst.pop()
-	l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
+	acks := l.ackChans.pop()
 	lst.append(acks)
 
 	done := false
-	for !l.lst.empty() && !done {
-		acks := l.lst.front()
+	for !l.ackChans.empty() && !done {
+		acks := l.ackChans.front()
 		select {
-		case <-acks.ch:
-			l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
-			lst.append(l.lst.pop())
+		case <-acks.ackChan:
+			lst.append(l.ackChans.pop())
 
 		default:
			done = true
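The rewritten run loop above leans on Go's nil-channel semantics: a send or receive on a nil channel blocks forever, so a select case can be switched off by setting its channel variable to nil and re-enabled by assigning the real channel back. A minimal, self-contained sketch of the same pattern (all names here are illustrative, not taken from the queue code):

package main

import "fmt"

func main() {
	work := make(chan int)      // incoming work, like scheduledACKs
	totals := make(chan int)    // outgoing aggregated counts, like broker.ackChan
	done := make(chan struct{}) // closed when the receiver has drained totals

	go func() { // upstream producer
		for i := 1; i <= 5; i++ {
			work <- i
		}
		close(work)
	}()

	go func() { // downstream receiver of the aggregated totals
		for n := range totals {
			fmt.Println("flushed total:", n)
		}
		close(done)
	}()

	total := 0
	var out chan int // nil: the send case below starts out disabled
	for work != nil || out != nil {
		select {
		case n, ok := <-work:
			if !ok {
				work = nil // producer finished: disable this case
				continue
			}
			total += n
			out = totals // something to report: enable the send case
		case out <- total:
			out, total = nil, 0 // sent: disable again until more work arrives
		}
	}
	close(totals)
	<-done
}

Because the send case is only armed while there is something to send, neither loop can ever block the other, which is exactly the deadlock-freedom argument in the comment above.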
diff --git a/libbeat/publisher/queue/memqueue/batchbuf.go b/libbeat/publisher/queue/memqueue/batchbuf.go
index d44bfbe9f38..9ba4b659ed5 100644
--- a/libbeat/publisher/queue/memqueue/batchbuf.go
+++ b/libbeat/publisher/queue/memqueue/batchbuf.go
@@ -19,49 +19,42 @@ package memqueue
 
 import "github.com/elastic/beats/v7/libbeat/publisher"
 
+type queueEntry struct {
+	event  interface{}
+	client clientState
+}
+
 type batchBuffer struct {
 	next    *batchBuffer
 	flushed bool
-	events  []publisher.Event
-	clients []clientState
+	entries []queueEntry
 }
 
 func newBatchBuffer(sz int) *batchBuffer {
 	b := &batchBuffer{}
-	b.init(sz)
+	b.entries = make([]queueEntry, 0, sz)
 	return b
 }
 
-func (b *batchBuffer) init(sz int) {
-	b.events = make([]publisher.Event, 0, sz)
-	b.clients = make([]clientState, 0, sz)
-}
-
-func (b *batchBuffer) add(event publisher.Event, st clientState) {
-	b.events = append(b.events, event)
-	b.clients = append(b.clients, st)
+func (b *batchBuffer) add(event *publisher.Event, st clientState) {
+	b.entries = append(b.entries, queueEntry{event, st})
 }
 
 func (b *batchBuffer) length() int {
-	return len(b.events)
+	return len(b.entries)
 }
 
 func (b *batchBuffer) cancel(st *produceState) int {
-	events := b.events[:0]
-	clients := b.clients[:0]
+	entries := b.entries[:0]
 
-	removed := 0
-	for i := range b.clients {
-		if b.clients[i].state == st {
-			removed++
+	removedCount := 0
+	for _, entry := range b.entries {
+		if entry.client.state == st {
+			removedCount++
			continue
 		}
-
-		events = append(events, b.events[i])
-		clients = append(clients, b.clients[i])
+		entries = append(entries, entry)
 	}
-
-	b.events = events
-	b.clients = clients
-	return removed
+	b.entries = entries
+	return removedCount
 }
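batchBuffer.cancel relies on the standard in-place filter idiom: surviving elements are appended to a zero-length slice that shares the original backing array, so no extra allocation is needed and the removed count falls out of the loop. The idiom in isolation (an illustrative example, not queue code):

package main

import "fmt"

// removeNegatives filters nums in place and reports how many
// elements were dropped, mirroring batchBuffer.cancel's loop.
func removeNegatives(nums []int) ([]int, int) {
	kept := nums[:0] // zero length, same backing array
	removed := 0
	for _, n := range nums {
		if n < 0 {
			removed++
			continue
		}
		kept = append(kept, n)
	}
	return kept, removed
}

func main() {
	kept, removed := removeNegatives([]int{3, -1, 4, -1, 5})
	fmt.Println(kept, removed) // [3 4 5] 2
}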
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index 3003b37d029..b1d4e13535e 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -39,15 +39,40 @@ type broker struct {
 
 	bufSize int
 
+	///////////////////////////
 	// api channels
-	events    chan pushRequest
-	requests  chan getRequest
-	pubCancel chan producerCancelRequest
+
+	// Producers send requests to pushChan to add events to the queue.
+	pushChan chan pushRequest
+
+	// Consumers send requests to getChan to read events from the queue.
+	getChan chan getRequest
+
+	// Producers send requests to cancelChan to cancel events they've
+	// sent so far that have not yet reached a consumer.
+	cancelChan chan producerCancelRequest
+
+	///////////////////////////
 	// internal channels
-	acks          chan int
+
+	// When ackLoop receives event ACKs from a consumer, it sends the number
+	// of ACKed events to ackChan to notify the event loop that those
+	// events can be removed from the queue.
+	ackChan chan int
+
+	// When events are sent to consumers, the ACK channels for their batches
+	// are collected into chanLists and sent to scheduledACKs.
+	// These are then read by ackLoop and concatenated to its internal
+	// chanList of all outstanding ACK channels.
 	scheduledACKs chan chanList
 
+	// A listener that should be notified when ACKs are processed.
+	// ackLoop calls this listener's OnACK function when it advances
+	// the consumer ACK position.
+	// Right now this listener always points at the Pipeline associated with
+	// this queue. Pipeline.OnACK then forwards the notification to
+	// Pipeline.observer.queueACKed(), which updates the beats registry
+	// if needed.
 	ackListener queue.ACKListener
 
 	// wait group for worker shutdown
@@ -62,17 +87,19 @@ type Settings struct {
 	InputQueueSize int
 }
 
-type ackChan struct {
-	next *ackChan
-	ch   chan batchAckMsg
-	seq  uint
+// batchACKState stores the metadata associated with a batch of events sent to
+// a consumer. When the consumer ACKs that batch, a batchAckMsg is sent on
+// ackChan and received by ackLoop.
+type batchACKState struct {
+	next    *batchACKState
+	ackChan chan batchAckMsg
 	start, count int // number of events waiting for ACK
-	states       []clientState
+	entries      []queueEntry
 }
 
 type chanList struct {
-	head *ackChan
-	tail *ackChan
+	head *batchACKState
+	tail *batchACKState
 }
 
 func init() {
@@ -141,12 +168,12 @@ func NewQueue(
 		logger: logger,
 
 		// broker API channels
-		events:    make(chan pushRequest, chanSize),
-		requests:  make(chan getRequest),
-		pubCancel: make(chan producerCancelRequest, 5),
+		pushChan:   make(chan pushRequest, chanSize),
+		getChan:    make(chan getRequest),
+		cancelChan: make(chan producerCancelRequest, 5),
 
 		// internal broker and ACK handler channels
-		acks:          make(chan int),
+		ackChan:       make(chan int),
 		scheduledACKs: make(chan chanList),
 
 		ackListener: settings.ACKListener,
@@ -164,7 +191,9 @@ func NewQueue(
 	}
 	b.bufSize = sz
 
-	ack := newACKLoop(b, eventLoop.processACK)
+	ackLoop := &ackLoop{
+		broker:     b,
+		processACK: eventLoop.processACK}
 
 	b.wg.Add(2)
 	go func() {
@@ -173,7 +202,7 @@ func NewQueue(
 	}()
 	go func() {
 		defer b.wg.Done()
-		ack.run()
+		ackLoop.run()
 	}()
 
 	return b
@@ -204,29 +233,28 @@ func (b *broker) Metrics() (queue.Metrics, error) {
 
 var ackChanPool = sync.Pool{
 	New: func() interface{} {
-		return &ackChan{
-			ch: make(chan batchAckMsg, 1),
+		return &batchACKState{
+			ackChan: make(chan batchAckMsg, 1),
 		}
 	},
 }
 
-func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
+func newBatchACKState(start, count int, entries []queueEntry) *batchACKState {
 	//nolint: errcheck // Return value doesn't need to be checked before conversion.
-	ch := ackChanPool.Get().(*ackChan)
+	ch := ackChanPool.Get().(*batchACKState)
 	ch.next = nil
-	ch.seq = seq
 	ch.start = start
 	ch.count = count
-	ch.states = states
+	ch.entries = entries
 	return ch
 }
 
-func releaseACKChan(c *ackChan) {
+func releaseACKChan(c *batchACKState) {
 	c.next = nil
 	ackChanPool.Put(c)
 }
 
-func (l *chanList) prepend(ch *ackChan) {
+func (l *chanList) prepend(ch *batchACKState) {
 	ch.next = l.head
 	l.head = ch
 	if l.tail == nil {
@@ -248,7 +276,7 @@ func (l *chanList) concat(other *chanList) {
 	l.tail = other.tail
 }
 
-func (l *chanList) append(ch *ackChan) {
+func (l *chanList) append(ch *batchACKState) {
 	if l.head == nil {
 		l.head = ch
 	} else {
@@ -261,7 +289,7 @@ func (l *chanList) empty() bool {
 	return l.head == nil
 }
 
-func (l *chanList) front() *ackChan {
+func (l *chanList) front() *batchACKState {
 	return l.head
 }
 
@@ -269,10 +297,10 @@ func (l *chanList) channel() chan batchAckMsg {
 	if l.head == nil {
 		return nil
 	}
-	return l.head.ch
+	return l.head.ackChan
 }
 
-func (l *chanList) pop() *ackChan {
+func (l *chanList) pop() *batchACKState {
 	ch := l.head
 	if ch != nil {
		l.head = ch.next
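ackChanPool above recycles batchACKState values so that a queue running at a steady rate does not allocate a fresh struct and ACK channel for every batch. A stripped-down sketch of the same sync.Pool pattern (the type and field names here are invented for illustration):

package main

import (
	"fmt"
	"sync"
)

type ackState struct {
	next  *ackState
	ch    chan struct{}
	count int
}

var pool = sync.Pool{
	New: func() interface{} {
		// The 1-slot buffer lets a consumer ACK without blocking.
		return &ackState{ch: make(chan struct{}, 1)}
	},
}

// acquire resets the recycled value's fields; the channel is reused.
func acquire(count int) *ackState {
	st := pool.Get().(*ackState)
	st.next = nil
	st.count = count
	return st
}

func release(st *ackState) {
	st.next = nil
	pool.Put(st)
}

func main() {
	st := acquire(42)
	st.ch <- struct{}{} // consumer signals the batch ACK
	<-st.ch             // ack loop observes the signal
	fmt.Println("acked batch of", st.count, "events")
	release(st)
}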
diff --git a/libbeat/publisher/queue/memqueue/consume.go b/libbeat/publisher/queue/memqueue/consume.go
index 1e4c7b76729..03018ad4fc5 100644
--- a/libbeat/publisher/queue/memqueue/consume.go
+++ b/libbeat/publisher/queue/memqueue/consume.go
@@ -37,17 +37,9 @@ type consumer struct {
 type batch struct {
 	consumer *consumer
 	events   []publisher.Event
-	ack      *ackChan
-	state    ackState
+	ackChan  chan batchAckMsg
 }
 
-type ackState uint8
-
-const (
-	batchActive ackState = iota
-	batchACK
-)
-
 func newConsumer(b *broker) *consumer {
 	return &consumer{
 		broker: b,
@@ -62,18 +54,23 @@ func (c *consumer) Get(sz int) (queue.Batch, error) {
 	}
 
 	select {
-	case c.broker.requests <- getRequest{sz: sz, resp: c.resp}:
+	case c.broker.getChan <- getRequest{entryCount: sz, responseChan: c.resp}:
 	case <-c.done:
 		return nil, io.EOF
 	}
 
-	// if request has been send, we do have to wait for a response
+	// if the request has been sent, we have to wait for a response
 	resp := <-c.resp
+	events := make([]publisher.Event, 0, len(resp.entries))
+	for _, entry := range resp.entries {
+		if event, ok := entry.event.(*publisher.Event); ok {
+			events = append(events, *event)
+		}
+	}
 	return &batch{
 		consumer: c,
-		events:   resp.buf,
-		ack:      resp.ack,
-		state:    batchActive,
+		events:   events,
+		ackChan:  resp.ackChan,
 	}, nil
 }
 
@@ -81,31 +78,14 @@ func (c *consumer) Close() error {
 	if c.closed.Swap(true) {
 		return errors.New("already closed")
 	}
-
 	close(c.done)
 	return nil
 }
 
 func (b *batch) Events() []publisher.Event {
-	if b.state != batchActive {
-		panic("Get Events from inactive batch")
-	}
 	return b.events
 }
 
 func (b *batch) ACK() {
-	if b.state != batchActive {
-		switch b.state {
-		case batchACK:
-			panic("Can not acknowledge already acknowledged batch")
-		default:
-			panic("inactive batch")
-		}
-	}
-
-	b.report()
-}
-
-func (b *batch) report() {
-	b.ack.ch <- batchAckMsg{}
+	b.ackChan <- batchAckMsg{}
 }
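The consumer above talks to the queue purely through channels: a getRequest carries its own response channel, so the single goroutine that owns the buffer can reply without any locks. Before the event-loop changes below, a toy version of that request/response shape may help (simplified types, not the real broker):

package main

import "fmt"

type pushRequest struct{ event string }

type getRequest struct {
	count        int
	responseChan chan []string
}

// run is a miniature event loop: it alone owns buf, and all access
// happens via messages on pushChan and getChan.
func run(pushChan chan pushRequest, getChan chan getRequest, done chan struct{}) {
	var buf []string
	for {
		select {
		case <-done:
			return
		case req := <-pushChan:
			buf = append(buf, req.event)
		case req := <-getChan:
			n := req.count
			if n > len(buf) {
				n = len(buf)
			}
			batch := buf[:n]
			buf = buf[n:]
			req.responseChan <- batch
		}
	}
}

func main() {
	pushChan := make(chan pushRequest)
	getChan := make(chan getRequest)
	done := make(chan struct{})
	go run(pushChan, getChan, done)

	pushChan <- pushRequest{"event-1"}
	pushChan <- pushRequest{"event-2"}

	resp := make(chan []string)
	getChan <- getRequest{count: 2, responseChan: resp}
	fmt.Println(<-resp) // [event-1 event-2]
	close(done)
}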
diff --git a/libbeat/publisher/queue/memqueue/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go
index 4fb10a7b83a..017516cbc4d 100644
--- a/libbeat/publisher/queue/memqueue/eventloop.go
+++ b/libbeat/publisher/queue/memqueue/eventloop.go
@@ -29,19 +29,11 @@ import (
 // but tries to forward events as early as possible.
 type directEventLoop struct {
 	broker *broker
+	buf    ringBuffer
 
-	buf ringBuffer
-
-	// active broker API channels
-	events    chan pushRequest
-	get       chan getRequest
-	pubCancel chan producerCancelRequest
-
-	// ack handling
-	acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
-	schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
-	pendingACKs chanList      // ordered list of active batches to be send to the ackloop
-	ackSeq      uint          // ack batch sequence number to validate ordering
+	// pendingACKs aggregates a list of ACK channels for batches that have been sent
+	// to consumers, which is then sent to the broker's scheduledACKs channel.
+	pendingACKs chanList
 }
 
 // bufferingEventLoop implements the broker main event loop.
@@ -49,24 +41,20 @@
 type bufferingEventLoop struct {
 	broker *broker
 
-	buf       *batchBuffer
-	flushList flushList
+	buf       *batchBuffer
+	flushList flushList
+
+	// The number of events currently waiting in the queue, including
+	// those that have not yet been acked.
 	eventCount int
 
 	minEvents    int
 	maxEvents    int
 	flushTimeout time.Duration
 
-	// active broker API channels
-	events    chan pushRequest
-	get       chan getRequest
-	pubCancel chan producerCancelRequest
-
-	// ack handling
-	acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
-	schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
-	pendingACKs chanList      // ordered list of active batches to be send to the ackloop
-	ackSeq      uint          // ack batch sequence number to validate ordering
+	// pendingACKs aggregates a list of ACK channels for batches that have been sent
+	// to consumers, which is then sent to the broker's scheduledACKs channel.
+	pendingACKs chanList
 
 	// buffer flush timer state
 	timer *time.Timer
@@ -81,11 +69,7 @@ type flushList struct {
 
 func newDirectEventLoop(b *broker, size int) *directEventLoop {
 	l := &directEventLoop{
-		broker:    b,
-		events:    b.events,
-		get:       nil,
-		pubCancel: b.pubCancel,
-		acks:      b.acks,
+		broker: b,
 	}
 	l.buf.init(b.logger, size)
 
@@ -99,80 +83,73 @@ func (l *directEventLoop) run() {
 	var (
 		broker = l.broker
 		buf    = &l.buf
 		// log = l.broker.logger
 	)
 
 	for {
+		var pushChan chan pushRequest
+		// Push requests are enabled if the queue isn't yet full.
+		if !l.buf.Full() {
+			pushChan = l.broker.pushChan
+		}
+
+		var getChan chan getRequest
+		// Get requests are enabled if there are events in the queue
+		// that haven't yet been sent to a consumer.
+		if buf.Avail() > 0 {
+			getChan = l.broker.getChan
+		}
+
+		var schedACKs chan chanList
+		// Sending pending ACKs to the broker's scheduled ACKs
+		// channel is enabled if the pending list is nonempty.
+		if !l.pendingACKs.empty() {
+			schedACKs = l.broker.scheduledACKs
+		}
+
 		select {
 		case <-broker.done:
 			return
 
-		case req := <-l.events: // producer pushing new event
-			l.handleInsert(&req)
+		case req := <-pushChan: // producer pushing new event
+			l.insert(&req)
 
-		case req := <-l.pubCancel: // producer cancelling active events
+		case count := <-l.broker.ackChan:
+			// Events have been ACKed, remove them from the internal buffer.
+			l.buf.removeEntries(count)
+
+		case req := <-l.broker.cancelChan: // producer cancelling active events
 			l.handleCancel(&req)
+			// re-enable pushRequest if buffer can take new events
 
-		case req := <-l.get: // consumer asking for next batch
-			l.handleConsumer(&req)
+		case req := <-getChan: // consumer asking for next batch
			l.handleGetRequest(&req)
 
-		case l.schedACKS <- l.pendingACKs:
-			// on send complete list of pending batches has been forwarded -> clear list and queue
-			l.schedACKS = nil
+		case schedACKs <- l.pendingACKs:
+			// on send complete list of pending batches has been forwarded -> clear list
 			l.pendingACKs = chanList{}
-
-		case count := <-l.acks:
-			l.handleACK(count)
-
-		}
-
-		// update get and idle timer after state machine
-		l.get = nil
-		if buf.Avail() > 0 {
-			l.get = broker.requests
 		}
 	}
 }
 
-func (l *directEventLoop) handleInsert(req *pushRequest) {
-	// log := l.broker.logger
-	// log.Debugf("push event: %v\t%v\t%p\n", req.event, req.seq, req.state)
-
-	if avail, ok := l.insert(req); ok && avail == 0 {
-		// log.Debugf("buffer: all regions full")
-
-		// no more space to accept new events -> unset events queue for time being
-		l.events = nil
-	}
-}
-
-func (l *directEventLoop) insert(req *pushRequest) (int, bool) {
-	var avail int
+
+// insert adds the event in the given pushRequest to the queue's ring
+// buffer, unless the request's producer has already been cancelled.
+func (l *directEventLoop) insert(req *pushRequest) { log := l.broker.logger - if req.state == nil { - avail = l.buf.insert(req.event, clientState{}) - return avail, true - } - st := req.state - if st.cancelled { + if st == nil { + l.buf.insert(req.event, clientState{}) + } else if st.cancelled { reportCancelledState(log, req) - return -1, false + } else { + l.buf.insert(req.event, clientState{ + seq: req.seq, + state: st, + }) } - - avail = l.buf.insert(req.event, clientState{ - seq: req.seq, - state: st, - }) - - return avail, true } func (l *directEventLoop) handleCancel(req *producerCancelRequest) { // log := l.broker.logger // log.Debug("handle cancel request") - var ( - removed int - broker = l.broker - ) + var removed int if st := req.state; st != nil { st.cancelled = true @@ -183,44 +160,25 @@ func (l *directEventLoop) handleCancel(req *producerCancelRequest) { if req.resp != nil { req.resp <- producerCancelResponse{removed: removed} } - - // re-enable pushRequest if buffer can take new events - if !l.buf.Full() { - l.events = broker.events - } } -func (l *directEventLoop) handleConsumer(req *getRequest) { +func (l *directEventLoop) handleGetRequest(req *getRequest) { // log := l.broker.logger // log.Debugf("try reserve %v events", req.sz) - start, buf := l.buf.reserve(req.sz) + start, buf := l.buf.reserve(req.entryCount) count := len(buf) if count == 0 { panic("empty batch returned") } - // log.Debug("newACKChan: ", b.ackSeq, count) - ackCH := newACKChan(l.ackSeq, start, count, l.buf.buf.clients) - l.ackSeq++ + ackCH := newBatchACKState(start, count, l.buf.entries) - req.resp <- getResponse{ackCH, buf} + req.responseChan <- getResponse{ackCH.ackChan, buf} l.pendingACKs.append(ackCH) - l.schedACKS = l.broker.scheduledACKs } -func (l *directEventLoop) handleACK(count int) { - // log := l.broker.logger - // log.Debug("receive buffer ack:", count) - - // Give broker/buffer a chance to clean up most recent ACKs - // After handling ACKs some buffer has been freed up - // -> always reenable producers - l.buf.ack(count) - l.events = l.broker.events -} - -// processACK is used by the ackLoop to process the list of acked batches +// processACK is called by the ackLoop to process the list of acked batches func (l *directEventLoop) processACK(lst chanList, N int) { log := l.broker.logger { @@ -233,42 +191,42 @@ func (l *directEventLoop) processACK(lst chanList, N int) { acks := lst.front() start := acks.start - states := acks.states + entries := l.buf.entries idx := start + N - 1 - if idx >= len(states) { - idx -= len(states) + if idx >= len(entries) { + idx -= len(entries) } total := 0 for i := N - 1; i >= 0; i-- { if idx < 0 { - idx = len(states) - 1 + idx = len(entries) - 1 } - st := &states[idx] - log.Debugf("try ack index: (idx=%v, i=%v, seq=%v)\n", idx, i, st.seq) + client := &entries[idx].client + log.Debugf("try ack index: (idx=%v, i=%v, seq=%v)\n", idx, i, client.seq) idx-- - if st.state == nil { + if client.state == nil { log.Debug("no state set") continue } - count := (st.seq - st.state.lastACK) + count := (client.seq - client.state.lastACK) if count == 0 || count > math.MaxUint32/2 { // seq number comparison did underflow. 
This happens only if st.seq has // already been acknowledged // log.Debug("seq number already acked: ", st.seq) - st.state = nil + client.state = nil continue } log.Debugf("broker ACK events: count=%v, start-seq=%v, end-seq=%v\n", count, - st.state.lastACK+1, - st.seq, + client.state.lastACK+1, + client.seq, ) total += int(count) @@ -278,9 +236,9 @@ func (l *directEventLoop) processACK(lst chanList, N int) { )) } - st.state.cb(int(count)) - st.state.lastACK = st.seq - st.state = nil + client.state.cb(int(count)) + client.state.lastACK = client.seq + client.state = nil } } @@ -290,11 +248,6 @@ func newBufferingEventLoop(b *broker, size int, minEvents int, flushTimeout time maxEvents: size, minEvents: minEvents, flushTimeout: flushTimeout, - - events: b.events, - get: nil, - pubCancel: b.pubCancel, - acks: b.acks, } l.buf = newBatchBuffer(l.minEvents) @@ -307,29 +260,46 @@ func newBufferingEventLoop(b *broker, size int, minEvents int, flushTimeout time } func (l *bufferingEventLoop) run() { - var ( - broker = l.broker - ) + broker := l.broker for { + var pushChan chan pushRequest + // Push requests are enabled if the queue isn't yet full. + if l.eventCount < l.maxEvents { + pushChan = l.broker.pushChan + } + + var getChan chan getRequest + // Get requests are enabled if the queue has events that + // weren't yet sent to consumers. + if !l.flushList.empty() { + getChan = l.broker.getChan + } + + var schedACKs chan chanList + // Enable sending to the scheduled ACKs channel if we have + // something to send. + if !l.pendingACKs.empty() { + schedACKs = l.broker.scheduledACKs + } + select { case <-broker.done: return - case req := <-l.events: // producer pushing new event + case req := <-pushChan: // producer pushing new event l.handleInsert(&req) - case req := <-l.pubCancel: // producer cancelling active events + case req := <-l.broker.cancelChan: // producer cancelling active events l.handleCancel(&req) - case req := <-l.get: // consumer asking for next batch - l.handleConsumer(&req) + case req := <-getChan: // consumer asking for next batch + l.handleGetRequest(&req) - case l.schedACKS <- l.pendingACKs: - l.schedACKS = nil + case schedACKs <- l.pendingACKs: l.pendingACKs = chanList{} - case count := <-l.acks: + case count := <-l.broker.ackChan: l.handleACK(count) case <-l.idleC: @@ -345,9 +315,6 @@ func (l *bufferingEventLoop) run() { func (l *bufferingEventLoop) handleInsert(req *pushRequest) { if l.insert(req) { l.eventCount++ - if l.eventCount == l.maxEvents { - l.events = nil // stop inserting events if upper limit is reached - } L := l.buf.length() if !l.buf.flushed { @@ -414,17 +381,10 @@ func (l *bufferingEventLoop) handleCancel(req *producerCancelRequest) { } } l.flushList = tmpList - if tmpList.empty() { - l.get = nil - } - l.eventCount -= removed - if l.eventCount < l.maxEvents { - l.events = l.broker.events - } } -func (l *bufferingEventLoop) handleConsumer(req *getRequest) { +func (l *bufferingEventLoop) handleGetRequest(req *getRequest) { buf := l.flushList.head if buf == nil { panic("get from non-flushed buffers") @@ -435,7 +395,7 @@ func (l *bufferingEventLoop) handleConsumer(req *getRequest) { panic("empty buffer in flush list") } - if sz := req.sz; sz > 0 { + if sz := req.entryCount; sz > 0 { if sz < count { count = sz } @@ -445,17 +405,13 @@ func (l *bufferingEventLoop) handleConsumer(req *getRequest) { panic("empty batch returned") } - events := buf.events[:count] - clients := buf.clients[:count] - ackChan := newACKChan(l.ackSeq, 0, count, clients) - l.ackSeq++ + entries := 
buf.entries[:count]
+	acker := newBatchACKState(0, count, entries)
 
-	req.resp <- getResponse{ackChan, events}
-	l.pendingACKs.append(ackChan)
-	l.schedACKS = l.broker.scheduledACKs
+	req.responseChan <- getResponse{acker.ackChan, entries}
+	l.pendingACKs.append(acker)
 
-	buf.events = buf.events[count:]
-	buf.clients = buf.clients[count:]
+	buf.entries = buf.entries[count:]
 	if buf.length() == 0 {
 		l.advanceFlushList()
 	}
@@ -463,9 +419,6 @@ func (l *bufferingEventLoop) handleACK(count int) {
 	l.eventCount -= count
-	if l.eventCount < l.maxEvents {
-		l.events = l.broker.events
-	}
 }
 
 func (l *bufferingEventLoop) startFlushTimer() {
@@ -486,12 +439,8 @@ func (l *bufferingEventLoop) stopFlushTimer() {
 
 func (l *bufferingEventLoop) advanceFlushList() {
 	l.flushList.pop()
-	if l.flushList.count == 0 {
-		l.get = nil
-
-		if l.buf.flushed {
-			l.buf = newBatchBuffer(l.minEvents)
-		}
+	if l.flushList.count == 0 && l.buf.flushed {
+		l.buf = newBatchBuffer(l.minEvents)
 	}
 }
 
@@ -503,7 +452,6 @@ func (l *bufferingEventLoop) flushBuffer() {
 	}
 
 	l.flushList.add(l.buf)
-	l.get = l.broker.requests
 }
 
 func (l *bufferingEventLoop) processACK(lst chanList, N int) {
@@ -513,10 +461,10 @@ func (l *bufferingEventLoop) processACK(lst chanList, N int) {
 	lst.reverse()
 	for !lst.empty() {
 		current := lst.pop()
-		states := current.states
+		entries := current.entries
 
-		for i := len(states) - 1; i >= 0; i-- {
-			st := &states[i]
+		for i := len(entries) - 1; i >= 0; i-- {
+			st := &entries[i].client
 			if st.state == nil {
 				continue
 			}
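bufferingEventLoop's flush policy — emit a batch once it reaches minEvents, or flush a partial batch when flushTimeout fires — can be sketched independently of the queue plumbing. A hedged standalone version (parameter values and names are invented):

package main

import (
	"fmt"
	"time"
)

// batcher groups incoming values into batches of at least minEvents,
// flushing a partial batch when flushTimeout expires.
func batcher(in <-chan int, out chan<- []int, minEvents int, flushTimeout time.Duration) {
	var buf []int
	timer := time.NewTimer(flushTimeout)
	defer timer.Stop()

	flush := func() {
		if len(buf) > 0 {
			out <- buf
			buf = nil
		}
	}

	for {
		select {
		case v, ok := <-in:
			if !ok {
				flush()
				close(out)
				return
			}
			buf = append(buf, v)
			if len(buf) >= minEvents {
				flush()
			}
		case <-timer.C:
			flush()
			timer.Reset(flushTimeout)
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan []int)
	go batcher(in, out, 3, 100*time.Millisecond)

	go func() {
		for i := 1; i <= 4; i++ {
			in <- i
		}
		close(in)
	}()

	for batch := range out {
		fmt.Println("flushed:", batch) // [1 2 3], then [4]
	}
}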
diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go
index b3b4c234ffe..8d4571ce225 100644
--- a/libbeat/publisher/queue/memqueue/internal_api.go
+++ b/libbeat/publisher/queue/memqueue/internal_api.go
@@ -22,7 +22,7 @@ import "github.com/elastic/beats/v7/libbeat/publisher"
 
 // producer -> broker API
 type pushRequest struct {
-	event publisher.Event
+	event *publisher.Event
 	seq   uint32
 	state *produceState
 }
@@ -39,13 +39,13 @@ type producerCancelResponse struct {
 
 // consumer -> broker API
 type getRequest struct {
-	sz   int              // request sz events from the broker
-	resp chan getResponse // channel to send response to
+	entryCount   int              // the number of entries to read from the queue
+	responseChan chan getResponse // channel to send the response to
 }
 
 type getResponse struct {
-	ack *ackChan
-	buf []publisher.Event
+	ackChan chan batchAckMsg
+	entries []queueEntry
 }
 
 type batchAckMsg struct{}
diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go
index 625ab473d94..4679a008e98 100644
--- a/libbeat/publisher/queue/memqueue/produce.go
+++ b/libbeat/publisher/queue/memqueue/produce.go
@@ -56,7 +56,7 @@ func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel
 	openState := openState{
 		log:    b.logger,
 		done:   make(chan struct{}),
-		events: b.events,
+		events: b.pushChan,
 	}
 
 	if cb != nil {
@@ -69,14 +69,14 @@ func newProducer(b *broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel
 }
 
 func (p *forgetfulProducer) Publish(event publisher.Event) bool {
-	return p.openState.publish(p.makeRequest(event))
+	return p.openState.publish(p.makeRequest(&event))
 }
 
 func (p *forgetfulProducer) TryPublish(event publisher.Event) bool {
-	return p.openState.tryPublish(p.makeRequest(event))
+	return p.openState.tryPublish(p.makeRequest(&event))
 }
 
-func (p *forgetfulProducer) makeRequest(event publisher.Event) pushRequest {
+func (p *forgetfulProducer) makeRequest(event *publisher.Event) pushRequest {
 	return pushRequest{event: event}
 }
 
@@ -86,11 +86,11 @@ func (p *forgetfulProducer) Cancel() int {
 }
 
 func (p *ackProducer) Publish(event publisher.Event) bool {
-	return p.updSeq(p.openState.publish(p.makeRequest(event)))
+	return p.updSeq(p.openState.publish(p.makeRequest(&event)))
 }
 
 func (p *ackProducer) TryPublish(event publisher.Event) bool {
-	return p.updSeq(p.openState.tryPublish(p.makeRequest(event)))
+	return p.updSeq(p.openState.tryPublish(p.makeRequest(&event)))
 }
 
 func (p *ackProducer) updSeq(ok bool) bool {
@@ -100,7 +100,7 @@ func (p *ackProducer) updSeq(ok bool) bool {
 	return ok
 }
 
-func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
+func (p *ackProducer) makeRequest(event *publisher.Event) pushRequest {
 	req := pushRequest{
 		event: event,
 		seq:   p.seq,
@@ -114,7 +114,7 @@ func (p *ackProducer) Cancel() int {
 	if p.dropOnCancel {
 		ch := make(chan producerCancelResponse)
-		p.broker.pubCancel <- producerCancelRequest{
+		p.broker.cancelChan <- producerCancelRequest{
 			state: &p.state,
 			resp:  ch,
 		}
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 45cd055d91e..84c353e5cd7 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -24,7 +24,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
+	"gotest.tools/assert"
 
 	"github.com/elastic/beats/v7/libbeat/publisher/queue"
 	"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
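ackProducer stamps each published event with a uint32 sequence number (seq: p.seq above), and the event loop's processACK recovers how many events an ACK covers by unsigned subtraction against the producer's lastACK. Because the counter can wrap, a difference in the upper half of the uint32 range is read as "already acknowledged" — the same check as count == 0 || count > math.MaxUint32/2 in eventloop.go. A small sketch of that arithmetic (the function name is invented):

package main

import (
	"fmt"
	"math"
)

// ackedDelta returns how many events are newly acknowledged by seq,
// given the last acknowledged sequence number. Unsigned subtraction
// wraps modulo 2^32, so "negative" results land in the upper half of
// the range and are rejected as already-ACKed.
func ackedDelta(seq, lastACK uint32) (uint32, bool) {
	count := seq - lastACK
	if count == 0 || count > math.MaxUint32/2 {
		return 0, false
	}
	return count, true
}

func main() {
	fmt.Println(ackedDelta(10, 7))               // 3 true: events 8, 9, 10
	fmt.Println(ackedDelta(7, 10))               // 0 false: 7 is older than 10
	fmt.Println(ackedDelta(2, math.MaxUint32-1)) // 4 true: wrapped past zero
}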
diff --git a/libbeat/publisher/queue/memqueue/ringbuf.go b/libbeat/publisher/queue/memqueue/ringbuf.go
index 80ece2c00c4..f45e59b0231 100644
--- a/libbeat/publisher/queue/memqueue/ringbuf.go
+++ b/libbeat/publisher/queue/memqueue/ringbuf.go
@@ -21,33 +21,35 @@ import (
 	"fmt"
 
 	"github.com/elastic/beats/v7/libbeat/logp"
-	"github.com/elastic/beats/v7/libbeat/publisher"
 )
 
 // Internal event ring buffer.
-// The ring is split into 2 regions.
-// Region A contains active events to be send to consumers, while region B can
-// only be filled by producers, if there is no more space in region A. Splitting
-// the ring buffer into regions enables the broker to send batches of type
-// []publisher.Event to the consumer without having to copy and/or grow/shrink the
-// buffers.
+// The ring is split into 2 contiguous regions.
+// Events are appended to region A until it grows to the end of the internal
+// buffer. Then region B is created at the beginning of the internal buffer,
+// and events are inserted there until region A is emptied. When A becomes empty,
+// we rename region B to region A, and the cycle repeats every time we wrap around
+// the internal array storage.
 type ringBuffer struct {
-	buf eventBuffer
+	logger *logp.Logger
+
+	entries []queueEntry
 
+	// The underlying array is divided up into two contiguous regions.
 	regA, regB region
-	reserved   int // amount of events in region A actively processed/reserved
+
+	// The number of events awaiting ACK at the beginning of region A.
+	reserved int
 }
 
+// region represents a contiguous region in ringBuffer's internal storage (i.e.
+// one that does not cross the end of the array).
 type region struct {
+	// The starting position of this region within the full event buffer.
 	index int
-	size  int
-}
 
-type eventBuffer struct {
-	logger *logp.Logger
-
-	events  []publisher.Event
-	clients []clientState
+	// The number of events currently stored in this region.
+	size int
 }
 
 type clientState struct {
@@ -55,40 +57,24 @@ type clientState struct {
 	seq   uint32        // event sequence number
 	state *produceState // the producer it's state used to compute and signal the ACK count
 }
 
-func (b *eventBuffer) init(size int) {
-	b.events = make([]publisher.Event, size)
-	b.clients = make([]clientState, size)
-}
-
-func (b *eventBuffer) Len() int {
-	return len(b.events)
-}
-
-func (b *eventBuffer) Set(idx int, event publisher.Event, st clientState) {
-	// b.logger.Debugf("insert event: idx=%v, seq=%v\n", idx, st.seq)
-
-	b.events[idx] = event
-	b.clients[idx] = st
-}
-
-func (b *ringBuffer) init(log *logp.Logger, size int) {
-	*b = ringBuffer{}
-	b.buf.init(size)
-	b.buf.logger = log
+func (b *ringBuffer) init(logger *logp.Logger, size int) {
+	*b = ringBuffer{
+		logger:  logger,
+		entries: make([]queueEntry, size),
+	}
 }
 
-func (b *ringBuffer) insert(event publisher.Event, client clientState) int {
-	// log := b.buf.logger
-	// log.Debug("insert:")
-	// log.Debug(" region A:", b.regA)
-	// log.Debug(" region B:", b.regB)
-	// log.Debug(" reserved:", b.reserved)
-	// defer func() {
-	//	log.Debug(" -> region A:", b.regA)
-	//	log.Debug(" -> region B:", b.regB)
-	//	log.Debug(" -> reserved:", b.reserved)
-	// }()
-
+// insert appends the given event to the ring buffer. If the buffer is
+// already full, the event is silently discarded; the event loop prevents
+// that case by disabling push requests while the buffer has no free space.
+func (b *ringBuffer) insert(event interface{}, client clientState) {
 	// always insert into region B, if region B exists.
 	// That is, we have 2 regions and region A is currently processed by consumers
 	if b.regB.size > 0 {
@@ -96,164 +82,110 @@ func (b *ringBuffer) insert(event publisher.Event, client clientState) int {
 		idx := b.regB.index + b.regB.size
 		avail := b.regA.index - idx
-		if avail == 0 {
-			return 0
+		if avail > 0 {
+			b.entries[idx] = queueEntry{event, client}
+			b.regB.size++
 		}
-
-		b.buf.Set(idx, event, client)
-		b.regB.size++
-
-		return avail - 1
+		return
 	}
 
 	// region B does not exist yet, check if region A is available for use
 	idx := b.regA.index + b.regA.size
-	// log.Debug(" - index: ", idx)
-	// log.Debug(" - buffer size: ", b.buf.Len())
-	avail := b.buf.Len() - idx
-	if avail == 0 { // no more space in region A
-		// log.Debug(" - region A full")
-
-		if b.regA.index == 0 {
-			// no space to create region B, buffer is full
-
-			// log.Debug(" - no space in region B")
-
-			return 0
+	if b.regA.index+b.regA.size >= len(b.entries) {
+		// region A extends to the end of the buffer
+		if b.regA.index > 0 {
+			// If there is space before region A, create
+			// region B there.
+ b.regB = region{index: 0, size: 1} + b.entries[0] = queueEntry{event, client} } - - // create region B and insert events - // log.Debug(" - create region B") - b.regB.index = 0 - b.regB.size = 1 - b.buf.Set(0, event, client) - return b.regA.index - 1 + return } // space available in region A -> let's append the event // log.Debug(" - push into region A") - b.buf.Set(idx, event, client) + b.entries[idx] = queueEntry{event, client} b.regA.size++ - return avail - 1 } // cancel removes all buffered events matching `st`, not yet reserved by // any consumer func (b *ringBuffer) cancel(st *produceState) int { - // log := b.buf.logger - // log.Debug("cancel:") - // log.Debug(" region A:", b.regA) - // log.Debug(" region B:", b.regB) - // log.Debug(" reserved:", b.reserved) - // defer func() { - // log.Debug(" -> region A:", b.regA) - // log.Debug(" -> region B:", b.regB) - // log.Debug(" -> reserved:", b.reserved) - // }() - - cancelB := b.cancelRegion(st, b.regB) - b.regB.size -= cancelB - - cancelA := b.cancelRegion(st, region{ + cancelledB := b.cancelRegion(st, b.regB) + b.regB.size -= cancelledB + + cancelledA := b.cancelRegion(st, region{ index: b.regA.index + b.reserved, size: b.regA.size - b.reserved, }) - b.regA.size -= cancelA + b.regA.size -= cancelledA - return cancelA + cancelB + return cancelledA + cancelledB } -func (b *ringBuffer) cancelRegion(st *produceState, reg region) (removed int) { +// cancelRegion removes the events in the specified range having +// the specified produceState. It returns the number of events +// removed. +func (b *ringBuffer) cancelRegion(st *produceState, reg region) int { start := reg.index end := start + reg.size - events := b.buf.events[start:end] - clients := b.buf.clients[start:end] + entries := b.entries[start:end] - toEvents := events[:0] - toClients := clients[:0] + toEntries := entries[:0] // filter loop for i := 0; i < reg.size; i++ { - if clients[i].state == st { + if entries[i].client.state == st { continue // remove } - - toEvents = append(toEvents, events[i]) - toClients = append(toClients, clients[i]) + toEntries = append(toEntries, entries[i]) } // re-initialize old buffer elements to help garbage collector - events = events[len(toEvents):] - clients = clients[len(toClients):] - for i := range events { - events[i] = publisher.Event{} - clients[i] = clientState{} + entries = entries[len(toEntries):] + for i := range entries { + entries[i] = queueEntry{} } - return len(events) + return len(entries) } // reserve returns up to `sz` events from the brokerBuffer, // exclusively marking the events as 'reserved'. Subsequent calls to `reserve` // will only return enqueued and non-reserved events from the buffer. // If `sz == -1`, all available events will be reserved. 
-func (b *ringBuffer) reserve(sz int) (int, []publisher.Event) {
-	// log := b.buf.logger
-	// log.Debug("reserve: ", sz)
-	// log.Debug(" region A:", b.regA)
-	// log.Debug(" region B:", b.regB)
-	// log.Debug(" reserved:", b.reserved)
-	// defer func() {
-	//	log.Debug(" -> region A:", b.regA)
-	//	log.Debug(" -> region B:", b.regB)
-	//	log.Debug(" -> reserved:", b.reserved)
-	// }()
-
+func (b *ringBuffer) reserve(sz int) (int, []queueEntry) {
 	use := b.regA.size - b.reserved
-	// log.Debug(" - avail: ", use)
 
-	if sz > 0 {
-		if use > sz {
-			use = sz
-		}
+	if sz > 0 && use > sz {
+		use = sz
 	}
 
 	start := b.regA.index + b.reserved
 	end := start + use
 	b.reserved += use
-	// log.Debug(" - start:", start)
-	// log.Debug(" - end:", end)
-	return start, b.buf.events[start:end]
+	return start, b.entries[start:end]
 }
 
-// ack up to sz events in region A
-func (b *ringBuffer) ack(sz int) {
-	// log := b.buf.logger
-	// log.Debug("ack: ", sz)
-	// log.Debug(" region A:", b.regA)
-	// log.Debug(" region B:", b.regB)
-	// log.Debug(" reserved:", b.reserved)
-	// defer func() {
-	//	log.Debug(" -> region A:", b.regA)
-	//	log.Debug(" -> region B:", b.regB)
-	//	log.Debug(" -> reserved:", b.reserved)
-	// }()
-
-	if b.regA.size < sz {
-		panic(fmt.Errorf("commit region to big (commit region=%v, buffer size=%v)",
-			sz, b.regA.size,
+// Remove the specified number of previously-reserved buffer entries from the
+// start of region A. Called by the event loop when events are ACKed by
+// consumers.
+func (b *ringBuffer) removeEntries(count int) {
+	if b.regA.size < count {
+		panic(fmt.Errorf("commit region too big (commit region=%v, buffer size=%v)",
+			count, b.regA.size,
 		))
 	}
 
 	// clear region, so published events can be collected by the garbage collector:
-	end := b.regA.index + sz
+	end := b.regA.index + count
 	for i := b.regA.index; i < end; i++ {
-		b.buf.events[i] = publisher.Event{}
+		b.entries[i] = queueEntry{}
 	}
 
 	b.regA.index = end
-	b.regA.size -= sz
-	b.reserved -= sz
+	b.regA.size -= count
+	b.reserved -= count
 	if b.regA.size == 0 {
 		// region A is empty, transfer region B into region A
 		b.regA = b.regB
@@ -262,20 +194,18 @@
 	}
 }
 
+// Number of events that consumers can currently request.
 func (b *ringBuffer) Avail() int {
 	return b.regA.size - b.reserved
 }
 
 func (b *ringBuffer) Full() bool {
-	var avail int
 	if b.regB.size > 0 {
-		avail = b.regA.index - b.regB.index - b.regB.size
-	} else {
-		avail = b.buf.Len() - b.regA.index - b.regA.size
+		return b.regA.index == (b.regB.index + b.regB.size)
 	}
-	return avail == 0
+	return b.regA.size == len(b.entries)
 }
 
 func (b *ringBuffer) Size() int {
-	return b.buf.Len()
+	return len(b.entries)
 }
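The two-region scheme above can be hard to visualize, so here is a toy model with int entries (illustrative only; it ignores reservations and producer state): region A serves reads, and once it abuts the end of the array, new writes open region B at index 0. When A drains, B is renamed to A.

package main

import "fmt"

type region struct{ index, size int }

type ring struct {
	entries    []int
	regA, regB region
}

func (r *ring) insert(v int) bool {
	if r.regB.size > 0 || (r.regA.index+r.regA.size >= len(r.entries) && r.regA.index > 0) {
		// write into region B, which lives before region A
		idx := r.regB.index + r.regB.size
		if idx >= r.regA.index {
			return false // no gap left between B and A: full
		}
		r.entries[idx] = v
		r.regB.size++
		return true
	}
	idx := r.regA.index + r.regA.size
	if idx >= len(r.entries) {
		return false // A fills the whole array and starts at 0: full
	}
	r.entries[idx] = v
	r.regA.size++
	return true
}

func (r *ring) remove() (int, bool) {
	if r.regA.size == 0 {
		return 0, false
	}
	v := r.entries[r.regA.index]
	r.regA.index++
	r.regA.size--
	if r.regA.size == 0 {
		// region A is empty: region B becomes the new region A
		r.regA, r.regB = r.regB, region{}
	}
	return v, true
}

func main() {
	r := &ring{entries: make([]int, 4)}
	for i := 1; i <= 4; i++ {
		r.insert(i) // fills region A: [1 2 3 4]
	}
	v, _ := r.remove()                          // frees slot 0
	fmt.Println("removed:", v)                  // removed: 1
	fmt.Println("wrapped into B:", r.insert(5)) // true: 5 is stored at index 0
}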