Skip to content

Commit

Permalink
[shipper] Make the memory queue accept opaque pointers (elastic#31356)
Browse files Browse the repository at this point in the history
  • Loading branch information
faec authored and kush-elastic committed May 2, 2022
1 parent 46a0832 commit b797c48
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 445 deletions.
49 changes: 22 additions & 27 deletions libbeat/publisher/queue/memqueue/ackloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,46 @@ package memqueue
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
broker *broker
sig chan batchAckMsg
lst chanList

// A list of ACK channels given to queue consumers,
// used to maintain sequencing of event acknowledgements.
ackChans chanList

totalACK uint64

processACK func(chanList, int)
}

func newACKLoop(b *broker, processACK func(chanList, int)) *ackLoop {
l := &ackLoop{broker: b}
l.processACK = processACK
return l
}

func (l *ackLoop) run() {
var (
// log = l.broker.logger

// Buffer up acked event counter in acked. If acked > 0, acks will be set to
// Buffer up event counter in ackCount. If ackCount > 0, acks will be set to
// the broker.acks channel for sending the ACKs while potentially receiving
// new batches from the broker event loop.
// This concurrent bidirectionally communication pattern requiring 'select'
// ensures we can not have any deadlock between the event loop and the ack
// loop, as the ack loop will not block on any channel
acked int
acks chan int
ackCount int
ackChan chan int
sig chan batchAckMsg
)

for {
select {
case <-l.broker.done:
return

case acks <- acked:
acks, acked = nil, 0
case ackChan <- ackCount:
ackChan, ackCount = nil, 0

case lst := <-l.broker.scheduledACKs:
l.lst.concat(&lst)
case chanList := <-l.broker.scheduledACKs:
l.ackChans.concat(&chanList)

case <-l.sig:
acked += l.handleBatchSig()
if acked > 0 {
acks = l.broker.acks
case <-sig:
ackCount += l.handleBatchSig()
if ackCount > 0 {
ackChan = l.broker.ackChan
}
}

Expand All @@ -76,7 +73,7 @@ func (l *ackLoop) run() {
// log.Debug("ackloop: total batches scheduled = ", l.batchesSched)
// log.Debug("ackloop: total batches ack = ", l.batchesACKed)

l.sig = l.lst.channel()
sig = l.ackChans.channel()
// if l.sig == nil {
// log.Debug("ackloop: no ack scheduled")
// } else {
Expand Down Expand Up @@ -119,17 +116,15 @@ func (l *ackLoop) handleBatchSig() int {
func (l *ackLoop) collectAcked() chanList {
lst := chanList{}

acks := l.lst.pop()
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
acks := l.ackChans.pop()
lst.append(acks)

done := false
for !l.lst.empty() && !done {
acks := l.lst.front()
for !l.ackChans.empty() && !done {
acks := l.ackChans.front()
select {
case <-acks.ch:
l.broker.logger.Debugf("ackloop: receive ack [%v: %v, %v]", acks.seq, acks.start, acks.count)
lst.append(l.lst.pop())
case <-acks.ackChan:
lst.append(l.ackChans.pop())

default:
done = true
Expand Down
43 changes: 18 additions & 25 deletions libbeat/publisher/queue/memqueue/batchbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,42 @@ package memqueue

import "github.com/elastic/beats/v7/libbeat/publisher"

type queueEntry struct {
event interface{}
client clientState
}

type batchBuffer struct {
next *batchBuffer
flushed bool
events []publisher.Event
clients []clientState
entries []queueEntry
}

func newBatchBuffer(sz int) *batchBuffer {
b := &batchBuffer{}
b.init(sz)
b.entries = make([]queueEntry, 0, sz)
return b
}

func (b *batchBuffer) init(sz int) {
b.events = make([]publisher.Event, 0, sz)
b.clients = make([]clientState, 0, sz)
}

func (b *batchBuffer) add(event publisher.Event, st clientState) {
b.events = append(b.events, event)
b.clients = append(b.clients, st)
func (b *batchBuffer) add(event *publisher.Event, st clientState) {
b.entries = append(b.entries, queueEntry{event, st})
}

func (b *batchBuffer) length() int {
return len(b.events)
return len(b.entries)
}

func (b *batchBuffer) cancel(st *produceState) int {
events := b.events[:0]
clients := b.clients[:0]
entries := b.entries[:0]

removed := 0
for i := range b.clients {
if b.clients[i].state == st {
removed++
removedCount := 0
for _, entry := range b.entries {
if entry.client.state == st {
removedCount++
continue
}

events = append(events, b.events[i])
clients = append(clients, b.clients[i])
entries = append(entries, entry)
}

b.events = events
b.clients = clients
return removed
b.entries = entries
return removedCount
}
86 changes: 57 additions & 29 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,40 @@ type broker struct {

bufSize int

///////////////////////////
// api channels
events chan pushRequest
requests chan getRequest
pubCancel chan producerCancelRequest

// Producers send requests to pushChan to add events to the queue.
pushChan chan pushRequest

// Consumers send requests to getChan to read events from the queue.
getChan chan getRequest

// Producers send requests to cancelChan to cancel events they've
// sent so far that have not yet reached a consumer.
cancelChan chan producerCancelRequest

///////////////////////////
// internal channels
acks chan int

// When ackLoop receives events ACKs from a consumer, it sends the number
// of ACKed events to ackChan to notify the event loop that those
// events can be removed from the queue.
ackChan chan int

// When events are sent to consumers, the ACK channels for their batches
// are collected into chanLists and sent to scheduledACKs.
// These are then read by ackLoop and concatenated to its internal
// chanList of all outstanding ACK channels.
scheduledACKs chan chanList

// A listener that should be notified when ACKs are processed.
// ackLoop calls this listener's OnACK function when it advances
// the consumer ACK position.
// Right now this listener always points at the Pipeline associated with
// this queue. Pipeline.OnACK then forwards the notification to
// Pipeline.observer.queueACKed(), which updates the beats registry
// if needed.
ackListener queue.ACKListener

// wait group for worker shutdown
Expand All @@ -62,17 +87,19 @@ type Settings struct {
InputQueueSize int
}

type ackChan struct {
next *ackChan
ch chan batchAckMsg
seq uint
// batchACKState stores the metadata associated with a batch of events sent to
// a consumer. When the consumer ACKs that batch, a batchAckMsg is sent on
// ackChan and received by
type batchACKState struct {
next *batchACKState
ackChan chan batchAckMsg
start, count int // number of events waiting for ACK
states []clientState
entries []queueEntry
}

type chanList struct {
head *ackChan
tail *ackChan
head *batchACKState
tail *batchACKState
}

func init() {
Expand Down Expand Up @@ -141,12 +168,12 @@ func NewQueue(
logger: logger,

// broker API channels
events: make(chan pushRequest, chanSize),
requests: make(chan getRequest),
pubCancel: make(chan producerCancelRequest, 5),
pushChan: make(chan pushRequest, chanSize),
getChan: make(chan getRequest),
cancelChan: make(chan producerCancelRequest, 5),

// internal broker and ACK handler channels
acks: make(chan int),
ackChan: make(chan int),
scheduledACKs: make(chan chanList),

ackListener: settings.ACKListener,
Expand All @@ -164,7 +191,9 @@ func NewQueue(
}

b.bufSize = sz
ack := newACKLoop(b, eventLoop.processACK)
ackLoop := &ackLoop{
broker: b,
processACK: eventLoop.processACK}

b.wg.Add(2)
go func() {
Expand All @@ -173,7 +202,7 @@ func NewQueue(
}()
go func() {
defer b.wg.Done()
ack.run()
ackLoop.run()
}()

return b
Expand Down Expand Up @@ -204,29 +233,28 @@ func (b *broker) Metrics() (queue.Metrics, error) {

var ackChanPool = sync.Pool{
New: func() interface{} {
return &ackChan{
ch: make(chan batchAckMsg, 1),
return &batchACKState{
ackChan: make(chan batchAckMsg, 1),
}
},
}

func newACKChan(seq uint, start, count int, states []clientState) *ackChan {
func newBatchACKState(start, count int, entries []queueEntry) *batchACKState {
//nolint: errcheck // Return value doesn't need to be checked before conversion.
ch := ackChanPool.Get().(*ackChan)
ch := ackChanPool.Get().(*batchACKState)
ch.next = nil
ch.seq = seq
ch.start = start
ch.count = count
ch.states = states
ch.entries = entries
return ch
}

func releaseACKChan(c *ackChan) {
func releaseACKChan(c *batchACKState) {
c.next = nil
ackChanPool.Put(c)
}

func (l *chanList) prepend(ch *ackChan) {
func (l *chanList) prepend(ch *batchACKState) {
ch.next = l.head
l.head = ch
if l.tail == nil {
Expand All @@ -248,7 +276,7 @@ func (l *chanList) concat(other *chanList) {
l.tail = other.tail
}

func (l *chanList) append(ch *ackChan) {
func (l *chanList) append(ch *batchACKState) {
if l.head == nil {
l.head = ch
} else {
Expand All @@ -261,18 +289,18 @@ func (l *chanList) empty() bool {
return l.head == nil
}

func (l *chanList) front() *ackChan {
func (l *chanList) front() *batchACKState {
return l.head
}

func (l *chanList) channel() chan batchAckMsg {
if l.head == nil {
return nil
}
return l.head.ch
return l.head.ackChan
}

func (l *chanList) pop() *ackChan {
func (l *chanList) pop() *batchACKState {
ch := l.head
if ch != nil {
l.head = ch.next
Expand Down
Loading

0 comments on commit b797c48

Please sign in to comment.