
[libbeat] Disk queue implementation #21176

Merged: 97 commits, Sep 28, 2020
Changes shown from 94 commits

Commits
e76a41b
Initial disk queue skeleton
faec Apr 8, 2020
1d8bf65
Merge branch 'feature-disk-queue' into disk-queue
faec Apr 9, 2020
e8c8128
Sketching out some top-level disk queue data structures
faec Apr 10, 2020
97a7ed5
add queue type registration
faec Apr 16, 2020
f5ad9a2
Merge branch 'feature-disk-queue' into disk-queue
faec Apr 16, 2020
a78b85c
use new registry helper
faec Apr 16, 2020
b12020c
connect external user config to the queue's Settings struct
faec Apr 16, 2020
67540d6
Fill out more default settings
faec Apr 16, 2020
94f125c
review comments
faec Apr 23, 2020
f29a96f
review comments (add panic to unimplemented functions)
faec Apr 23, 2020
7ce01f9
Merge branch 'feature-disk-queue' into disk-queue
faec Apr 24, 2020
a04980e
some state file stuff
faec May 27, 2020
26b4248
revising code to match new design
faec Jun 1, 2020
f30f30b
more state file handling
faec Jun 1, 2020
c312d69
reading data frames from segments
faec Jun 2, 2020
1a40b06
fleshing out segment logic
faec Jun 3, 2020
ce65718
lots of partial work on reader and writer
faec Jun 9, 2020
22ae148
reworking segments
faec Jun 16, 2020
3f5f8fe
reworking reader code
faec Jun 18, 2020
61fa5d7
working on writer loop
faec Jul 17, 2020
0191dc6
Merge branch 'master' into disk-queue-0
faec Jul 17, 2020
3bf35ff
fix most build errors
faec Jul 17, 2020
50bd450
checksumType -> ChecksumType
faec Jul 21, 2020
04c9b60
working on read / write loops
faec Jul 30, 2020
7a2e09a
replace filebeat with a queue wrapper for testing
faec Jul 30, 2020
132ba8e
adapting encoder stuff from the disk spool
faec Aug 12, 2020
e73f55f
add most of the api logic for the reader / writer loops
faec Aug 12, 2020
6d2ca31
filling in segment-deletion api
faec Aug 13, 2020
988cef6
connect consumer ack endpoints
faec Aug 13, 2020
7774dc4
organize, delete dead code
faec Aug 13, 2020
7146525
comment / delete more old code
faec Aug 13, 2020
1001565
cleanup, plug in consumer acks
faec Aug 13, 2020
bb56b8f
finish reader loop api / logic
faec Aug 21, 2020
a99e869
make things build
faec Sep 3, 2020
89da2b2
clean up naming, plug in queue initialization, start fixing writer loop
faec Sep 3, 2020
6fa9d33
finish writer loop rework
faec Sep 4, 2020
e77dfa6
plug in writing to actual files
faec Sep 4, 2020
d2a65dc
make diskQueue.logger a top-level field
faec Sep 4, 2020
b93fe83
move segment allocation to the producer handler, clean up naming
faec Sep 4, 2020
3a3bfcd
properly handle writer loop responses
faec Sep 4, 2020
cb1107d
implement readSegmentHeader, more cleanup / naming changes
faec Sep 4, 2020
3020962
move queueSettings out of queueSegment, fix bugs
faec Sep 8, 2020
df31c0e
delete old code, reorganize some structures
faec Sep 8, 2020
4a22ccd
move frame-wrapping responsibility to the writer loop
faec Sep 8, 2020
d4e1dcd
add event deserialization
faec Sep 8, 2020
0e12495
properly assign frame ids after reading
faec Sep 8, 2020
56fe5ba
remove obsolete code
faec Sep 9, 2020
eaf6e2b
compute / verify real checksums
faec Sep 9, 2020
5f0376d
don't read everything twice
faec Sep 9, 2020
d63cbf0
Fix ACKing / deletion
faec Sep 9, 2020
98e61f0
revert testing wrapper around filebeat
faec Sep 9, 2020
e48c815
filebeat ingestion is working!
faec Sep 10, 2020
1575640
clean up capacity checking / add maybeUnblockProducers
faec Sep 10, 2020
d495aa8
move all dq.maybe... calls out of the handle... helpers
faec Sep 10, 2020
dd08e2c
simplify / clarify the writer loop response logic
faec Sep 10, 2020
8db0acf
add retrying to file creation / writing
faec Sep 10, 2020
26d226d
correct / clarify comments
faec Sep 10, 2020
442a513
move segment read / write positions to the segments struct
faec Sep 10, 2020
1a1742e
remove the separate 'core loop' struct and coalesce its helpers in di…
faec Sep 10, 2020
5ca039d
set output channel buffer to a more reasonable size
faec Sep 14, 2020
fdd4be6
Expose some user configuration settings
faec Sep 14, 2020
f7a446d
send producer acks
faec Sep 16, 2020
79a81de
remove checksum type as a configurable field
faec Sep 16, 2020
f88481c
Batch producer ACKs / fsync after writes
faec Sep 17, 2020
8b61f5b
remove queueSegment.header which is no longer needed
faec Sep 17, 2020
8c32477
reorganizing files a bit, starting the final pass of consumer acks
faec Sep 17, 2020
0a6f9f4
initialize the read position properly, continue consumer ack revision
faec Sep 17, 2020
91bc3b2
still rewriting consumer acks
faec Sep 18, 2020
aea4cad
plug consumer acks back in
faec Sep 18, 2020
a8ca56a
...increment the loop variable -.-
faec Sep 18, 2020
6d91ab5
cleanups
faec Sep 18, 2020
767aabb
remove unused variables
faec Sep 21, 2020
0bf668e
shutdown cleanly
faec Sep 21, 2020
857077c
delete the final segment on shutdown if everything is ACKed
faec Sep 21, 2020
d4cd0bb
turn on basic producer canceling
faec Sep 21, 2020
91ce56f
propagate event flags thru the queue
faec Sep 22, 2020
f9dbfb1
clean up state_file / remove unused code
faec Sep 22, 2020
93d3fff
save queue position as events are ACKed
faec Sep 22, 2020
0ac493c
remove producer cancel boilerplate (not needed in v1)
faec Sep 22, 2020
3af1421
various review comments
faec Sep 22, 2020
333cb76
...save everything before commit
faec Sep 22, 2020
0dfffe1
clarify the errors that "can't" fail
faec Sep 22, 2020
43c4aaa
be more consistent about unsignedness
faec Sep 22, 2020
0b33c4b
change the format of the deleter loop response
faec Sep 22, 2020
7b2f4ba
make helper functions to initialize the helper loop structures
faec Sep 22, 2020
a626cec
clarify comments
faec Sep 22, 2020
53da09a
minor tweaks
faec Sep 22, 2020
565a99f
remove unused event field in writeFrame
faec Sep 22, 2020
008fbe7
...save all files before commit...
faec Sep 22, 2020
b5ec926
don't initialize waitgroup to a nil pointer
faec Sep 22, 2020
11e55a8
error message cleanups
faec Sep 23, 2020
ff04bb5
simplify position write-to-disk
faec Sep 23, 2020
cc49879
Merge branch 'master' into disk-queue-0
faec Sep 24, 2020
2baf1d4
add some unit tests
faec Sep 24, 2020
30cf56c
more verbose config errors
faec Sep 24, 2020
c619d6a
Merge branch 'master' into disk-queue-0
faec Sep 24, 2020
5b41351
change queue acks back to a pointer since it can't be used w/o initia…
faec Sep 24, 2020
Files changed
1 change: 1 addition & 0 deletions libbeat/publisher/includes/includes.go
@@ -27,6 +27,7 @@ import (
_ "github.com/elastic/beats/v7/libbeat/outputs/kafka"
_ "github.com/elastic/beats/v7/libbeat/outputs/logstash"
_ "github.com/elastic/beats/v7/libbeat/outputs/redis"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool"
)
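
This blank import is what activates the new queue type: like memqueue and spool, the diskqueue package is expected to register its factory in an init hook. A minimal sketch of the pattern (the helper name, the "disk" key, and queueFactory are illustrative assumptions, not quoted from this PR):

// In package diskqueue (sketch):
func init() {
	// Register the factory under the "disk" queue type so it can be
	// selected from the beat's queue configuration.
	queue.RegisterQueueType("disk", queueFactory)
}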
146 changes: 146 additions & 0 deletions libbeat/publisher/queue/diskqueue/acks.go
@@ -0,0 +1,146 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"os"
"sync"

"github.com/elastic/beats/v7/libbeat/logp"
)

// queuePosition represents a logical position within the queue buffer.
type queuePosition struct {
segmentID segmentID
offset segmentOffset
}

type diskQueueACKs struct {
logger *logp.Logger

// This lock must be held to access diskQueueACKs fields (except for
// diskQueueACKs.done, which is always safe).
lock sync.Mutex

// The id and position of the first unacknowledged frame.
nextFrameID frameID
nextPosition queuePosition

// If a frame has been ACKed, then frameSize[frameID] contains its size on
// disk. The size is used to track the queuePosition of the oldest
// remaining frame, which is written to disk as ACKs are received. (We do
// this to avoid duplicating events if the beat terminates without a clean
// shutdown.)
frameSize map[frameID]uint64

// segmentBoundaries maps the first frameID of each segment to its
// corresponding segment ID.
segmentBoundaries map[frameID]segmentID

// When a segment has been completely acknowledged by a consumer, it sends
// the segment ID to this channel, where it is read by the core loop and
// scheduled for deletion.
segmentACKChan chan segmentID

// An open writable file handle to the file that stores the queue position.
// This position is advanced as we receive ACKs, confirming it is safe
// to move forward, so the acking code is responsible for updating this
// file.
positionFile *os.File

// When the queue is closed, diskQueueACKs.done is closed to signal that
// the core loop will not accept any more acked segments and any future
// ACKs should be ignored.
done chan struct{}
}

func newDiskQueueACKs(
logger *logp.Logger, position queuePosition, positionFile *os.File,
) diskQueueACKs {
return diskQueueACKs{
logger: logger,
nextFrameID: 0,
nextPosition: position,
frameSize: make(map[frameID]uint64),
segmentBoundaries: make(map[frameID]segmentID),
segmentACKChan: make(chan segmentID),
positionFile: positionFile,
done: make(chan struct{}),
}
}

func (dqa *diskQueueACKs) addFrames(frames []*readFrame) {
dqa.lock.Lock()
defer dqa.lock.Unlock()
select {
case <-dqa.done:
// We are already done and should ignore any leftover ACKs we receive.
return
default:
}
for _, frame := range frames {
segment := frame.segment
if frame.id != 0 && frame.id == segment.firstFrameID {
// This is the first frame in its segment, mark it so we know when
// we're starting a new segment.
//
// Subtlety: we don't count the very first frame as a "boundary" even
// though it is the first frame we read from its segment. This prevents
// us from resetting our segment offset to zero, in case the initial
// offset was restored from a previous session instead of starting at
// the beginning of the first file.
dqa.segmentBoundaries[frame.id] = segment.id
}
dqa.frameSize[frame.id] = frame.bytesOnDisk
}
oldSegmentID := dqa.nextPosition.segmentID
if dqa.frameSize[dqa.nextFrameID] != 0 {
for ; dqa.frameSize[dqa.nextFrameID] != 0; dqa.nextFrameID++ {
newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID]
if ok {
// This is the start of a new segment. Remove this frame from the
// segment boundary list and set the position to the start of the
// new segment.
delete(dqa.segmentBoundaries, dqa.nextFrameID)
dqa.nextPosition = queuePosition{
segmentID: newSegment,
offset: 0,
}
}
dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID])
delete(dqa.frameSize, dqa.nextFrameID)
}
// We advanced the ACK position at least somewhat, so write its
// new value.
err := writeQueuePositionToHandle(dqa.positionFile, dqa.nextPosition)
if err != nil {
// TODO: Don't spam this warning on every ACK if it's a permanent error.
dqa.logger.Warnf("Couldn't save queue position: %v", err)
}
}
if oldSegmentID != dqa.nextPosition.segmentID {
// We crossed at least one segment boundary, inform the listener that
// everything before the current segment has been acknowledged (but bail
// out if our done channel has been closed, since that means there is no
// listener on the other end.)
select {
case dqa.segmentACKChan <- dqa.nextPosition.segmentID - 1:
case <-dqa.done:
}
}
}
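
A minimal sketch of this bookkeeping in action, using the package's own types; the temp file and logger setup are illustrative only:

// ACK two contiguous 100-byte frames from segment 0, starting at frame 0.
positionFile, _ := ioutil.TempFile("", "position")
dqa := newDiskQueueACKs(logp.NewLogger("test"), queuePosition{}, positionFile)
segment := &queueSegment{id: 0, firstFrameID: 0}
dqa.addFrames([]*readFrame{
	{segment: segment, id: 0, bytesOnDisk: 100},
	{segment: segment, id: 1, bytesOnDisk: 100},
})
// nextFrameID has advanced to 2, nextPosition.offset to 200, and the new
// position has been persisted through positionFile.

Since neither frame starts a new segment, nothing is sent on segmentACKChan; a later frame whose id equals its segment's firstFrameID would reset the offset to that segment's start and eventually release the previous segment for deletion.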
33 changes: 33 additions & 0 deletions libbeat/publisher/queue/diskqueue/checksum.go
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"encoding/binary"
"hash/crc32"
)

// Computes the checksum that should be written / read in a frame footer
// based on the raw content of that frame (excluding header / footer).
func computeChecksum(data []byte) uint32 {
hash := crc32.NewIEEE()
frameLength := uint32(len(data) + frameMetadataSize)
binary.Write(hash, binary.LittleEndian, &frameLength)
hash.Write(data)
return hash.Sum32()
}
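
A matching read-side check would compare this value against the stored footer; a hedged sketch (the reader's real call sites elsewhere in this PR may name things differently):

// verifyChecksum recomputes a frame's checksum from its raw data and
// compares it against the value read from the frame footer.
func verifyChecksum(data []byte, footerChecksum uint32) error {
	if computed := computeChecksum(data); computed != footerChecksum {
		return fmt.Errorf(
			"disk queue: frame checksum mismatch (stored %x, computed %x)",
			footerChecksum, computed)
	}
	return nil
}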
160 changes: 160 additions & 0 deletions libbeat/publisher/queue/diskqueue/config.go
@@ -0,0 +1,160 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"errors"
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgtype"
"github.com/elastic/beats/v7/libbeat/paths"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// Settings contains the configuration fields to create a new disk queue
// or open an existing one.
type Settings struct {
// The path on disk of the queue's containing directory, which will be
// created if it doesn't exist. Within the directory, the queue's state
// is stored in state.dat and each segment's data is stored in
// {segmentIndex}.seg.
// If blank, the default directory is "diskqueue" within the beat's data
// directory.
Path string

// MaxBufferSize is the maximum number of bytes that the queue should
// ever occupy on disk. A value of 0 means the queue can grow until the
// disk is full (this is not recommended on a primary system disk).
MaxBufferSize uint64

// MaxSegmentSize is the maximum number of bytes that should be written
// to a single segment file before creating a new one.
MaxSegmentSize uint64

// How many events will be read from disk while waiting for a consumer
// request.
ReadAheadLimit int

// How many events will be queued in memory waiting to be written to disk.
// This setting should rarely matter in practice, but if data is coming
// in faster than it can be written to disk for an extended period,
// this limit can keep it from overflowing memory.
WriteAheadLimit int

// A listener that should be sent ACKs when an event is successfully
// written to disk.
WriteToDiskListener queue.ACKListener
}
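
For illustration, a queue under the default path would lay out its directory like this (a sketch following the naming scheme above; segment indices grow as new segments are created):

data/diskqueue/state.dat    queue position metadata
data/diskqueue/0.seg        segment data files, one per segment index
data/diskqueue/1.seg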

// userConfig holds the parameters for a disk queue that are configurable
// by the end user in the beats yml file.
type userConfig struct {
Path string `config:"path"`
MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"`
SegmentSize *cfgtype.ByteSize `config:"segment_size"`
ReadAheadLimit *int `config:"read_ahead"`
WriteAheadLimit *int `config:"write_ahead"`
}

func (c *userConfig) Validate() error {
// If the segment size is explicitly specified, the total queue size must
// be at least twice as large.
if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 {
return errors.New(
"Disk queue max_size must be at least twice as big as segment_size")
}

// We require a total queue size of at least 10MB, and a segment size of
// at least 1MB. The queue can support lower thresholds, but it will perform
// terribly, so we give an explicit error in that case.
// These bounds are still extremely low for Beats ingestion, but if all you
// need is for a low-volume stream on a tiny device to persist between
// restarts, it will work fine.
if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 {
return errors.New(
"Disk queue max_size cannot be less than 10MB")
}
if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 {
return errors.New(
"Disk queue segment_size cannot be less than 1MB")
}

return nil
}
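
To make these bounds concrete, some illustrative value combinations (chosen for this example, not taken from the PR):

max_size: 50MB, segment_size: 10MB   valid: 50MB is at least twice 10MB and both exceed the minimums
max_size: 15MB, segment_size: 10MB   rejected: max_size is less than twice segment_size
max_size: 5MB                        rejected: max_size is less than 10MB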

// DefaultSettings returns a Settings object with reasonable default values
// for all important fields.
func DefaultSettings() Settings {
return Settings{
MaxSegmentSize: 100 * (1 << 20), // 100MiB
MaxBufferSize: (1 << 30), // 1GiB

ReadAheadLimit: 256,
WriteAheadLimit: 1024,
}
}

// SettingsForUserConfig returns a Settings struct initialized with the
// end-user-configurable settings in the given config tree.
func SettingsForUserConfig(config *common.Config) (Settings, error) {
userConfig := userConfig{}
if err := config.Unpack(&userConfig); err != nil {
return Settings{}, err
Review thread on this return:

Contributor: There's a lot of inconsistent error return styles here. Sometimes we wrap an error, sometimes we don't. Unless something else is requesting a specific error type, we might want to just use errors.Wrap or fmt.Errorf for everything.

Contributor: I vote for fmt.Errorf.

Contributor Author (faec): The heuristic I aim for is to ask whether the error being returned is understandable in the context of the caller. For example, errors in the deleter loop aren't wrapped, because they are reported (and grouped / wrapped) when they get back to the core loop, and it would look silly to have a message like "couldn't delete segment file: [couldn't delete segment file: [actual error]]". So for low-level helpers I often leave errors unwrapped, knowing the caller is responsible for reporting them comprehensibly.

That said, config.go doesn't seem to follow that convention, and could use a little more verbosity in its messages, so I fixed the calls here :-)

}
settings := DefaultSettings()
settings.Path = userConfig.Path

settings.MaxBufferSize = uint64(userConfig.MaxSize)
if userConfig.SegmentSize != nil {
settings.MaxSegmentSize = uint64(*userConfig.SegmentSize)
} else {
// If no value is specified, default segment size is total queue size
// divided by 10.
settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10
}

return settings, nil
}
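
A short usage sketch tying these helpers together; common.MustNewConfigFrom is libbeat's helper for building a config tree from a map, and the sizes are illustrative:

cfg := common.MustNewConfigFrom(map[string]interface{}{
	"max_size":     "2GB",
	"segment_size": "200MB",
})
settings, err := SettingsForUserConfig(cfg)
if err != nil {
	// Unpack runs userConfig.Validate, so undersized or mismatched
	// values are reported here.
}
// settings now holds MaxBufferSize and MaxSegmentSize decoded from the
// byte-size strings, with everything else filled from DefaultSettings().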

//
// bookkeeping helpers
//

func (settings Settings) directoryPath() string {
if settings.Path == "" {
return paths.Resolve(paths.Data, "diskqueue")
}

return settings.Path
}

func (settings Settings) stateFilePath() string {
return filepath.Join(settings.directoryPath(), "state.dat")
}

func (settings Settings) segmentPath(segmentID segmentID) string {
return filepath.Join(
settings.directoryPath(),
fmt.Sprintf("%v.seg", segmentID))
}

func (settings Settings) maxSegmentOffset() segmentOffset {
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
}