Introduce skeleton of memlog store #19261

Merged · 3 commits · Jun 25, 2020
Changes from 1 commit
169 changes: 169 additions & 0 deletions libbeat/statestore/backend/memlog/diskstore.go
@@ -0,0 +1,169 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import (
"bufio"
"os"
"path/filepath"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

// diskstore manages the on-disk state of the memlog store.
type diskstore struct {
log *logp.Logger

// store configuration
checkpointPred CheckpointPredicate
fileMode os.FileMode
bufferSize int

// on disk file tracking information
home string // home path of the store
logFileName string // current log file
dataFiles []dataFileInfo // set of data files found

// txid is the sequential counter that tracks
// all updates to the store. The txid is added to each operation being logged
// and is used as the name for data files.
txid uint64

// log file access. The log file is updated using an in memory write buffer.
logFile *os.File
logBuf *bufio.Writer

// internal state and metrics
logFileSize uint64
logEntries uint
logInvalid bool
logNeedsTruncate bool
}

// dataFileInfo is used to track and sort on-disk data files.
// We should have only one data file on disk, but in case delete operations
// have failed or not finished, dataFileInfo is used to determine the ordering.
type dataFileInfo struct {
path string
txid uint64
}
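Since only the newest data file is considered valid, the ordering mostly matters for recovery. A minimal sketch of how the newest file could be selected; the helper name activeDataFile and the use of the standard sort package are assumptions, not part of this change:

```go
// activeDataFile is an illustrative helper (not part of this change): it
// orders the known data files by txid and returns the newest one, which is
// assumed to hold the most recent complete checkpoint.
func activeDataFile(files []dataFileInfo) (dataFileInfo, bool) {
	if len(files) == 0 {
		return dataFileInfo{}, false
	}
	sort.Slice(files, func(i, j int) bool {
		return files[i].txid < files[j].txid
	})
	return files[len(files)-1], true
}
```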

// storeEntry is used to write entries to the checkpoint file only.
type storeEntry struct {
Key string `struct:"_key"`
Fields common.MapStr `struct:",inline"`
}

// storeMeta is read from the meta file.
type storeMeta struct {
Version string `struct:"version"`
}

// logAction is prepended to each operation logged to the update file.
// It contains the update ID, a sequential counter to track correctness,
// and the action name.
type logAction struct {
Op string `json:"op"`
ID uint64 `json:"id"`
}

const (
logFileName = "log.json"
metaFileName = "meta.json"

storeVersion = "1"

keyField = "_key"
)

// newDiskStore initializes the disk store structure only. The store must have
// been opened already. It tries to open the update log file for append
// operations. If opening the update log file fails, it is marked as
// 'corrupted', triggering a checkpoint operation on the first update to the store.
func newDiskStore(
log *logp.Logger,
home string,
dataFiles []dataFileInfo,
txid uint64,
mode os.FileMode,
entries uint,
logInvalid bool,
bufferSize uint,
checkpointPred CheckpointPredicate,
) *diskstore {
s := &diskstore{
log: log.With("path", home),
home: home,
logFileName: filepath.Join(home, logFileName),
dataFiles: dataFiles,
txid: txid,
fileMode: mode,
bufferSize: int(bufferSize),
logFile: nil,
logBuf: nil,
logEntries: entries,
logInvalid: logInvalid,
logNeedsTruncate: false, // only truncate on next checkpoint
checkpointPred: checkpointPred,
}

_ = s.tryOpenLog()
return s
}

// tryOpenLog accesses the update log. The log file is truncated if the last
// executed operation was a checkpoint.
// The log file is marked as invalid if opening it fails. This will trigger a
// checkpoint operation and another call to tryOpenLog in the future.
func (s *diskstore) tryOpenLog() error {
panic("TODO: implement me")
}
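A rough sketch of how tryOpenLog might be filled in, assuming the log is opened for appending and truncated right after a checkpoint; the real implementation is still a TODO in this change and may look different:

```go
// tryOpenLogSketch mirrors what tryOpenLog might do (illustrative only).
func (s *diskstore) tryOpenLogSketch() error {
	flags := os.O_WRONLY | os.O_CREATE | os.O_APPEND
	if s.logNeedsTruncate {
		flags |= os.O_TRUNC
	}
	f, err := os.OpenFile(s.logFileName, flags, s.fileMode)
	if err != nil {
		s.log.Errorf("Failed to open update log %v: %v", s.logFileName, err)
		s.logInvalid = true // force a checkpoint on the next update
		return err
	}
	if s.logNeedsTruncate {
		s.logEntries, s.logFileSize = 0, 0
		s.logNeedsTruncate = false
	}
	s.logFile = f
	s.logBuf = bufio.NewWriterSize(f, s.bufferSize)
	return nil
}
```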

// mustCheckpoint returns true if the store is required to execute a checkpoint
// operation, either by predicate or by some internal state detecting a problem
// with the log file.
func (s *diskstore) mustCheckpoint() bool {
return s.logInvalid || s.checkpointPred(s.logFileSize)
}

func (s *diskstore) Close() error {
panic("TODO: implement me")
}

// LogOperation adds another entry to the update log file.
// The log file is marked as invalid if the write fails. This will trigger a
// checkpoint operation in the future.
func (s *diskstore) LogOperation(op op) error {
panic("TODO: implement me")
}
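A sketch of what LogOperation could do once implemented, assuming encoding/json for serialization and a hypothetical name() accessor on the op type (op is defined elsewhere in this PR):

```go
// logOperationSketch mirrors what LogOperation might do (illustrative only):
// append an action entry followed by the operation's data entry.
func (s *diskstore) logOperationSketch(op op) error {
	if s.logInvalid {
		return errLogInvalid
	}
	if s.logBuf == nil {
		if err := s.tryOpenLog(); err != nil {
			return err
		}
	}

	s.txid++
	enc := json.NewEncoder(s.logBuf)
	// First the action entry with the operation name and sequence number...
	if err := enc.Encode(logAction{Op: op.name(), ID: s.txid}); err != nil { // op.name() is a hypothetical accessor
		s.logInvalid = true
		return err
	}
	// ...then the operation's data entry.
	if err := enc.Encode(op); err != nil {
		s.logInvalid = true
		return err
	}
	s.logEntries++
	return nil
}
```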

// WriteCheckpoint serializes all state into a json file. The file contains an
// array with all states known to the memory storage.
// WriteCheckpoint first serializes all state to a temporary file, and finally
// moves the temporary data file into its correct location. No files
// are overwritten or replaced. Instead, the change sequence number is used for
// the filename, and older data files will be deleted after success.
//
// The active marker file is overwritten after all updates have succeeded. The
// marker file contains the filename of the currently valid data file.
// NOTE: due to limitations of some operating systems or file systems, the
// active marker is not a symlink, but an actual file.
func (s *diskstore) WriteCheckpoint(state map[string]entry) error {
panic("TODO: implement me")
}
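To make the checkpoint flow described above concrete, here is a rough sketch under several assumptions: encoding/json and fmt are used instead of the encoder implied by the struct tags, the data file naming scheme is invented, and entry is assumed to expose its decoded fields as a common.MapStr:

```go
// writeCheckpointSketch mirrors what WriteCheckpoint might do (illustrative only).
func (s *diskstore) writeCheckpointSketch(states map[string]entry) error {
	s.txid++
	fileName := fmt.Sprintf("checkpoint-%v.json", s.txid) // hypothetical naming scheme
	tmpPath := filepath.Join(s.home, fileName+".tmp")

	f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, s.fileMode)
	if err != nil {
		return err
	}

	// Serialize all known states into one JSON array; each object stores its
	// fields top-level plus the key under "_key".
	docs := make([]common.MapStr, 0, len(states))
	for key, ent := range states {
		doc := common.MapStr{keyField: key}
		doc.DeepUpdate(ent.fields) // ent.fields as a common.MapStr is an assumption
		docs = append(docs, doc)
	}
	if err := json.NewEncoder(f).Encode(docs); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // flush before publishing the new file
		f.Close()
		return err
	}
	f.Close()

	// Publish the checkpoint by renaming; old data files are deleted later,
	// and failures to delete them are tolerated.
	finalPath := filepath.Join(s.home, fileName)
	if err := os.Rename(tmpPath, finalPath); err != nil {
		return err
	}
	s.dataFiles = append(s.dataFiles, dataFileInfo{path: finalPath, txid: s.txid})
	s.logNeedsTruncate = true // reset the log on next open
	return nil
}
```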
95 changes: 95 additions & 0 deletions libbeat/statestore/backend/memlog/doc.go
@@ -0,0 +1,95 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package memlog implements the memlog statestore backend.
Contributor: Would it make sense to put this content in a README.md instead? That would have the additional benefit that it would be automatically rendered when browsing this package directory on github, which helps discoverability.

Author: Hm, interesting point. I have put it here, because I use go docs :)

// The store provided by memlog is an in-memory key-value store
// that logs all operations to an append only log file.
// Once the log file is considered full the store executes a checkpoint
// operation. The checkpoint operation serializes all state to a data file.
//
// The memory store in memlog holds all key-value pairs in a hashtable, with
// values represented by map[string]interface{}. As the store must be 'valid'
// based on the state of the last update operations (Set, Remove), it
// guarantees that no references into data structures passed via Set are held.
// Instead, structured data is serialized/deserialized into a
// map[string]interface{}. The serialized states contain only primitive types
// like intX, uintX, float, bool, string, slices, or map[string]interface{}
// itself. As a side effect this also guarantees that the internal state can
// always be serialized to disk after updating the in-memory representation.
//
// On disk we have a meta file, an update log file, data files, and an active
// marker file in the store directory.
//
// The meta file only contains the store version number.
//
// Normally all operations that update the store's in-memory state are appended
// to the update log file.
// The file stores all entries in JSON format. Each entry starts with an action
// entry, followed by a data entry.
// The action entry has the schema: `{"op": "<name>", "id": <number>}`. Supported
// operations are 'set' or 'remove'. The `id` contains a sequential counter
// that must always increase by 1.
// The data entry for the 'set' operation has the format: `{"K": "<key>", "V": { ... }}`.
// The data entry for the 'remove' operation has the format: `{"K": "<key>"}`.
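// For example (key and values made up for illustration), a 'set' followed by
// a 'remove' of the same key would append four JSON documents to the log:
//
//   {"op": "set", "id": 42}
//   {"K": "registry::logs::example", "V": {"offset": 1024, "source": "/var/log/example.log"}}
//   {"op": "remove", "id": 43}
//   {"K": "registry::logs::example"}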
// Updates to the log file are not synced to disk. Having all updates available
// between restarts/crashes also depends on the capabilities of the operating
// system and file system. When opening the store we read as far as possible,
// reconstructing the last known valid state the Beat can continue from. This
// can lead to duplicates if the machine/filesystem has had an outage with
// state not yet fully synchronised to disk. Ordinary restarts should not lead
// to any problems.
// If any error is encountered when reading the log file, the next update to
// the store will trigger a checkpoint operation and reset the log file.
//
// The store might contain multiple data files, but only the last data file is
// supposed to be valid. Older data files are cleaned up on checkpoint
// operations, with failed deletes retried on subsequent checkpoints.
// The data file names include the change sequence number, which allows us to
// sort them by name. The checkpoint operation of memlog writes the full state
// into a new data file that consists of a JSON array with all known key-value
// pairs. All fields of an entry are stored top-level in the object (nested
// values remain nested maps). The `_key` field
Contributor: I'm not sure what this sentence ("All fields...") means: it sounds like it's saying the object has a flat structure, but the description above says that map[string]interface{} can be included, so I think I'm misunderstanding what "entry" is referring to here, could that be clarified?

Author: Will try. It's indeed an object that supports nesting (map[string]interface{})

// is used to store the entry's key.
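// For example (values made up for illustration), a checkpoint data file with
// two entries is a single JSON array; nested values stay nested:
//
//   [
//     {"_key": "key-1", "offset": 1024, "meta": {"source": "/var/log/example.log"}},
//     {"_key": "key-2", "ttl": 1800000000000}
//   ]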
// NOTE: Creating a new file guarantees that Beats can progress when creating a
// new checkpoint file. Some filesystems tend to block the
// delete/replace operation when the file is accessed by another process
// (e.g. a common problem with AV scanners on Windows). By creating a new
// file we circumvent this problem. Failures to delete old files are
// ok, and we will try to delete old data files again in the future.
//
// The active marker file is not really used by the store. It is written for
// debugging purposes and contains the filepath of the last written data file
// that is supposed to be valid.
//
// When opening the store we first validate the meta file and read the "last"
// data file into the in-memory hashtable. Older data files are ignored. The
// update sequence number embedded in the filename is used to sort data files.
// NOTE: the active marker file is not used, as the checkpoint operation is
// supposed to be an atomic operation that is finalized once the data
// file is moved to its correct location.
//
// After loading the data file we loop over all operations in the log file.
// Operations with a smaller sequence number are ignored when iterating the log
// file. If two subsequent entries in the log file have a sequence number
// difference != 1, we assume the log file to be corrupted and stop the loop.
// All processing continues from the last known accumulated state.
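// For example, if the loaded data file already reflects all changes up to
// sequence number 5, replay starts with the first log entry after it; if the
// log contains ids 6, 7, 8 and then 10, the jump from 8 to 10 stops the
// replay, and the store continues from the state accumulated up to id 8.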
//
// When closing the store we make a last attempt at fsyncing the log file (just
// in case), close the log file and clear all in memory state.
//
// The store provided by memlog is thread-safe and uses an RWMutex. We allow only
// one active writer, but multiple concurrent readers.
package memlog
27 changes: 27 additions & 0 deletions libbeat/statestore/backend/memlog/error.go
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memlog

import "errors"

var (
errRegClosed = errors.New("registry has been closed")
errLogInvalid = errors.New("can not add operation to log file, a checkpoint is required")
errTxIDInvalid = errors.New("invalid update sequence number")
errKeyUnknown = errors.New("key unknown")
)