Add basic metrics to disk queue #33471

Merged 5 commits on Nov 9, 2022
Changes from 4 commits
13 changes: 13 additions & 0 deletions libbeat/publisher/queue/diskqueue/core_loop.go
@@ -84,10 +84,23 @@ func (dq *diskQueue) run() {
// If there were blocked producers waiting for more queue space,
// we might be able to unblock them now.
dq.maybeUnblockProducers()

case metricsReq := <-dq.metricsRequestChan:
dq.handleMetricsRequest(metricsReq)
}
}
}

// handleMetricsRequest responds to a request on the metricsRequestChan channel
func (dq *diskQueue) handleMetricsRequest(request metricsRequest) {
resp := metricsRequestResponse{
sizeOnDisk: dq.segments.sizeOnDisk(),
OccupiedRead: dq.segments.unACKedReadBytes(),
oldestEntryID: dq.segments.oldestIDOnDisk(),
}
request.response <- resp
}
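The run loop and handleMetricsRequest above use a request/response channel pair: the caller sends a request carrying its own reply channel, and the loop that owns the queue state answers on it. A minimal standalone sketch of the same pattern (hypothetical names, not the actual diskqueue types):

```go
package main

import "fmt"

// metricsSnapshot is a stand-in for the queue's metricsRequestResponse.
type metricsSnapshot struct{ sizeOnDisk uint64 }

// metricsReq carries a reply channel back to the serving loop.
type metricsReq struct{ response chan metricsSnapshot }

// serve owns the state and answers metrics requests until done is closed.
func serve(requests chan metricsReq, done chan struct{}, size uint64) {
	for {
		select {
		case req := <-requests:
			req.response <- metricsSnapshot{sizeOnDisk: size}
		case <-done:
			return
		}
	}
}

// fetch sends one request and blocks until the snapshot arrives.
func fetch(requests chan metricsReq) metricsSnapshot {
	// Buffered so the serving loop never blocks on the reply.
	resp := make(chan metricsSnapshot, 1)
	requests <- metricsReq{response: resp}
	return <-resp
}

func main() {
	requests := make(chan metricsReq)
	done := make(chan struct{})
	go serve(requests, done, 4096)
	fmt.Println(fetch(requests).sizeOnDisk) // prints 4096
	close(done)
}
```

Because only the serving goroutine touches the state, no locking is needed; the reply channel is buffered with capacity 1 so the run loop can answer and move on even if the caller is slow to receive.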

func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
// Pathological case checking: make sure the incoming frame isn't bigger
// than an entire segment all by itself (as long as it isn't, it is
37 changes: 36 additions & 1 deletion libbeat/publisher/queue/diskqueue/queue.go
@@ -20,13 +20,15 @@ package diskqueue
import (
"errors"
"fmt"
"io"
"os"
"sync"

"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/opt"
)

// diskQueue is the internal type representing a disk-based implementation
@@ -71,6 +73,9 @@ type diskQueue struct {
// The API channel used by diskQueueProducer to write events.
producerWriteRequestChan chan producerWriteRequest

// API channel used by the public Metrics() API to request queue metrics
metricsRequestChan chan metricsRequest

// pendingFrames is a list of all incoming data frames that have been
// accepted by the queue and are waiting to be sent to the writer loop.
// Segment ids in this list always appear in sorted order, even between
@@ -86,6 +91,18 @@ type diskQueue struct {
done chan struct{}
}

// channel request for metrics from an external client
type metricsRequest struct {
response chan metricsRequestResponse
}

// metrics response from the disk queue
type metricsRequestResponse struct {
Contributor:
I think we could add number of items in the queue. On startup the segments are read, and they contain a count field. We could update that total when we add & ack events in the queue.

I'm not sure oldestEntryID is very helpful, the ID isn't a time, so if I told you the oldest ID was 13075 what would you do with that information? I think I'd rather see a gauge that shows me the rate things are being added to the queue and being vacated from the queue, as well as the total number in the queue.

For OccupiedRead, is this the number of events that we have sent but haven't received an ACK for?
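The comment above proposes a running item count seeded from the per-segment count fields read at startup and updated on add and ACK. A hypothetical sketch of that bookkeeping (all names invented for illustration; this is not code from the PR):

```go
package main

import "fmt"

// segmentHeader is a stand-in for the count field stored in each
// on-disk segment header.
type segmentHeader struct{ frameCount uint64 }

// queueCounter maintains a running total of events in the queue.
type queueCounter struct{ events uint64 }

// seed initializes the total from the segment headers read at startup.
func (c *queueCounter) seed(headers []segmentHeader) {
	for _, h := range headers {
		c.events += h.frameCount
	}
}

// onAdd is called when n events are accepted into the queue.
func (c *queueCounter) onAdd(n uint64) { c.events += n }

// onACK is called when n events are acknowledged and vacated.
func (c *queueCounter) onACK(n uint64) { c.events -= n }

func main() {
	var c queueCounter
	c.seed([]segmentHeader{{frameCount: 10}, {frameCount: 5}})
	c.onAdd(3)
	c.onACK(8)
	fmt.Println(c.events) // 15 seeded + 3 added - 8 ACKed = 10
}
```

Keeping such a counter inside the queue's run loop (as with the metrics request handling elsewhere in this PR) would avoid any extra synchronization.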

Member:

> I'm not sure oldestEntryID is very helpful, the ID isn't a time, so if I told you the oldest ID was 13075 what would you do with that information? I think I'd rather see a gauge that shows me the rate things are being added to the queue and being vacated from the queue, as well as the total number in the queue.

I was thinking the same. The only value of oldestEntryID is that we can look to see if it is changing to know if the queue is draining. We will probably get very similar information from observing the current queue size, but the oldestEntryID would let us catch the edge case where the queue is almost always full but data is still moving through it.

I am not opposed to removing this metric, but I think it has some small value.

Member:

> I think we could add number of items in the queue. On startup the segments are read, and they contain a count field. We could update that total when we add & ack events in the queue.

Having EventCount for the disk queue would be interesting, and would mean we can always rely on the EventCount metric being populated between both queue types which I think makes the metrics easier to use.

Contributor:

I'm reluctant to use oldestEntryID as an indicator of a draining queue; if for some reason an event can't be delivered but we have retries, then oldestEntryID might be static while the queue could still be draining.

Member:

If oldestEntryID is only ever useful during edge cases, let's just remove it. It will be better to add some more obvious metrics for those, like watching an increasing retry counter in the output metrics.

sizeOnDisk uint64
oldestEntryID segmentID
OccupiedRead uint64
}

func init() {
queue.RegisterQueueType(
"disk",
@@ -220,6 +237,7 @@ func NewQueue(logger *logp.Logger, settings Settings) (*diskQueue, error) {
deleterLoop: newDeleterLoop(settings),

producerWriteRequestChan: make(chan producerWriteRequest),
metricsRequestChan: make(chan metricsRequest),

done: make(chan struct{}),
}
@@ -281,6 +299,23 @@ func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
}
}

// Metrics returns the current disk queue metrics
func (dq *diskQueue) Metrics() (queue.Metrics, error) {
- return queue.Metrics{}, queue.ErrMetricsNotImplemented
respChan := make(chan metricsRequestResponse, 1)
req := metricsRequest{response: respChan}

select {
case <-dq.done:
return queue.Metrics{}, io.EOF
case dq.metricsRequestChan <- req:
}
resp := <-respChan

maxSize := dq.settings.MaxBufferSize
return queue.Metrics{
ByteLimit: opt.UintWith(maxSize),
ByteCount: opt.UintWith(resp.sizeOnDisk),
UnackedConsumedBytes: opt.UintWith(resp.OccupiedRead),
OldestEntryID: queue.EntryID(resp.oldestEntryID),
}, nil
}
62 changes: 62 additions & 0 deletions libbeat/publisher/queue/diskqueue/queue_test.go
@@ -28,6 +28,9 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/queuetest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/stretchr/testify/require"
)

var seed int64
@@ -75,6 +78,65 @@ func TestProduceConsumer(t *testing.T) {
t.Run("direct", testWith(makeTestQueue()))
}

func TestMetrics(t *testing.T) {
dir, err := ioutil.TempDir("", "diskqueue_metrics")
defer func() {
_ = os.RemoveAll(dir)
}()
require.NoError(t, err)
settings := DefaultSettings()
settings.Path = dir
// lower max segment size so we can get multiple segments
settings.MaxSegmentSize = 100

testQueue, err := NewQueue(logp.L(), settings)
require.NoError(t, err)
defer testQueue.Close()

eventsToTest := 100

// Send events to queue
producer := testQueue.Producer(queue.ProducerConfig{})
sendEventsToQueue(eventsToTest, producer)

// fetch metrics before we read any events
time.Sleep(time.Millisecond * 500)
testMetrics, err := testQueue.Metrics()
require.NoError(t, err)

require.Equal(t, uint64(1<<30), testMetrics.ByteLimit.ValueOr(0))
require.NotZero(t, testMetrics.ByteCount.ValueOr(0))
t.Logf("got %d bytes written", testMetrics.ByteCount.ValueOr(0))

// now read & ACK the events
batch, err := testQueue.Get(eventsToTest)
require.NoError(t, err, "error in Get")
time.Sleep(time.Millisecond * 100)
testMetrics, err = testQueue.Metrics()
require.NoError(t, err)
require.NotZero(t, testMetrics.UnackedConsumedBytes.ValueOr(0))
// ACK
batch.Done()

// Occupied read should now be zero
testMetricsACKed, err := testQueue.Metrics()
require.NoError(t, err)
require.Zero(t, testMetricsACKed.UnackedConsumedBytes.ValueOr(0))

// insert again
sendEventsToQueue(eventsToTest, producer)
// This time, the oldest segment ID should be > 0
time.Sleep(time.Millisecond * 100)
testMetricsSecond, err := testQueue.Metrics()
require.NoError(t, err)
require.NotZero(t, testMetricsSecond.OldestEntryID)

}

func sendEventsToQueue(count int, prod queue.Producer) {
for i := 0; i < count; i++ {
prod.Publish(queuetest.MakeEvent(mapstr.M{"count": i}))
}
}

func makeTestQueue() queuetest.QueueFactory {
return func(t *testing.T) queue.Queue {
dir, err := ioutil.TempDir("", "diskqueue_test")
29 changes: 29 additions & 0 deletions libbeat/publisher/queue/diskqueue/segments.go
@@ -470,6 +470,35 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 {
return total
}

// oldestIDOnDisk iterates through all segment lists to find the oldest segment ID in the queue
func (segments *diskQueueSegments) oldestIDOnDisk() segmentID {
// not all segment lists are pre-sorted, so we take [0] from the sorted lists and append the others wholesale
oldSegments := []*queueSegment{}

if len(segments.reading) > 0 {
oldSegments = append(oldSegments, segments.reading[0])
}
if len(segments.acking) > 0 {
oldSegments = append(oldSegments, segments.acking[0])
}

oldSegments = append(oldSegments, segments.writing...)
oldSegments = append(oldSegments, segments.acked...)

sort.Slice(oldSegments, func(i, j int) bool { return oldSegments[i].id < oldSegments[j].id })
Member:
Is this taking the oldest segment ID, or the oldest event ID? I think it is the former, and we want the latter.

We report the oldest event ID in the memory queue, which has no concept of a segment:

// If the queue is empty, we report the "oldest" ID as the next
// one that will be assigned. Otherwise, we report the ID attached
// to the oldest queueEntry.
oldestEntryID := l.nextEntryID
if oldestEntry := l.buf.OldestEntry(); oldestEntry != nil {
oldestEntryID = oldestEntry.id
}

Probably implementing this requires elastic/elastic-agent-shipper#27 to be done first

Contributor Author:

Ah, you're right. @leehinman, do you think there's any value in reporting the oldest segment ID? The more I think about it, I'm kind of leaning towards "no."

Contributor:

No, I don't think the oldest ID will be useful. It is also easy to find out: the name of the segment file is <id no>.seg, so just looking at the queue directory gives us this info.


if len(oldSegments) == 0 {
return 0
}
return oldSegments[0].id
}

// unACKedReadBytes returns the total count of bytes that have been read, but not ack'ed by the consumer
func (segments *diskQueueSegments) unACKedReadBytes() uint64 {
var acc uint64
for _, seg := range segments.acking {
acc += seg.byteCount
Member:

I'm not sure this is right either, because this will count all the bytes in a segment which may consist of all the events in the segment file. Ideally we would want this to increment event by event, otherwise it will just always be the size of the segment file currently being sent.

The disk queue is a collection of segment files, each segment file containing multiple frames (events): https://github.com/elastic/beats/blob/main/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md

Member:

If this is difficult to implement in a useful way, I am in favour of just not implementing the metric at this point.

Contributor Author (fearful-symmetry, Nov 3, 2022):

@leehinman can you chime in here? I'm kind of going off the assumption that if we're reporting values in bytes, it doesn't matter too much how those byte segments map to individual events, but I don't know enough about the backend of the disk queue to know if "currently size of whatever segment chunk just got sent" is useful or not.

Contributor:

I don't think the amount of events (or bytes) that have been read but not ACKed would be very useful. It should always be the same as the amount of data the output has sent but hasn't heard back about yet, so the output should already have this info.

}
return acc
}

// segmentReader handles reading of segments. getReader sets up the
// reader and handles setting up the Reader to deal with the different
// schema version. With Schema version 2 there is the option for
2 changes: 2 additions & 0 deletions libbeat/publisher/queue/queue.go
@@ -47,6 +47,8 @@ type Metrics struct {

//UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed
UnackedConsumedEvents opt.Uint
// UnackedConsumedBytes is the count of bytes that an output consumer has read, but not yet ack'ed
UnackedConsumedBytes opt.Uint

//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
OldestActiveTimestamp common.Time
Member:

Can you remove OldestActiveTimestamp from this struct? We are not implementing it, since it requires too many changes to the internals of both queues.

Contributor Author:

That'll require making changes in elastic-agent-shipper, so we may want to do that in a separate PR?

Member (cmacknz, Nov 3, 2022):

Yes, let's do a separate PR to clean this up.
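The Metrics struct in this file reports values as opt.Uint precisely so that a metric a queue type doesn't populate is distinguishable from a genuine zero (the disk queue fills in bytes but not event counts, for example). A minimal sketch of that optional-value pattern with a hand-rolled type (optUint here is a stand-in for illustration, not the elastic-agent-libs opt implementation):

```go
package main

import "fmt"

// optUint pairs a value with an "exists" flag, so an unpopulated
// metric is distinct from a metric whose value is zero.
type optUint struct {
	value  uint64
	exists bool
}

// uintWith wraps a concrete value, marking it as present.
func uintWith(v uint64) optUint { return optUint{value: v, exists: true} }

// valueOr returns the value if present, otherwise the given default.
func (o optUint) valueOr(def uint64) uint64 {
	if o.exists {
		return o.value
	}
	return def
}

func main() {
	byteCount := uintWith(0) // implemented metric that is genuinely zero
	var eventCount optUint   // metric this queue type never populates

	fmt.Println(byteCount.valueOr(99))  // prints 0
	fmt.Println(eventCount.valueOr(99)) // prints 99
}
```

This mirrors how the test code earlier in the PR reads metrics with `ValueOr(0)`: a populated-but-zero metric and an absent metric both need sensible behavior at the call site.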
