Add basic metrics to disk queue #33471

Merged: 5 commits, Nov 9, 2022
11 changes: 11 additions & 0 deletions libbeat/publisher/queue/diskqueue/core_loop.go
@@ -84,10 +84,21 @@ func (dq *diskQueue) run() {
			// If there were blocked producers waiting for more queue space,
			// we might be able to unblock them now.
			dq.maybeUnblockProducers()

		case metricsReq := <-dq.metricsRequestChan:
			dq.handleMetricsRequest(metricsReq)
		}
	}
}

// handleMetricsRequest responds to a request received on metricsRequestChan.
func (dq *diskQueue) handleMetricsRequest(request metricsRequest) {
	resp := metricsRequestResponse{
		sizeOnDisk: dq.segments.sizeOnDisk(),
	}
	request.response <- resp
}

func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
	// Pathological case checking: make sure the incoming frame isn't bigger
	// than an entire segment all by itself (as long as it isn't, it is
40 changes: 39 additions & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
@@ -20,13 +20,15 @@ package diskqueue
import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"

	"github.com/elastic/beats/v7/libbeat/feature"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
	"github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/opt"
)

// diskQueue is the internal type representing a disk-based implementation
@@ -71,6 +73,9 @@ type diskQueue struct {
	// The API channel used by diskQueueProducer to write events.
	producerWriteRequestChan chan producerWriteRequest

	// API channel used by the public Metrics() API to request queue metrics
	metricsRequestChan chan metricsRequest

	// pendingFrames is a list of all incoming data frames that have been
	// accepted by the queue and are waiting to be sent to the writer loop.
	// Segment ids in this list always appear in sorted order, even between
@@ -86,6 +91,16 @@ type diskQueue struct {
	done chan struct{}
}

// metricsRequest is a request for metrics from an external client,
// sent over metricsRequestChan.
type metricsRequest struct {
	response chan metricsRequestResponse
}

// metricsRequestResponse is the disk queue's reply to a metricsRequest.
type metricsRequestResponse struct {
Contributor

I think we could add number of items in the queue. On startup the segments are read, and they contain a count field. We could update that total when we add & ack events in the queue.

I'm not sure oldestEntryID is very helpful: the ID isn't a time, so if I told you the oldest ID was 13075, what would you do with that information? I think I'd rather see a gauge that shows the rate at which things are being added to and vacated from the queue, as well as the total number in the queue.

For OccupiedRead, is this the number of events that we have sent but haven't received an ACK for?

Member

> I'm not sure oldestEntryID is very helpful: the ID isn't a time, so if I told you the oldest ID was 13075, what would you do with that information? I think I'd rather see a gauge that shows the rate at which things are being added to and vacated from the queue, as well as the total number in the queue.

I was thinking the same. The only value of oldestEntryID is that we can look to see if it is changing to know if the queue is draining. We will probably get very similar information from observing the current queue size, but the oldestEntryID would let us catch the edge case where the queue is almost always full but data is still moving through it.

I am not opposed to removing this metric, but I think it has some small value.

Member

> I think we could add number of items in the queue. On startup the segments are read, and they contain a count field. We could update that total when we add & ack events in the queue.

Having EventCount for the disk queue would be interesting. It would also mean we can always rely on the EventCount metric being populated for both queue types, which I think makes the metrics easier to use.

Contributor

I'm reluctant to use oldestEntryID as an indicator of a draining queue. If for some reason an event can't be delivered but we have retries, then oldestEntryID might be static even though the queue could still be draining.

Member

If oldestEntryID is only ever useful during edge cases, let's just remove it. It will be better to add some more obvious metrics for those, like watching an increasing retry counter in the output metrics.

	sizeOnDisk uint64
}
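
The review thread above proposes tracking an event count plus add/ack rates instead of oldestEntryID. A minimal sketch of that idea follows, assuming the counters are updated by a single goroutine (as the core loop does for other queue state). The type `queueCounters` and the functions `onAdd`, `onAck`, `eventCount`, and `rates` are hypothetical names for illustration and are not part of this PR.

```go
package main

import "fmt"

// queueCounters is a hypothetical type, not part of the PR. It assumes
// all updates happen on one goroutine, so no locking is shown.
type queueCounters struct {
	added uint64 // total events ever added to the queue
	acked uint64 // total events ever acknowledged and vacated
}

func (c *queueCounters) onAdd(n uint64) { c.added += n }
func (c *queueCounters) onAck(n uint64) { c.acked += n }

// eventCount is the number of events currently held in the queue.
func (c *queueCounters) eventCount() uint64 { return c.added - c.acked }

// rates compares two snapshots taken `seconds` apart and returns the
// events per second added and acknowledged in that window.
func rates(prev, cur queueCounters, seconds float64) (addRate, ackRate float64) {
	addRate = float64(cur.added-prev.added) / seconds
	ackRate = float64(cur.acked-prev.acked) / seconds
	return addRate, ackRate
}

func main() {
	var c queueCounters
	c.onAdd(100) // e.g. seeded from the per-segment count fields on startup

	prev := c // snapshot at the start of the window
	c.onAdd(50)
	c.onAck(50)

	addRate, ackRate := rates(prev, c, 10)
	fmt.Println(c.eventCount(), addRate, ackRate) // prints "100 5 5"
}
```

In this run the event count is unchanged across the window while both rates are nonzero, which is the "queue is almost always full but data is still moving through it" case that oldestEntryID was meant to catch.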

func init() {
	queue.RegisterQueueType(
		"disk",
@@ -220,6 +235,7 @@ func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) {
		deleterLoop: newDeleterLoop(settings),

		producerWriteRequestChan: make(chan producerWriteRequest),
		metricsRequestChan:       make(chan metricsRequest),

		done: make(chan struct{}),
	}
@@ -281,6 +297,28 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
	}
}

// Metrics returns current disk metrics
func (dq *diskQueue) Metrics() (queue.Metrics, error) {
	// (line removed by this change: return queue.Metrics{}, queue.ErrMetricsNotImplemented)
	respChan := make(chan metricsRequestResponse, 1)
	req := metricsRequest{response: respChan}

	// Hand the request to the core loop, unless the queue is shutting down.
	select {
	case <-dq.done:
		return queue.Metrics{}, io.EOF
	case dq.metricsRequestChan <- req:

	}

	// Wait for the core loop's reply, again giving up on shutdown.
	resp := metricsRequestResponse{}
	select {
	case <-dq.done:
		return queue.Metrics{}, io.EOF
	case resp = <-respChan:
	}

	maxSize := dq.settings.MaxBufferSize
	return queue.Metrics{
		ByteLimit: opt.UintWith(maxSize),
		ByteCount: opt.UintWith(resp.sizeOnDisk),
	}, nil
}
41 changes: 41 additions & 0 deletions libbeat/publisher/queue/diskqueue/queue_test.go
@@ -28,6 +28,9 @@ import (
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
	"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
	"github.com/elastic/elastic-agent-libs/logp"
	"github.com/elastic/elastic-agent-libs/mapstr"

	"github.com/stretchr/testify/require"
)

var seed int64
@@ -75,6 +78,44 @@ func TestProduceConsumer(t *testing.T) {
	t.Run("direct", testWith(makeTestQueue()))
}

func TestMetrics(t *testing.T) {
	dir, err := ioutil.TempDir("", "diskqueue_metrics")
	require.NoError(t, err)
	defer func() {
		_ = os.RemoveAll(dir)
	}()
	settings := DefaultSettings()
	settings.Path = dir
	// lower max segment size so we can get multiple segments
	settings.MaxSegmentSize = 100

	testQueue, err := NewQueue(logp.L(), settings)
	require.NoError(t, err)
	defer testQueue.Close()

	eventsToTest := 100

	// Send events to queue
	producer := testQueue.Producer(queue.ProducerConfig{})
	sendEventsToQueue(eventsToTest, producer)

	// fetch metrics before we read any events
	time.Sleep(time.Millisecond * 500)
	testMetrics, err := testQueue.Metrics()
	require.NoError(t, err)

	// require.Equal takes the expected value first, then the actual value.
	require.Equal(t, uint64(1<<30), testMetrics.ByteLimit.ValueOr(0))
	require.NotZero(t, testMetrics.ByteCount.ValueOr(0))
	t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0))
}

func sendEventsToQueue(count int, prod queue.Producer) {
	for i := 0; i < count; i++ {
		prod.Publish(queuetest.MakeEvent(mapstr.M{"count": i}))
	}
}

func makeTestQueue() queuetest.QueueFactory {
	return func(t *testing.T) queue.Queue {
		dir, err := ioutil.TempDir("", "diskqueue_test")