Skip to content

Commit

Permalink
[libbeat] Disk queue implementation (elastic#21176)
Browse files Browse the repository at this point in the history
Initial implementation of the new libbeat disk queue
  • Loading branch information
faec authored Sep 28, 2020
1 parent a75582f commit 2b8fd7c
Show file tree
Hide file tree
Showing 17 changed files with 2,600 additions and 0 deletions.
1 change: 1 addition & 0 deletions libbeat/publisher/includes/includes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
_ "github.com/elastic/beats/v7/libbeat/outputs/kafka"
_ "github.com/elastic/beats/v7/libbeat/outputs/logstash"
_ "github.com/elastic/beats/v7/libbeat/outputs/redis"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
_ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool"
)
146 changes: 146 additions & 0 deletions libbeat/publisher/queue/diskqueue/acks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"os"
"sync"

"github.com/elastic/beats/v7/libbeat/logp"
)

// queuePosition represents a logical position within the queue buffer.
type queuePosition struct {
segmentID segmentID
offset segmentOffset
}

type diskQueueACKs struct {
logger *logp.Logger

// This lock must be held to access diskQueueACKs fields (except for
// diskQueueACKs.done, which is always safe).
lock sync.Mutex

// The id and position of the first unacknowledged frame.
nextFrameID frameID
nextPosition queuePosition

// If a frame has been ACKed, then frameSize[frameID] contains its size on
// disk. The size is used to track the queuePosition of the oldest
// remaining frame, which is written to disk as ACKs are received. (We do
// this to avoid duplicating events if the beat terminates without a clean
// shutdown.)
frameSize map[frameID]uint64

// segmentBoundaries maps the first frameID of each segment to its
// corresponding segment ID.
segmentBoundaries map[frameID]segmentID

// When a segment has been completely acknowledged by a consumer, it sends
// the segment ID to this channel, where it is read by the core loop and
// scheduled for deletion.
segmentACKChan chan segmentID

// An open writable file handle to the file that stores the queue position.
// This position is advanced as we receive ACKs, confirming it is safe
// to move forward, so the acking code is responsible for updating this
// file.
positionFile *os.File

// When the queue is closed, diskQueueACKs.done is closed to signal that
// the core loop will not accept any more acked segments and any future
// ACKs should be ignored.
done chan struct{}
}

func newDiskQueueACKs(
logger *logp.Logger, position queuePosition, positionFile *os.File,
) *diskQueueACKs {
return &diskQueueACKs{
logger: logger,
nextFrameID: 0,
nextPosition: position,
frameSize: make(map[frameID]uint64),
segmentBoundaries: make(map[frameID]segmentID),
segmentACKChan: make(chan segmentID),
positionFile: positionFile,
done: make(chan struct{}),
}
}

func (dqa *diskQueueACKs) addFrames(frames []*readFrame) {
dqa.lock.Lock()
defer dqa.lock.Unlock()
select {
case <-dqa.done:
// We are already done and should ignore any leftover ACKs we receive.
return
default:
}
for _, frame := range frames {
segment := frame.segment
if frame.id != 0 && frame.id == segment.firstFrameID {
// This is the first frame in its segment, mark it so we know when
// we're starting a new segment.
//
// Subtlety: we don't count the very first frame as a "boundary" even
// though it is the first frame we read from its segment. This prevents
// us from resetting our segment offset to zero, in case the initial
// offset was restored from a previous session instead of starting at
// the beginning of the first file.
dqa.segmentBoundaries[frame.id] = segment.id
}
dqa.frameSize[frame.id] = frame.bytesOnDisk
}
oldSegmentID := dqa.nextPosition.segmentID
if dqa.frameSize[dqa.nextFrameID] != 0 {
for ; dqa.frameSize[dqa.nextFrameID] != 0; dqa.nextFrameID++ {
newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID]
if ok {
// This is the start of a new segment. Remove this frame from the
// segment boundary list and set the position to the start of the
// new segment.
delete(dqa.segmentBoundaries, dqa.nextFrameID)
dqa.nextPosition = queuePosition{
segmentID: newSegment,
offset: 0,
}
}
dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID])
delete(dqa.frameSize, dqa.nextFrameID)
}
// We advanced the ACK position at least somewhat, so write its
// new value.
err := writeQueuePositionToHandle(dqa.positionFile, dqa.nextPosition)
if err != nil {
// TODO: Don't spam this warning on every ACK if it's a permanent error.
dqa.logger.Warnf("Couldn't save queue position: %v", err)
}
}
if oldSegmentID != dqa.nextPosition.segmentID {
// We crossed at least one segment boundary, inform the listener that
// everything before the current segment has been acknowledged (but bail
// out if our done channel has been closed, since that means there is no
// listener on the other end.)
select {
case dqa.segmentACKChan <- dqa.nextPosition.segmentID - 1:
case <-dqa.done:
}
}
}
33 changes: 33 additions & 0 deletions libbeat/publisher/queue/diskqueue/checksum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"encoding/binary"
"hash/crc32"
)

// Computes the checksum that should be written / read in a frame footer
// based on the raw content of that frame (excluding header / footer).
func computeChecksum(data []byte) uint32 {
hash := crc32.NewIEEE()
frameLength := uint32(len(data) + frameMetadataSize)
binary.Write(hash, binary.LittleEndian, &frameLength)
hash.Write(data)
return hash.Sum32()
}
158 changes: 158 additions & 0 deletions libbeat/publisher/queue/diskqueue/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package diskqueue

import (
"errors"
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgtype"
"github.com/elastic/beats/v7/libbeat/paths"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// Settings contains the configuration fields to create a new disk queue
// or open an existing one.
type Settings struct {
// The path on disk of the queue's containing directory, which will be
// created if it doesn't exist. Within the directory, the queue's state
// is stored in state.dat and each segment's data is stored in
// {segmentIndex}.seg
// If blank, the default directory is "diskqueue" within the beat's data
// directory.
Path string

// MaxBufferSize is the maximum number of bytes that the queue should
// ever occupy on disk. A value of 0 means the queue can grow until the
// disk is full (this is not recommended on a primary system disk).
MaxBufferSize uint64

// MaxSegmentSize is the maximum number of bytes that should be written
// to a single segment file before creating a new one.
MaxSegmentSize uint64

// How many events will be read from disk while waiting for a consumer
// request.
ReadAheadLimit int

// How many events will be queued in memory waiting to be written to disk.
// This setting should rarely matter in practice, but if data is coming
// in faster than it can be written to disk for an extended period,
// this limit can keep it from overflowing memory.
WriteAheadLimit int

// A listener that should be sent ACKs when an event is successfully
// written to disk.
WriteToDiskListener queue.ACKListener
}

// userConfig holds the parameters for a disk queue that are configurable
// by the end user in the beats yml file.
type userConfig struct {
Path string `config:"path"`
MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"`
SegmentSize *cfgtype.ByteSize `config:"segment_size"`
ReadAheadLimit *int `config:"read_ahead"`
WriteAheadLimit *int `config:"write_ahead"`
}

func (c *userConfig) Validate() error {
// If the segment size is explicitly specified, the total queue size must
// be at least twice as large.
if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 {
return errors.New(
"Disk queue max_size must be at least twice as big as segment_size")
}

// We require a total queue size of at least 10MB, and a segment size of
// at least 1MB. The queue can support lower thresholds, but it will perform
// terribly, so we give an explicit error in that case.
// These bounds are still extremely low for Beats ingestion, but if all you
// need is for a low-volume stream on a tiny device to persist between
// restarts, it will work fine.
if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 {
return fmt.Errorf(
"Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize)
}
if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 {
return fmt.Errorf(
"Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize)
}

return nil
}

// DefaultSettings returns a Settings object with reasonable default values
// for all important fields.
func DefaultSettings() Settings {
return Settings{
MaxSegmentSize: 100 * (1 << 20), // 100MiB
MaxBufferSize: (1 << 30), // 1GiB

ReadAheadLimit: 256,
WriteAheadLimit: 1024,
}
}

// SettingsForUserConfig returns a Settings struct initialized with the
// end-user-configurable settings in the given config tree.
func SettingsForUserConfig(config *common.Config) (Settings, error) {
userConfig := userConfig{}
if err := config.Unpack(&userConfig); err != nil {
return Settings{}, fmt.Errorf("parsing user config: %w", err)
}
settings := DefaultSettings()
settings.Path = userConfig.Path

settings.MaxBufferSize = uint64(userConfig.MaxSize)
if userConfig.SegmentSize != nil {
settings.MaxSegmentSize = uint64(*userConfig.SegmentSize)
} else {
// If no value is specified, default segment size is total queue size
// divided by 10.
settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10
}
return settings, nil
}

//
// bookkeeping helpers
//

func (settings Settings) directoryPath() string {
if settings.Path == "" {
return paths.Resolve(paths.Data, "diskqueue")
}
return settings.Path
}

func (settings Settings) stateFilePath() string {
return filepath.Join(settings.directoryPath(), "state.dat")
}

func (settings Settings) segmentPath(segmentID segmentID) string {
return filepath.Join(
settings.directoryPath(),
fmt.Sprintf("%v.seg", segmentID))
}

func (settings Settings) maxSegmentOffset() segmentOffset {
return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize)
}
Loading

0 comments on commit 2b8fd7c

Please sign in to comment.