Skip to content

Commit

Permalink
Add basic metrics to disk queue (#33471)
Browse files Browse the repository at this point in the history
* add basic metrics to disk queue

* linter

* debugging testt CI issues

* tinkering with tests still

* remove unneeded metrics, clean up
  • Loading branch information
fearful-symmetry authored and chrisberkhout committed Jun 1, 2023
1 parent 58d79ec commit 2c1ca62
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 1 deletion.
11 changes: 11 additions & 0 deletions libbeat/publisher/queue/diskqueue/core_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,21 @@ func (dq *diskQueue) run() {
// If there were blocked producers waiting for more queue space,
// we might be able to unblock them now.
dq.maybeUnblockProducers()

case metricsReq := <-dq.metricsRequestChan:
dq.handleMetricsRequest(metricsReq)
}
}
}

// handleMetricsRequest responds to an event on the metricsRequestChan chan
func (dq *diskQueue) handleMetricsRequest(request metricsRequest) {
resp := metricsRequestResponse{
sizeOnDisk: dq.segments.sizeOnDisk(),
}
request.response <- resp
}

func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
// Pathological case checking: make sure the incoming frame isn't bigger
// than an entire segment all by itself (as long as it isn't, it is
Expand Down
40 changes: 39 additions & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package diskqueue
import (
"errors"
"fmt"
"io"
"os"
"sync"

"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/opt"
)

// diskQueue is the internal type representing a disk-based implementation
Expand Down Expand Up @@ -71,6 +73,9 @@ type diskQueue struct {
// The API channel used by diskQueueProducer to write events.
producerWriteRequestChan chan producerWriteRequest

// API channel used by the public Metrics() API to request queue metrics
metricsRequestChan chan metricsRequest

// pendingFrames is a list of all incoming data frames that have been
// accepted by the queue and are waiting to be sent to the writer loop.
// Segment ids in this list always appear in sorted order, even between
Expand All @@ -86,6 +91,16 @@ type diskQueue struct {
done chan struct{}
}

// channel request for metrics from an external client
type metricsRequest struct {
response chan metricsRequestResponse
}

// metrics response from the disk queue
type metricsRequestResponse struct {
sizeOnDisk uint64
}

func init() {
queue.RegisterQueueType(
"disk",
Expand Down Expand Up @@ -220,6 +235,7 @@ func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) {
deleterLoop: newDeleterLoop(settings),

producerWriteRequestChan: make(chan producerWriteRequest),
metricsRequestChan: make(chan metricsRequest),

done: make(chan struct{}),
}
Expand Down Expand Up @@ -281,6 +297,28 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
}
}

// Metrics returns current disk metrics
func (dq *diskQueue) Metrics() (queue.Metrics, error) {
return queue.Metrics{}, queue.ErrMetricsNotImplemented
respChan := make(chan metricsRequestResponse, 1)
req := metricsRequest{response: respChan}

select {
case <-dq.done:
return queue.Metrics{}, io.EOF
case dq.metricsRequestChan <- req:

}

resp := metricsRequestResponse{}
select {
case <-dq.done:
return queue.Metrics{}, io.EOF
case resp = <-respChan:
}

maxSize := dq.settings.MaxBufferSize
return queue.Metrics{
ByteLimit: opt.UintWith(maxSize),
ByteCount: opt.UintWith(resp.sizeOnDisk),
}, nil
}
41 changes: 41 additions & 0 deletions libbeat/publisher/queue/diskqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/stretchr/testify/require"
)

var seed int64
Expand Down Expand Up @@ -75,6 +78,44 @@ func TestProduceConsumer(t *testing.T) {
t.Run("direct", testWith(makeTestQueue()))
}

func TestMetrics(t *testing.T) {
dir, err := ioutil.TempDir("", "diskqueue_metrics")
defer func() {
_ = os.RemoveAll(dir)
}()
require.NoError(t, err)
settings := DefaultSettings()
settings.Path = dir
// lower max segment size so we can get multiple segments
settings.MaxSegmentSize = 100

testQueue, err := NewQueue(logp.L(), settings)
require.NoError(t, err)
defer testQueue.Close()

eventsToTest := 100

// Send events to queue
producer := testQueue.Producer(queue.ProducerConfig{})
sendEventsToQueue(eventsToTest, producer)

// fetch metrics before we read any events
time.Sleep(time.Millisecond * 500)
testMetrics, err := testQueue.Metrics()
require.NoError(t, err)

require.Equal(t, testMetrics.ByteLimit.ValueOr(0), uint64((1 << 30)))
require.NotZero(t, testMetrics.ByteCount.ValueOr(0))
t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0))

}

func sendEventsToQueue(count int, prod queue.Producer) {
for i := 0; i < count; i++ {
prod.Publish(queuetest.MakeEvent(mapstr.M{"count": i}))
}
}

func makeTestQueue() queuetest.QueueFactory {
return func(t *testing.T) queue.Queue {
dir, err := ioutil.TempDir("", "diskqueue_test")
Expand Down

0 comments on commit 2c1ca62

Please sign in to comment.