Add basic metrics to disk queue #33471
Conversation
Pinging @elastic/elastic-agent (Team:Elastic-Agent)
This pull request does not have a backport label. To fixup this pull request, you need to add the backport labels for the needed branches.
Tests are working locally, not sure what's up with CI...
Did a first pass based on my knowledge of the disk queue internals.
oldSegments = append(oldSegments, segments.writing...)
oldSegments = append(oldSegments, segments.acked...)

sort.Slice(oldSegments, func(i, j int) bool { return oldSegments[i].id < oldSegments[j].id })
Is this taking the oldest segment ID, or the oldest event ID? I think it is the former, and we want the latter.
We report the oldest event ID in the memory queue, which has no concept of a segment:
beats/libbeat/publisher/queue/memqueue/eventloop.go (lines 155 to 161 in df5c62f):
// If the queue is empty, we report the "oldest" ID as the next
// one that will be assigned. Otherwise, we report the ID attached
// to the oldest queueEntry.
oldestEntryID := l.nextEntryID
if oldestEntry := l.buf.OldestEntry(); oldestEntry != nil {
	oldestEntryID = oldestEntry.id
}
Probably implementing this requires elastic/elastic-agent-shipper#27 to be done first.
AH, you're right. @leehinman Do you think there's any value in reporting the oldest segment ID? The more I think about it, I'm kind of leaning towards "no."
No, I don't think the oldest segment ID will be useful. Also, it is easy to find out: each segment file is named <id no>.seg, so just looking at the queue directory gives us this info.
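To illustrate that point, here is a small hedged sketch of recovering the oldest segment ID straight from the queue directory. Only the `<id>.seg` naming comes from the comment above; the helper name, placeholder path, and error handling are made up for illustration and are not part of the queue's real code.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"os"
	"strconv"
	"strings"
)

// oldestSegmentID is a hypothetical helper: it lists the queue directory and
// parses each "<id>.seg" file name, returning the smallest (oldest) ID found.
func oldestSegmentID(queueDir string) (uint64, error) {
	entries, err := os.ReadDir(queueDir)
	if err != nil {
		return 0, err
	}
	oldest := uint64(math.MaxUint64)
	found := false
	for _, e := range entries {
		name := e.Name()
		if !strings.HasSuffix(name, ".seg") {
			continue
		}
		id, err := strconv.ParseUint(strings.TrimSuffix(name, ".seg"), 10, 64)
		if err != nil {
			continue // not a segment file written by the queue
		}
		if id < oldest {
			oldest = id
			found = true
		}
	}
	if !found {
		return 0, errors.New("no segment files found")
	}
	return oldest, nil
}

func main() {
	// "path/to/diskqueue" is a placeholder for the configured queue directory.
	id, err := oldestSegmentID("path/to/diskqueue")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("oldest segment ID:", id)
}
```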
libbeat/publisher/queue/queue.go
@@ -47,6 +47,8 @@ type Metrics struct {

	//UnackedConsumedEvents is the count of events that an output consumer has read, but not yet ack'ed
	UnackedConsumedEvents opt.Uint
	//UnackedConsumedBytes is the count of bytes that an output consumer has read, but not yet ack'ed
	UnackedConsumedBytes opt.Uint

	//OldestActiveTimestamp is the timestamp of the oldest item in the queue.
	OldestActiveTimestamp common.Time
Can you remove OldestActiveTimestamp from this struct? We are not implementing it, since it requires too many changes to the internals of both queues.
That'll require making changes in elastic-agent-shipper, so we may want to do that in a separate PR?
Yes, let's do a separate PR to clean this up.
func (segments *diskQueueSegments) unACKedReadBytes() uint64 {
	var acc uint64
	for _, seg := range segments.acking {
		acc += seg.byteCount
I'm not sure this is right either, because this counts all the bytes in a segment, which may cover every event in the segment file. Ideally we would want this to increment event by event; otherwise it will just always be the size of the segment file currently being sent.
The disk queue is a collection of segment files, each segment file containing multiple frames (events): https://github.com/elastic/beats/blob/main/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
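To make the distinction concrete, here is a rough sketch of what per-event (per-frame) accounting could look like. The readTracker type and its hook methods are hypothetical, not part of the queue's actual internals, and no synchronization is shown.

```go
package sketch

// readTracker is a hypothetical illustration of per-frame accounting:
// instead of summing whole segment files, the reader adds each frame's
// size as it hands the event to a consumer, and the ACK path subtracts
// it once the output acknowledges that event.
type readTracker struct {
	unackedReadBytes uint64 // bytes handed to the output but not yet acked
}

// onFrameRead would be called when a single frame (event) is read out of a segment.
func (t *readTracker) onFrameRead(frameSize uint64) {
	t.unackedReadBytes += frameSize
}

// onFrameACK would be called when the output acknowledges that frame.
func (t *readTracker) onFrameACK(frameSize uint64) {
	t.unackedReadBytes -= frameSize
}
```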
If this is difficult to implement in a useful way, I am in favour of just not implementing the metric at this point.
@leehinman can you chime in here? I'm kind of going off the assumption that if we're reporting values in bytes, it doesn't matter too much how those byte segments map to individual events, but I don't know enough about the backend of the disk queue to know if "the current size of whatever segment chunk just got sent" is useful or not.
I don't think the amount of events (or bytes) that have been read but not acked would be very useful. It should always be the same as the amount of data the output has sent but hasn't heard back about yet, so the output should have this info.
}

// metrics response from the disk queue
type metricsRequestResponse struct {
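The diff above only shows the start of the response type. Purely as a hedged sketch of the channel-based request/response pattern the disk queue's core loop uses for other operations, answering a metrics request might look roughly like this; all names and fields here are illustrative, not the PR's actual code.

```go
package sketch

// metricsRequest is a hypothetical request carrying a channel for the
// core loop to reply on.
type metricsRequest struct {
	responseChan chan metricsResponse
}

// metricsResponse is a hypothetical snapshot; the PR's real
// metricsRequestResponse type may carry different fields.
type metricsResponse struct {
	currentByteCount uint64 // bytes currently stored on disk
	byteLimit        uint64 // configured maximum size
	oldestEntryID    uint64 // oldest ID still in the queue
}

// handleMetricsRequest shows how the core loop could answer a request from
// loop-owned state, so the segment lists need no extra locking for metrics reads.
func handleMetricsRequest(req metricsRequest, byteCount, byteLimit, oldestID uint64) {
	req.responseChan <- metricsResponse{
		currentByteCount: byteCount,
		byteLimit:        byteLimit,
		oldestEntryID:    oldestID,
	}
}
```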
I think we could add the number of items in the queue. On startup the segments are read, and they contain a count field. We could update that total when we add & ack events in the queue.

I'm not sure oldestEntryID is very helpful; the ID isn't a time, so if I told you the oldest ID was 13075 what would you do with that information? I think I'd rather see a gauge that shows me the rate things are being added to the queue and being vacated from the queue, as well as the total number in the queue.

For OccupiedRead, is this the number of events that we have sent but haven't received an ACK for?
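A minimal sketch of the counting approach described above, under the assumption that there are hook points at startup, on write, and on ACK; the names here are invented for illustration and are not the disk queue's real internals.

```go
package sketch

// queueCounters is a hypothetical running total of events in the disk queue.
type queueCounters struct {
	eventCount uint64
}

// onStartup seeds the total from the per-segment frame counts that are
// recovered when the segment files are read back at startup.
func (c *queueCounters) onStartup(segmentFrameCounts []uint64) {
	for _, n := range segmentFrameCounts {
		c.eventCount += n
	}
}

// onEventWritten is called when a new event is persisted to the queue.
func (c *queueCounters) onEventWritten() { c.eventCount++ }

// onEventACKed is called when an event is acknowledged and can be dropped.
func (c *queueCounters) onEventACKed() { c.eventCount-- }
```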
> I'm not sure oldestEntryID is very helpful; the ID isn't a time, so if I told you the oldest ID was 13075 what would you do with that information? I think I'd rather see a gauge that shows me the rate things are being added to the queue and being vacated from the queue, as well as the total number in the queue.
I was thinking the same. The only value of oldestEntryID is that we can look to see if it is changing to know if the queue is draining. We will probably get very similar information from observing the current queue size, but the oldestEntryID would let us catch the edge case where the queue is almost always full but data is still moving through it.
I am not opposed to removing this metric, but I think it has some small value.
> I think we could add the number of items in the queue. On startup the segments are read, and they contain a count field. We could update that total when we add & ack events in the queue.
Having EventCount for the disk queue would be interesting, and would mean we can always rely on the EventCount metric being populated for both queue types, which I think makes the metrics easier to use.
I'm reluctant to use oldestEntryID as an indicator of a draining queue: if for some reason an event can't be delivered but we have retries, then oldestEntryID might be static while the queue is still draining.
If oldestEntryID is only ever useful during edge cases, let's just remove it. It will be better to add some more obvious metrics for those, like watching an increasing retry counter in the output metrics.
Alright @cmacknz, I had a brief Zoom chat with @leehinman; we've cleared up a few things about how this should work:
* add basic metrics to disk queue
* linter
* debugging test CI issues
* tinkering with tests still
* remove unneeded metrics, clean up
What does this PR do?
This is part of the fix for elastic/elastic-agent-shipper#11.
This adds basic health and status metrics to the disk queue: a byte limit, a current byte count, an occupied_read metric, and the oldest ID in the queue.
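As a rough illustration of the values this adds, here is a stand-in sketch; the plain uint64 fields and names below are a simplification for illustration only, while the real implementation reports these through the shared queue.Metrics type with opt.Uint fields.

```go
package main

import "fmt"

// diskQueueMetrics is a simplified stand-in for the metrics this PR reports.
type diskQueueMetrics struct {
	byteLimit     uint64 // maximum bytes the queue may occupy on disk
	byteCount     uint64 // bytes currently stored across all segment files
	occupiedRead  uint64 // bytes read by a consumer but not yet acked
	oldestEntryID uint64 // ID of the oldest entry still in the queue
}

func main() {
	m := diskQueueMetrics{
		byteLimit:     10 * 1024 * 1024,
		byteCount:     4 * 1024 * 1024,
		occupiedRead:  512 * 1024,
		oldestEntryID: 13075,
	}
	fmt.Printf("disk queue: %d/%d bytes used, %d bytes unacked, oldest ID %d\n",
		m.byteCount, m.byteLimit, m.occupiedRead, m.oldestEntryID)
}
```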
I've never looked at the disk queue before this, and I still only kind of understand it, so there are probably some issues lurking around in here. In particular, there are no mutexes or anything in the segments code, and the comments do mention restricting how things like sizeOnDisk() are called, so I'm a tad concerned about how we're making multiple passes through the various queueSegment types.

This also adds a new field to the queue.Metrics type, as the unacked read field was previously events-only.

Checklist
CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.