Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat] Add more disk queue unit tests and fix a size-check bug #22107

Merged
merged 13 commits into from
Oct 22, 2020
Merged
24 changes: 17 additions & 7 deletions libbeat/publisher/queue/diskqueue/core_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) {
// than an entire segment all by itself (as long as it isn't, it is
// guaranteed to eventually enter the queue assuming no disk errors).
frameSize := request.frame.sizeOnDisk()
if dq.settings.MaxSegmentSize < frameSize {
if dq.settings.maxSegmentOffset() < segmentOffset(frameSize) {
dq.logger.Warnf(
"Rejecting event with size %v because the maximum segment size is %v",
frameSize, dq.settings.MaxSegmentSize)
"Rejecting event with size %v because the segment buffer limit is %v",
frameSize, dq.settings.maxSegmentOffset())
request.responseChan <- false
return
}
Expand Down Expand Up @@ -326,13 +326,19 @@ func (dq *diskQueue) maybeWritePending() {
// Nothing to do right now
return
}

// Remove everything from pendingFrames and forward it to the writer loop.
frames := dq.pendingFrames
dq.pendingFrames = nil
dq.writerLoop.requestChan <- writerLoopRequest{frames: frames}

dq.writerLoop.requestChan <- writerLoopRequest{
frames: frames,
// Compute the size of the request so we know how full the queue is going
// to be.
totalSize := uint64(0)
for _, sf := range frames {
totalSize += sf.frame.sizeOnDisk()
}
dq.writeRequestSize = totalSize
dq.writing = true
}

Expand Down Expand Up @@ -471,8 +477,12 @@ func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool {
// left in the queue after accounting for the existing segments and the
// pending writes that were already accepted.
pendingBytes := uint64(0)
for _, request := range dq.pendingFrames {
pendingBytes += request.frame.sizeOnDisk()
for _, sf := range dq.pendingFrames {
pendingBytes += sf.frame.sizeOnDisk()
}
// If a writing request is outstanding, include it in the size total.
if dq.writing {
pendingBytes += dq.writeRequestSize
}
currentSize := pendingBytes + dq.segments.sizeOnDisk()

Expand Down
Loading