From 3610c869936cf93b961ba94907174007b2b06141 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 27 Oct 2022 11:05:02 -0700 Subject: [PATCH 1/5] add basic metrics to disk queue --- .../publisher/queue/diskqueue/core_loop.go | 13 ++++ libbeat/publisher/queue/diskqueue/queue.go | 37 +++++++++++- .../publisher/queue/diskqueue/queue_test.go | 59 +++++++++++++++++++ libbeat/publisher/queue/diskqueue/segments.go | 29 +++++++++ libbeat/publisher/queue/queue.go | 2 + 5 files changed, 139 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index c08c204d51ed..9cb8c2a7618c 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -84,10 +84,23 @@ func (dq *diskQueue) run() { // If there were blocked producers waiting for more queue space, // we might be able to unblock them now. dq.maybeUnblockProducers() + + case metricsReq := <-dq.metricsRequestChan: + dq.handleMetricsRequest(metricsReq) } } } +// handleMetricsRequest responds to an event on the metricsRequestChan chan +func (dq *diskQueue) handleMetricsRequest(request metricsRequest) { + resp := metricsRequestResponse{ + sizeOnDisk: dq.segments.sizeOnDisk(), + OccupiedRead: dq.segments.unACKedReadBytes(), + oldestEntryID: dq.segments.oldestIDOnDisk(), + } + request.response <- resp +} + func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) { // Pathological case checking: make sure the incoming frame isn't bigger // than an entire segment all by itself (as long as it isn't, it is diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 837b3be6ef59..c6e42bca4f2e 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -20,6 +20,7 @@ package diskqueue import ( "errors" "fmt" + "io" "os" "sync" @@ -27,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/opt" ) // diskQueue is the internal type representing a disk-based implementation @@ -71,6 +73,9 @@ type diskQueue struct { // The API channel used by diskQueueProducer to write events. producerWriteRequestChan chan producerWriteRequest + // API channel used by the public Metrics() API to request queue metrics + metricsRequestChan chan metricsRequest + // pendingFrames is a list of all incoming data frames that have been // accepted by the queue and are waiting to be sent to the writer loop. // Segment ids in this list always appear in sorted order, even between @@ -86,6 +91,18 @@ type diskQueue struct { done chan struct{} } +// channel request for metrics from an external client +type metricsRequest struct { + response chan metricsRequestResponse +} + +// metrics response from the disk queue +type metricsRequestResponse struct { + sizeOnDisk uint64 + oldestEntryID segmentID + OccupiedRead uint64 +} + func init() { queue.RegisterQueueType( "disk", @@ -220,6 +237,7 @@ func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) { deleterLoop: newDeleterLoop(settings), producerWriteRequestChan: make(chan producerWriteRequest), + metricsRequestChan: make(chan metricsRequest), done: make(chan struct{}), } @@ -281,6 +299,23 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { } } +// Metrics returns current disk metrics func (dq *diskQueue) Metrics() (queue.Metrics, error) { - return queue.Metrics{}, queue.ErrMetricsNotImplemented + respChan := make(chan metricsRequestResponse, 1) + req := metricsRequest{response: respChan} + + select { + case <-dq.done: + return queue.Metrics{}, io.EOF + case dq.metricsRequestChan <- req: + } + resp := <-respChan + + maxSize := dq.settings.MaxBufferSize + return queue.Metrics{ + ByteLimit: opt.UintWith(maxSize), + ByteCount: opt.UintWith(resp.sizeOnDisk), + UnackedConsumedBytes: opt.UintWith(uint64(resp.OccupiedRead)), + OldestEntryID: queue.EntryID(resp.oldestEntryID), + }, nil } diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index a92850039ce8..cac22acd2cb3 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -28,6 +28,8 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/stretchr/testify/require" ) var seed int64 @@ -75,6 +77,63 @@ func TestProduceConsumer(t *testing.T) { t.Run("direct", testWith(makeTestQueue())) } +func TestMetrics(t *testing.T) { + dir, err := ioutil.TempDir("", "diskqueue_test") + defer func() { + _ = os.RemoveAll(dir) + }() + require.NoError(t, err) + settings := DefaultSettings() + settings.Path = dir + // lower max segment size so we can get multiple segments + settings.MaxSegmentSize = 100 + + testQueue, err := NewQueue(logp.L(), settings) + require.NoError(t, err) + defer testQueue.Close() + + eventsToTest := 100 + + // Send events to queue + producer := testQueue.Producer(queue.ProducerConfig{}) + sendEventsToQueue(eventsToTest, producer) + + // fetch metrics before we read any events + time.Sleep(time.Millisecond * 100) + testMetrics, err := testQueue.Metrics() + require.NoError(t, err) + + require.Equal(t, testMetrics.ByteLimit.ValueOr(0), uint64((1 << 30))) + require.NotZero(t, testMetrics.ByteCount.ValueOr(0)) + require.NotZero(t, testMetrics.UnackedConsumedBytes.ValueOr(0)) + + // now read & ACK the events + batch, err := testQueue.Get(eventsToTest) + require.NoError(t, err, "error in Get") + // ACK + batch.Done() + + // Occupied read should now be zero + testMetricsACKed, err := testQueue.Metrics() + require.NoError(t, err) + require.Zero(t, testMetricsACKed.UnackedConsumedBytes.ValueOr(0)) + + // insert again + sendEventsToQueue(eventsToTest, producer) + // This time, the oldest segment ID should be > 0 + time.Sleep(time.Millisecond * 100) + testMetricsSecond, err := testQueue.Metrics() + require.NoError(t, err) + require.NotZero(t, testMetricsSecond.OldestEntryID) + +} + +func sendEventsToQueue(count int, prod queue.Producer) { + for i := 0; i < count; i++ { + prod.Publish(queuetest.MakeEvent(mapstr.M{"count": i})) + } +} + func makeTestQueue() queuetest.QueueFactory { return func(t *testing.T) queue.Queue { dir, err := ioutil.TempDir("", "diskqueue_test") diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 331aa3f308a3..b285e6d815a0 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -470,6 +470,35 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 { return total } +// Iterates through all segment types to find the oldest ID in the queue +func (segments *diskQueueSegments) oldestIDOnDisk() segmentID { + // not all segment types are pre-sorted, hence us appending some, taking [0] from others + oldSegments := []*queueSegment{} + + if len(segments.reading) > 0 { + oldSegments = append(oldSegments, segments.reading[0]) + } + if len(segments.acking) > 0 { + oldSegments = append(oldSegments, segments.acking[0]) + } + + oldSegments = append(oldSegments, segments.writing...) + oldSegments = append(oldSegments, segments.acked...) + + sort.Slice(oldSegments, func(i, j int) bool { return oldSegments[i].id < oldSegments[j].id }) + + return oldSegments[0].id +} + +// unACKedReadBytes returns the total count of bytes that have been read, but not ack'ed by the consumer +func (segments *diskQueueSegments) unACKedReadBytes() uint64 { + var acc uint64 + for _, seg := range segments.acking { + acc += seg.byteCount + } + return acc +} + // segmentReader handles reading of segments. getReader sets up the // reader and handles setting up the Reader to deal with the different // schema version. With Schema version 2 there is the option for diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index ce87e6e059c9..431374637bdd 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -47,6 +47,8 @@ type Metrics struct { //UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed UnackedConsumedEvents opt.Uint + //UnackedConsumedEvents is the count of bytes that an output consumer has read, but not yet ack'ed + UnackedConsumedBytes opt.Uint //OldestActiveTimestamp is the timestamp of the oldest item in the queue. OldestActiveTimestamp common.Time From b1d0ad82352d4823848173c409397be59f9f2656 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 27 Oct 2022 12:17:34 -0700 Subject: [PATCH 2/5] linter --- libbeat/publisher/queue/diskqueue/queue.go | 2 +- libbeat/publisher/queue/diskqueue/queue_test.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index c6e42bca4f2e..359088551729 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -315,7 +315,7 @@ func (dq *diskQueue) Metrics() (queue.Metrics, error) { return queue.Metrics{ ByteLimit: opt.UintWith(maxSize), ByteCount: opt.UintWith(resp.sizeOnDisk), - UnackedConsumedBytes: opt.UintWith(uint64(resp.OccupiedRead)), + UnackedConsumedBytes: opt.UintWith(resp.OccupiedRead), OldestEntryID: queue.EntryID(resp.oldestEntryID), }, nil } diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index cac22acd2cb3..3cd72a5b2f04 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/stretchr/testify/require" ) From 9c82d69b9c755fd74ee6c610ca9e8cdc25e0c87e Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 27 Oct 2022 15:42:14 -0700 Subject: [PATCH 3/5] debugging testt CI issues --- libbeat/publisher/queue/diskqueue/queue_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index 3cd72a5b2f04..82cf2dee2900 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -79,7 +79,7 @@ func TestProduceConsumer(t *testing.T) { } func TestMetrics(t *testing.T) { - dir, err := ioutil.TempDir("", "diskqueue_test") + dir, err := ioutil.TempDir("", "diskqueue_metrics") defer func() { _ = os.RemoveAll(dir) }() @@ -106,6 +106,7 @@ func TestMetrics(t *testing.T) { require.Equal(t, testMetrics.ByteLimit.ValueOr(0), uint64((1 << 30))) require.NotZero(t, testMetrics.ByteCount.ValueOr(0)) + t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0)) require.NotZero(t, testMetrics.UnackedConsumedBytes.ValueOr(0)) // now read & ACK the events From 28f0d43d8e3d7b498b6249e74cfa0575266d619f Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Fri, 28 Oct 2022 11:04:49 -0700 Subject: [PATCH 4/5] tinkering with tests still --- libbeat/publisher/queue/diskqueue/queue_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index 82cf2dee2900..3442be66f2de 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -100,18 +100,19 @@ func TestMetrics(t *testing.T) { sendEventsToQueue(eventsToTest, producer) // fetch metrics before we read any events - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 500) testMetrics, err := testQueue.Metrics() require.NoError(t, err) require.Equal(t, testMetrics.ByteLimit.ValueOr(0), uint64((1 << 30))) require.NotZero(t, testMetrics.ByteCount.ValueOr(0)) t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0)) - require.NotZero(t, testMetrics.UnackedConsumedBytes.ValueOr(0)) // now read & ACK the events batch, err := testQueue.Get(eventsToTest) require.NoError(t, err, "error in Get") + time.Sleep(time.Millisecond * 100) + require.NotZero(t, testMetrics.UnackedConsumedBytes.ValueOr(0)) // ACK batch.Done() From 1eb30b6284d925768246e8b4d1ad8b7ecf91d149 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 3 Nov 2022 13:05:18 -0700 Subject: [PATCH 5/5] remove unneeded metrics, clean up --- .../publisher/queue/diskqueue/core_loop.go | 4 +-- libbeat/publisher/queue/diskqueue/queue.go | 19 +++++++----- .../publisher/queue/diskqueue/queue_test.go | 21 -------------- libbeat/publisher/queue/diskqueue/segments.go | 29 ------------------- libbeat/publisher/queue/queue.go | 2 -- 5 files changed, 12 insertions(+), 63 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 9cb8c2a7618c..93051dd4581e 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -94,9 +94,7 @@ func (dq *diskQueue) run() { // handleMetricsRequest responds to an event on the metricsRequestChan chan func (dq *diskQueue) handleMetricsRequest(request metricsRequest) { resp := metricsRequestResponse{ - sizeOnDisk: dq.segments.sizeOnDisk(), - OccupiedRead: dq.segments.unACKedReadBytes(), - oldestEntryID: dq.segments.oldestIDOnDisk(), + sizeOnDisk: dq.segments.sizeOnDisk(), } request.response <- resp } diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 359088551729..47a268497494 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -98,9 +98,7 @@ type metricsRequest struct { // metrics response from the disk queue type metricsRequestResponse struct { - sizeOnDisk uint64 - oldestEntryID segmentID - OccupiedRead uint64 + sizeOnDisk uint64 } func init() { @@ -308,14 +306,19 @@ func (dq *diskQueue) Metrics() (queue.Metrics, error) { case <-dq.done: return queue.Metrics{}, io.EOF case dq.metricsRequestChan <- req: + + } + + resp := metricsRequestResponse{} + select { + case <-dq.done: + return queue.Metrics{}, io.EOF + case resp = <-respChan: } - resp := <-respChan maxSize := dq.settings.MaxBufferSize return queue.Metrics{ - ByteLimit: opt.UintWith(maxSize), - ByteCount: opt.UintWith(resp.sizeOnDisk), - UnackedConsumedBytes: opt.UintWith(resp.OccupiedRead), - OldestEntryID: queue.EntryID(resp.oldestEntryID), + ByteLimit: opt.UintWith(maxSize), + ByteCount: opt.UintWith(resp.sizeOnDisk), }, nil } diff --git a/libbeat/publisher/queue/diskqueue/queue_test.go b/libbeat/publisher/queue/diskqueue/queue_test.go index 3442be66f2de..eac65fd85184 100644 --- a/libbeat/publisher/queue/diskqueue/queue_test.go +++ b/libbeat/publisher/queue/diskqueue/queue_test.go @@ -108,27 +108,6 @@ func TestMetrics(t *testing.T) { require.NotZero(t, testMetrics.ByteCount.ValueOr(0)) t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0)) - // now read & ACK the events - batch, err := testQueue.Get(eventsToTest) - require.NoError(t, err, "error in Get") - time.Sleep(time.Millisecond * 100) - require.NotZero(t, testMetrics.UnackedConsumedBytes.ValueOr(0)) - // ACK - batch.Done() - - // Occupied read should now be zero - testMetricsACKed, err := testQueue.Metrics() - require.NoError(t, err) - require.Zero(t, testMetricsACKed.UnackedConsumedBytes.ValueOr(0)) - - // insert again - sendEventsToQueue(eventsToTest, producer) - // This time, the oldest segment ID should be > 0 - time.Sleep(time.Millisecond * 100) - testMetricsSecond, err := testQueue.Metrics() - require.NoError(t, err) - require.NotZero(t, testMetricsSecond.OldestEntryID) - } func sendEventsToQueue(count int, prod queue.Producer) { diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index b285e6d815a0..331aa3f308a3 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -470,35 +470,6 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 { return total } -// Iterates through all segment types to find the oldest ID in the queue -func (segments *diskQueueSegments) oldestIDOnDisk() segmentID { - // not all segment types are pre-sorted, hence us appending some, taking [0] from others - oldSegments := []*queueSegment{} - - if len(segments.reading) > 0 { - oldSegments = append(oldSegments, segments.reading[0]) - } - if len(segments.acking) > 0 { - oldSegments = append(oldSegments, segments.acking[0]) - } - - oldSegments = append(oldSegments, segments.writing...) - oldSegments = append(oldSegments, segments.acked...) - - sort.Slice(oldSegments, func(i, j int) bool { return oldSegments[i].id < oldSegments[j].id }) - - return oldSegments[0].id -} - -// unACKedReadBytes returns the total count of bytes that have been read, but not ack'ed by the consumer -func (segments *diskQueueSegments) unACKedReadBytes() uint64 { - var acc uint64 - for _, seg := range segments.acking { - acc += seg.byteCount - } - return acc -} - // segmentReader handles reading of segments. getReader sets up the // reader and handles setting up the Reader to deal with the different // schema version. With Schema version 2 there is the option for diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 431374637bdd..ce87e6e059c9 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -47,8 +47,6 @@ type Metrics struct { //UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed UnackedConsumedEvents opt.Uint - //UnackedConsumedEvents is the count of bytes that an output consumer has read, but not yet ack'ed - UnackedConsumedBytes opt.Uint //OldestActiveTimestamp is the timestamp of the oldest item in the queue. OldestActiveTimestamp common.Time