Introduce skeleton of memlog store (#19261)

This change introduces the skeleton and documentation of the memlog
store.

The addition of the statestore package is split up into multiple
changesets to ease review. The final version of the package can be found
[here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore).

Once finalized, the libbeat/statestore package contains:
- The statestore frontend and interface for use within Beats
- Interfaces for the store backend
- A common set of tests that store backends need to support
- A storetest package for testing new features that require a store. The
  testing helpers use a map[string]interface{} that can be initialized before,
  or queried after, the test run for validation purposes.
- The default memlog backend + tests

To keep the review manageable, this change introduces only the skeleton
and internal documentation of the memlog store for review and
discussion. The final implementation and unit tests will be added later.

The file doc.go documents how the final implementation works.
Steffen Siering committed Jun 25, 2020
1 parent e5b04e9 commit c0f1b75
Showing 10 changed files with 726 additions and 0 deletions.
169 changes: 169 additions & 0 deletions libbeat/statestore/backend/memlog/diskstore.go
@@ -0,0 +1,169 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import (
"bufio"
"os"
"path/filepath"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// diskstore manages the on-disk state of the memlog store.
type diskstore struct {
	log *logp.Logger

	// store configuration
	checkpointPred CheckpointPredicate
	fileMode       os.FileMode
	bufferSize     int

	// on disk file tracking information
	home        string         // home path of the store
	logFilePath string         // current log file
	dataFiles   []dataFileInfo // set of data files found

	// txid is the sequential counter that tracks
	// all updates to the store. The txid is added to each operation being
	// logged and is used as the name for the data files.
	txid uint64

	// log file access. The log file is updated using an in-memory write buffer.
	logFile *os.File
	logBuf  *bufio.Writer

	// internal state and metrics
	logFileSize      uint64
	logEntries       uint
	logInvalid       bool
	logNeedsTruncate bool
}

// dataFileInfo is used to track and sort on disk data files.
// Normally only one data file should exist on disk, but if delete operations
// have failed or have not finished, dataFileInfo is used to determine the ordering.
type dataFileInfo struct {
	path string
	txid uint64
}

// storeEntry is used to write entries to the checkpoint file only.
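// A serialized entry has the shape `{"_key": "<key>", ...value fields...}`,
// with the value's fields inlined next to the private `_key` field (illustrative).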
type storeEntry struct {
	Key    string        `struct:"_key"`
	Fields common.MapStr `struct:",inline"`
}

// storeMeta is read from the meta file.
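// With storeVersion "1" the meta file contains `{"version": "1"}` (illustrative).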
type storeMeta struct {
	Version string `struct:"version"`
}

// logAction is prepended to each operation logged to the update file.
// It contains the update ID, a sequential counter to track correctness,
// and the action name.
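// For example: `{"op": "set", "id": 12}` (illustrative).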
type logAction struct {
	Op string `json:"op"`
	ID uint64 `json:"id"`
}

const (
	logFileName  = "log.json"
	metaFileName = "meta.json"

	storeVersion = "1"

	keyField = "_key"
)

// newDiskStore initializes the disk store structure only. The store must have
// been opened already. It tries to open the update log file for append
// operations. If opening the update log file fails, it is marked as
// 'corrupted', triggering a checkpoint operation on the first update to the store.
func newDiskStore(
	log *logp.Logger,
	home string,
	dataFiles []dataFileInfo,
	txid uint64,
	mode os.FileMode,
	entries uint,
	logInvalid bool,
	bufferSize uint,
	checkpointPred CheckpointPredicate,
) *diskstore {
	s := &diskstore{
		log:              log.With("path", home),
		home:             home,
		logFilePath:      filepath.Join(home, logFileName),
		dataFiles:        dataFiles,
		txid:             txid,
		fileMode:         mode,
		bufferSize:       int(bufferSize),
		logFile:          nil,
		logBuf:           nil,
		logEntries:       entries,
		logInvalid:       logInvalid,
		logNeedsTruncate: false, // only truncate on next checkpoint
		checkpointPred:   checkpointPred,
	}

	_ = s.tryOpenLog()
	return s
}

// tryOpenLog accesses the update log. The log file is truncated if a checkpoint
// operation was executed last.
// The log file is marked as invalid if opening it fails. This will trigger a
// checkpoint operation and another call to tryOpenLog in the future.
func (s *diskstore) tryOpenLog() error {
panic("TODO: implement me")
}

// mustCheckpoint returns true if the store is required to execute a checkpoint
// operation, either by predicate or by some internal state detecting a problem
// with the log file.
func (s *diskstore) mustCheckpoint() bool {
	return s.logInvalid || s.checkpointPred(s.logFileSize)
}

func (s *diskstore) Close() error {
panic("TODO: implement me")
}

// LogOperation adds another entry to the update log file.
// The log file is marked as invalid if the write fails. This will trigger a
// checkpoint operation in the future.
func (s *diskstore) LogOperation(op op) error {
panic("TODO: implement me")
}

// WriteCheckpoint serializes all state into a json file. The file contains an
// array with all states known to the memory storage.
// WriteCheckpoint first serializes all state to a temporary file, and finally
// moves the temporary data file into the correct location. No files
// are overwritten or replaced. Instead the change sequence number is used for
// the filename, and older data files will be deleted after success.
//
// The active marker file is overwritten after all updates have succeeded. The
// marker file contains the filename of the current valid data file.
// NOTE: due to limitations of some operating systems or file systems, the active
// marker is not a symlink, but an actual file.
func (s *diskstore) WriteCheckpoint(state map[string]entry) error {
panic("TODO: implement me")
}
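To make the checkpoint pattern described in the WriteCheckpoint comment concrete, here is a minimal, self-contained sketch of the write-to-a-temporary-file-then-rename flow. The file naming scheme, the 0600 permissions, and the standalone function shape are assumptions for illustration only; the actual implementation lands in a later changeset.

package sketch

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// writeCheckpoint serializes the full in-memory state into a fresh data file
// named after the change sequence number (naming scheme assumed here). The
// data is first written to a temporary file and then renamed into place, so
// no existing data file is ever overwritten.
func writeCheckpoint(home string, txid uint64, state map[string]map[string]interface{}) (string, error) {
	fileName := fmt.Sprintf("%d.json", txid)
	tmpPath := filepath.Join(home, fileName+".tmp")
	finalPath := filepath.Join(home, fileName)

	f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		return "", err
	}

	// build a JSON array of all entries; each value object carries its key in
	// the private `_key` field
	entries := make([]map[string]interface{}, 0, len(state))
	for key, fields := range state {
		entry := map[string]interface{}{"_key": key}
		for k, v := range fields {
			entry[k] = v
		}
		entries = append(entries, entry)
	}

	if err := json.NewEncoder(f).Encode(entries); err != nil {
		f.Close()
		return "", err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return "", err
	}
	if err := f.Close(); err != nil {
		return "", err
	}

	// move the finished checkpoint into place; older data files would be
	// removed afterwards on a best-effort basis
	if err := os.Rename(tmpPath, finalPath); err != nil {
		return "", err
	}
	return finalPath, nil
}

On success, the returned path could then be recorded in the active marker file for debugging, as described in doc.go below.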
97 changes: 97 additions & 0 deletions libbeat/statestore/backend/memlog/doc.go
@@ -0,0 +1,97 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package memlog implements the memlog statestore backend.
// The store provided by memlog is an in-memory key-value store
// that logs all operations to an append-only log file.
// Once the log file is considered full the store executes a checkpoint
// operation. The checkpoint operation serializes all state to a data file.
//
// The memory store in memlog holds all key-value pairs in a hashtable, with
// values represented by map[string]interface{}. As the store must be 'valid'
// based on the state of the last update operations (Set, Remove), it
// guarantees that no references into data structures passed via Set are held.
// Instead, structured data is serialized/deserialized into a
// map[string]interface{}. The serialized states contain only primitive types
// like intX, uintX, float, bool, string, slices, or map[string]interface{}
// itself. As a side effect this also guarantees that the internal state can
// always be serialized to disk after updating the in-memory representation.
//
// On disk we have a meta file, an update log file, data files, and an active
// marker file in the store directory.
//
// The meta file only contains the store version number.
//
// Normally all operations that update the store in memory state are appended
// to the update log file.
// The file stores all entries in JSON format. Each entry starts with an action
// entry, followed by a data entry.
// The action entry has the schema: `{"op": "<name>", "id": <number>}`. Supported
// operations are 'set' or 'remove'. The `id` contains a sequential counter
// that must always be increased by 1.
// The data entry for the 'set' operation has the format: `{"K": "<key>", "V": { ... }}`.
// The data entry for the 'remove' operation has the format: `{"K": "<key>"}`.
// Updates to the log file are not synced to disk. Having all updates available
// between restarts/crashes also depends on the capabilities of the operating
// system and file system. When opening the store we read the log as far as
// possible, reconstructing the last known valid state the Beat can continue
// from. This can lead to duplicates if the machine/filesystem has had an
// outage with state not yet fully synchronised to disk. Ordinary restarts
// should not lead to any problems.
// If any error is encountered when reading the log file, the next update to the
// store will trigger a checkpoint operation and reset the log file.
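// For example, a log file that records one 'set' followed by one 'remove'
// contains (illustrative):
//
//   {"op": "set", "id": 1}
//   {"K": "key0", "V": {"field": "value"}}
//   {"op": "remove", "id": 2}
//   {"K": "key0"}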
//
// The store might contain multiple data files, but only the last data file is
// supposed to be valid. The store continuously tries to clean up older data
// files on checkpoint operations.
// The data file names include the change sequence number, which allows
// us to sort them by name. The checkpoint operation of memlog writes the full
// state into a new data file that consists of a JSON array with all known
// key-value pairs. Each JSON object in the array consists of the value
// object, with memlog private fields added. Private fields start with `_`. At
// the moment the only private field is `_key`, which is used to identify the
// key-value pair.
// NOTE: Creating a new file guarantees that Beats can progress when creating a
// new checkpoint file. Some filesystems tend to block the
// delete/replace operation when the file is accessed by another process
// (e.g. common problem with AV Scanners on Windows). By creating a new
// file we circumvent this problem. Failures in deleting old files are
// ok, and we will try to delete old data files again in the future.
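// For example, a data file with two known key-value pairs contains
// (illustrative):
//
//   [
//     {"_key": "key0", "field": "value"},
//     {"_key": "key1", "field": "other"}
//   ]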
//
// The active marker file is not really used by the store. It is written for
// debugging purposes and contains the filepath of the last written data file
// that is supposed to be valid.
//
// When opening the store we first validate the meta file and read the "last"
// data file into the in-memory hashtable. Older data files are ignored. The
// filename with the update sequence number is used to sort data files.
// NOTE: the active marker file is not used, as the checkpoint operation is
// supposed to be an atomic operation that is finalized once the data
// file is moved to its correct location.
//
// After loading the data file we loop over all operations in the log file.
// Operations with a smaller sequence number are ignored when iterating the log
// file. If any subsequent entries in the log file have a sequence number difference !=
// 1, we assume the log file to be corrupted and stop the loop. All processing
// continues from the last known accumulated state.
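// For example, if the last data file was written at sequence number 5 and the
// log file contains operations 6, 7, and 9, then 6 and 7 are applied and
// processing stops at the gap before 9 (illustrative).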
//
// When closing the store we make a last attempt at fsyncing the log file (just
// in case), close the log file and clear all in memory state.
//
// The store provided by memlog is thread-safe and uses a RWMutex. We allow only
// one active writer, but multiple concurrent readers.
package memlog
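As a companion to the log format described in doc.go, the following self-contained sketch shows how a single 'set' operation could be appended to the update log: an action entry followed by a data entry, written through a buffered writer and flushed but not fsynced. The function and parameter names are illustrative; the real memlog code uses the logAction type from diskstore.go and will be added in a later changeset.

package sketch

import (
	"bufio"
	"encoding/json"
)

// appendSet writes one 'set' operation in the documented format: an action
// entry {"op": "set", "id": <txid>} followed by a data entry
// {"K": "<key>", "V": {...}}.
func appendSet(w *bufio.Writer, txid uint64, key string, value map[string]interface{}) error {
	enc := json.NewEncoder(w)

	// action entry: operation name plus the sequential transaction id
	if err := enc.Encode(map[string]interface{}{"op": "set", "id": txid}); err != nil {
		return err
	}
	// data entry: the key and the new value object
	if err := enc.Encode(map[string]interface{}{"K": key, "V": value}); err != nil {
		return err
	}

	// flush the in-memory buffer to the OS; the log is intentionally not fsynced
	return w.Flush()
}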
27 changes: 27 additions & 0 deletions libbeat/statestore/backend/memlog/error.go
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import "errors"

var (
	errRegClosed   = errors.New("registry has been closed")
	errLogInvalid  = errors.New("can not add operation to log file, a checkpoint is required")
	errTxIDInvalid = errors.New("invalid update sequence number")
	errKeyUnknown  = errors.New("key unknown")
)
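The CheckpointPredicate type referenced by diskstore is not part of this changeset. Judging from mustCheckpoint, which passes the current log file size as a uint64, it is presumably a size-based callback roughly along these lines. A sketch under that assumption, with an arbitrary 10 MiB default:

package sketch

// CheckpointPredicate decides, based on the current size of the update log
// file, whether the store must execute a checkpoint operation.
// (Assumed signature; the type is defined in a later changeset.)
type CheckpointPredicate func(fileSize uint64) bool

// sizeCheckpointPredicate returns a predicate that requests a checkpoint once
// the log file grows beyond limit bytes (10 MiB if limit is 0).
func sizeCheckpointPredicate(limit uint64) CheckpointPredicate {
	if limit == 0 {
		limit = 10 * 1024 * 1024
	}
	return func(fileSize uint64) bool {
		return fileSize >= limit
	}
}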