From c40fe346279efd7999b9bae465d9bcf96571ff09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 21 Oct 2024 11:27:10 +0200 Subject: [PATCH 1/2] Fix entry deletion in the memqueue --- libbeat/publisher/queue/memqueue/runloop.go | 10 ++++++---- libbeat/publisher/queue/memqueue/runloop_test.go | 10 ++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go index 397b41a25e83..180a313c5c2d 100644 --- a/libbeat/publisher/queue/memqueue/runloop.go +++ b/libbeat/publisher/queue/memqueue/runloop.go @@ -204,12 +204,14 @@ func (l *runLoop) handleGetReply(req *getRequest) { func (l *runLoop) handleDelete(count int) { byteCount := 0 + // remove entries from the queue for i := 0; i < count; i++ { - entry := l.broker.buf[(l.bufPos+i)%len(l.broker.buf)] - byteCount += entry.eventSize + index := (l.bufPos + i) % len(l.broker.buf) + byteCount += l.broker.buf[index].eventSize + l.broker.buf[index].event = nil } - // Advance position and counters. Event data was already cleared in - // batch.FreeEntries when the events were vended. + + // advance the buffer position and update tracking fields l.bufPos = (l.bufPos + count) % len(l.broker.buf) l.eventCount -= count l.consumedCount -= count diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go index f6c83e8fec0b..2fba40325c36 100644 --- a/libbeat/publisher/queue/memqueue/runloop_test.go +++ b/libbeat/publisher/queue/memqueue/runloop_test.go @@ -19,6 +19,7 @@ package memqueue import ( "context" + "slices" "testing" "time" @@ -176,11 +177,20 @@ func TestObserverRemoveEvents(t *testing.T) { // Initialize the queue entries to a test byte size for i := range rl.broker.buf { rl.broker.buf[i].eventSize = 123 + rl.broker.buf[i].event = publisher.Event{} } const deleteCount = 25 rl.broker.deleteChan <- deleteCount // Run one iteration of the run loop, so it can handle the delete request rl.runIteration() + + // The entries should actually be deleted + expectedRemaining := len(rl.broker.buf) - deleteCount + remainingEntries := slices.DeleteFunc(slices.Clone(rl.broker.buf), func(entry queueEntry) bool { + return entry.event == nil + }) + assert.Len(t, remainingEntries, expectedRemaining) + // It should have deleted 25 events, so we expect the size to be 25 * 123. assertRegistryUint(t, reg, "queue.removed.events", deleteCount, "Deleting from the queue should report the removed events") assertRegistryUint(t, reg, "queue.removed.bytes", deleteCount*123, "Deleting from the queue should report the removed bytes") From b31141f9acdd41f1a88549243d8ac027e59a7cec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 21 Oct 2024 11:35:43 +0200 Subject: [PATCH 2/2] Fix linter errors --- libbeat/publisher/queue/memqueue/broker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b617bae61102..e060eedb85e6 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -290,7 +290,7 @@ var batchPool = sync.Pool{ } func newBatch(queue *broker, start, count int) *batch { - batch := batchPool.Get().(*batch) + batch := batchPool.Get().(*batch) //nolint: errcheck // we want this to panic if there's an error batch.next = nil batch.queue = queue batch.start = start