diff --git a/libbeat/statestore/backend/memlog/diskstore.go b/libbeat/statestore/backend/memlog/diskstore.go new file mode 100644 index 00000000000..5c2179e7bac --- /dev/null +++ b/libbeat/statestore/backend/memlog/diskstore.go @@ -0,0 +1,169 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "bufio" + "os" + "path/filepath" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// diskstore manages the on-disk state of the memlog store. +type diskstore struct { + log *logp.Logger + + // store configuration + checkpointPred CheckpointPredicate + fileMode os.FileMode + bufferSize int + + // on disk file tracking information + home string // home path of the store + logFilePath string // current log file + dataFiles []dataFileInfo // set of data files found + + // txid is the sequential counter that tracks + // all updates to the store. The txid is added to operation being logged + // used as name for the data files. + txid uint64 + + // log file access. The log file is updated using an in memory write buffer. + logFile *os.File + logBuf *bufio.Writer + + // internal state and metrics + logFileSize uint64 + logEntries uint + logInvalid bool + logNeedsTruncate bool +} + +// dataFileInfo is used to track and sort on disk data files. +// We should have only one data file on disk, but in case delete operations +// have failed or not finished dataFileInfo is used to detect the ordering. +type dataFileInfo struct { + path string + txid uint64 +} + +// storeEntry is used to write entries to the checkpoint file only. +type storeEntry struct { + Key string `struct:"_key"` + Fields common.MapStr `struct:",inline"` +} + +// storeMeta is read from the meta file. +type storeMeta struct { + Version string `struct:"version"` +} + +// logAction is prepended to each operation logged to the update file. +// It contains the update ID, a sequential counter to track correctness, +// and the action name. +type logAction struct { + Op string `json:"op"` + ID uint64 `json:"id"` +} + +const ( + logFileName = "log.json" + metaFileName = "meta.json" + + storeVersion = "1" + + keyField = "_key" +) + +// newDiskStore initializes the disk store stucture only. The store must have +// been opened already. It tries to open the update log file for append +// operations. If opening the update log file fails, it is marked as +// 'corrupted', triggering a checkpoint operation on the first update to the store. +func newDiskStore( + log *logp.Logger, + home string, + dataFiles []dataFileInfo, + txid uint64, + mode os.FileMode, + entries uint, + logInvalid bool, + bufferSize uint, + checkpointPred CheckpointPredicate, +) *diskstore { + s := &diskstore{ + log: log.With("path", home), + home: home, + logFilePath: filepath.Join(home, logFileName), + dataFiles: dataFiles, + txid: txid, + fileMode: mode, + bufferSize: int(bufferSize), + logFile: nil, + logBuf: nil, + logEntries: entries, + logInvalid: logInvalid, + logNeedsTruncate: false, // only truncate on next checkpoint + checkpointPred: checkpointPred, + } + + _ = s.tryOpenLog() + return s +} + +// tryOpenLog access the update log. The log file is truncated if a checkpoint operation has been +// executed last. +// The log file is marked as invalid if opening it failed. This will trigger a checkpoint operation +// and another call to tryOpenLog in the future. +func (s *diskstore) tryOpenLog() error { + panic("TODO: implement me") +} + +// mustCheckpoint returns true if the store is required to execute a checkpoint +// operation, either by predicate or by some internal state detecting a problem +// with the log file. +func (s *diskstore) mustCheckpoint() bool { + return s.logInvalid || s.checkpointPred(s.logFileSize) +} + +func (s *diskstore) Close() error { + panic("TODO: implement me") +} + +// log operation adds another entry to the update log file. +// The log file is marked as invalid if the write fails. This will trigger a +// checkpoint operation in the future. +func (s *diskstore) LogOperation(op op) error { + panic("TODO: implement me") +} + +// WriteCheckpoint serializes all state into a json file. The file contains an +// array with all states known to the memory storage. +// WriteCheckpoint first serializes all state to a temporary file, and finally +// moves the temporary data file into the correct location. No files +// are overwritten or replaced. Instead the change sequence number is used for +// the filename, and older data files will be deleted after success. +// +// The active marker file is overwritten after all updates did succeed. The +// marker file contains the filename of the current valid data-file. +// NOTE: due to limitation on some Operating system or file systems, the active +// marker is not a symlink, but an actual file. +func (s *diskstore) WriteCheckpoint(state map[string]entry) error { + panic("TODO: implement me") +} diff --git a/libbeat/statestore/backend/memlog/doc.go b/libbeat/statestore/backend/memlog/doc.go new file mode 100644 index 00000000000..276daf9fd79 --- /dev/null +++ b/libbeat/statestore/backend/memlog/doc.go @@ -0,0 +1,97 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package memlog implements the memlog statestore backend. +// The store provided by memlog is a in-memory key-value store +// that logs all operations to an append only log file. +// Once the log file is considered full the store executes a checkpoint +// operation. The checkpoint operation serializes all state to a data file. +// +// The memory store in memlog holds all key-value pairs in a hashtable, with +// value represented by map[string]interface{}. As the store must be 'valid' +// based on the state of the last update operations (Set, Remove), it +// guarantees that no references into data structures passed via Set are held. +// Instead structured data is serialized/deserialized into a +// map[string]interface{}. The serialized states contain only primitive types +// like intX, uintX, float, bool, string, slices, or map[string]interface{} +// itself. As a side effect this also guarantees that the internal can always +// be serialized to disk after updating the in memory representation. +// +// On disk we have a meta file, an update log file, data files, and an active +// marker file in the store directory. +// +// The meta file only contains the store version number. +// +// Normally all operations that update the store in memory state are appended +// to the update log file. +// The file stores all entries in JSON format. Each entry starts with an action +// entry, followed by an data entry. +// The action entry has the schema: `{"op": "", id: }`. Supporter +// operations are 'set' or 'remove'. The `id` contains a sequential counter +// that must always be increased by 1. +// The data entry for the 'set' operation has the format: `{"K": "", "V": { ... }}`. +// The data entry for the 'remove' operation has the format: `{"K": ""}`. +// Updates to the log file are not synced to disk. Having all updates available +// between restarts/crashes also depends on the capabilities of the operation +// system and file system. When opening the store we read up until it is +// possible, reconstructing a last known valid state the beat can continue +// from. This can lead to duplicates if the machine/filesystem has had an +// outage with state not yet fully synchronised to disk. Ordinary restarts +// should not lead to any problems. +// If any error is encountered when reading the log file, the next updates to the store +// will trigger a checkpoint operation and reset the log file. +// +// The store might contain multiple data files, but only the last data file is +// supposed to be valid. Older data files will continiously tried to be cleaned up +// on checkpoint operations. +// The data files filenames do include the change sequence number. Which allows +// us to sort them by name. The checkpoint operation of memlog, writes the full +// state into a new data file, that consists of an JSON array with all known +// key-value pairs. Each JSON object in the array consists of the value +// object, with memlog private fields added. Private fields start with `_`. At +// the moment the only private field is `_key`, which is used to identify the +// key-value pair. +// NOTE: Creating a new file guarantees that Beats can progress when creating a +// new checkpoint file. Some filesystems tend to block the +// delete/replace operation when the file is accessed by another process +// (e.g. common problem with AV Scanners on Windows). By creating a new +// file we circumvent this problem. Failures in deleting old files is +// ok, and we will try to delete old data files again in the future. +// +// The active marker file is not really used by the store. It is written for +// debugging purposes and contains the filepath of the last written data file +// that is supposed to be valid. +// +// When opening the store we first validate the meta file and read the "last" +// data file into the in-memory hashtable. Older data files are ignored. The +// filename with the update sequence number is used to sort data files. +// NOTE: the active marker file is not used, as the checkpoint operation is +// supposed to be an atomic operation that is finalized once the data +// file is moved to its correct location. +// +// After loading the data file we loop over all operations in the log file. +// Operations with a smaller sequence number are ignored when iterating the log +// file. If any subsequent entries in the log file have a sequence number difference != +// 1, we assume the log file to be corrupted and stop the loop. All processing +// continues from the last known accumulated state. +// +// When closing the store we make a last attempt at fsyncing the log file (just +// in case), close the log file and clear all in memory state. +// +// The store provided by memlog is threadsafe and uses a RWMutex. We allow only +// one active writer, but multiple concurrent readers. +package memlog diff --git a/libbeat/statestore/backend/memlog/error.go b/libbeat/statestore/backend/memlog/error.go new file mode 100644 index 00000000000..29f665bcacf --- /dev/null +++ b/libbeat/statestore/backend/memlog/error.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import "errors" + +var ( + errRegClosed = errors.New("registry has been closed") + errLogInvalid = errors.New("can not add operation to log file, a checkpoint is required") + errTxIDInvalid = errors.New("invalid update sequence number") + errKeyUnknown = errors.New("key unknown") +) diff --git a/libbeat/statestore/backend/memlog/memlog.go b/libbeat/statestore/backend/memlog/memlog.go new file mode 100644 index 00000000000..265b53a3df1 --- /dev/null +++ b/libbeat/statestore/backend/memlog/memlog.go @@ -0,0 +1,131 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "path/filepath" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +// Registry configures access to memlog based stores. +type Registry struct { + log *logp.Logger + + mu sync.Mutex + active bool + + settings Settings + + wg sync.WaitGroup +} + +// Settings configures a new Registry. +type Settings struct { + // Registry root directory. Stores will be single sub-directories. + Root string + + // FileMode is used to configure the file mode for new files generated by the + // regisry. File mode 0600 will be used if this field is not set. + FileMode os.FileMode + + // BufferSize configures the IO buffer size when accessing the underlying + // storage files. Defaults to 4096 if not set. + BufferSize uint + + // Checkpoint predicate that can trigger a registry file rotation. If not + // configured, memlog will automatically trigger a checkpoint every 10MB. + Checkpoint CheckpointPredicate +} + +// CheckpointPredicate is the type for configurable checkpoint checks. +// The store executes a checkpoint operation when the predicate returns true. +type CheckpointPredicate func(fileSize uint64) bool + +const defaultFileMode os.FileMode = 0600 + +const defaultBufferSize = 4096 + +func defaultCheckpoint(filesize uint64) bool { + const limit = 10 * 1 << 20 // set rotation limit to 10MB by default + return filesize >= limit +} + +// New configures a memlog Registry that can be used to open stores. +func New(log *logp.Logger, settings Settings) (*Registry, error) { + if settings.FileMode == 0 { + settings.FileMode = defaultFileMode + } + if settings.Checkpoint == nil { + settings.Checkpoint = defaultCheckpoint + } + if settings.BufferSize == 0 { + settings.BufferSize = defaultBufferSize + } + + root, err := filepath.Abs(settings.Root) + if err != nil { + return nil, err + } + + settings.Root = root + return &Registry{ + log: log, + active: true, + settings: settings, + }, nil +} + +// Access creates or opens a new store. A new sub-directory for the store if +// created, if the store does not exist. +// Returns an error is any file access fails. +func (r *Registry) Access(name string) (backend.Store, error) { + r.mu.Lock() + defer r.mu.Unlock() + + if !r.active { + return nil, errRegClosed + } + + logger := r.log.With("store", name) + + home := filepath.Join(r.settings.Root, name) + fileMode := r.settings.FileMode + bufSz := r.settings.BufferSize + store, err := openStore(logger, home, fileMode, bufSz, r.settings.Checkpoint) + if err != nil { + return nil, err + } + + return store, nil +} + +// Close closes the registry. No new store can be accessed during close. +// Close blocks until all stores have been closed. +func (r *Registry) Close() error { + r.mu.Lock() + r.active = false + r.mu.Unlock() + + // block until all stores have been closed + r.wg.Wait() + return nil +} diff --git a/libbeat/statestore/backend/memlog/op.go b/libbeat/statestore/backend/memlog/op.go new file mode 100644 index 00000000000..d30ce975750 --- /dev/null +++ b/libbeat/statestore/backend/memlog/op.go @@ -0,0 +1,46 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import "github.com/elastic/beats/v7/libbeat/common" + +type ( + op interface { + name() string + } + + // opSet encodes the 'Set' operations in the update log. + opSet struct { + K string + V common.MapStr + } + + // opRemove encodes the 'Remove' operation in the update log. + opRemove struct { + K string + } +) + +// operation type names +const ( + opValSet = "set" + opValRemove = "remove" +) + +func (*opSet) name() string { return opValSet } +func (*opRemove) name() string { return opValRemove } diff --git a/libbeat/statestore/backend/memlog/store.go b/libbeat/statestore/backend/memlog/store.go new file mode 100644 index 00000000000..9fcef7032ec --- /dev/null +++ b/libbeat/statestore/backend/memlog/store.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "sync" + + "github.com/elastic/beats/v7/libbeat/common/transform/typeconv" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/statestore/backend" +) + +// store implements an actual memlog based store. +// It holds all key value pairs in memory in a memstore struct. +// All changes to the memstore are logged to the diskstore. +// The store execute a checkpoint operation if the checkpoint predicate +// triggers the operation, or if some error in the update log file has been +// detected by the diskstore. +// +// The store allows only one writer, but multiple concurrent readers. +type store struct { + lock sync.RWMutex + disk *diskstore + mem memstore +} + +// memstore is the in memory key value store +type memstore struct { + table map[string]entry +} + +type entry struct { + value map[string]interface{} +} + +// openStore opens a store from the home path. +// The directory and intermediate directories will be created if it does not exist. +// The open routine loads the full key-value store into memory by first reading the data file and finally applying all outstanding updates +// from the update log file. +// If an error in in the log file is detected, the store opening routine continues from the last known valid state and will trigger a checkpoint +// operation on subsequent writes, also truncating the log file. +// Old data files are scheduled for deletion later. +func openStore(log *logp.Logger, home string, mode os.FileMode, bufSz uint, checkpoint CheckpointPredicate) (*store, error) { + panic("TODO: implement me") +} + +// Close closes access to the update log file and clears the in memory key +// value store. Access to the store after close can lead to a panic. +func (s *store) Close() error { + panic("TODO: implement me") +} + +// Has checks if the key is known. The in memory store does not report any +// errors. +func (s *store) Has(key string) (bool, error) { + panic("TODO: implement me") +} + +// Get retrieves and decodes the key-value pair into to. +func (s *store) Get(key string, to interface{}) error { + panic("TODO: implement me") +} + +// Set inserts or overwrites a key-value pair. +// If encoding was successful the in-memory state will be updated and a +// set-operation is logged to the diskstore. +func (s *store) Set(key string, value interface{}) error { + panic("TODO: implement me") +} + +// Remove removes a key from the in memory store and logs a remove operation to +// the diskstore. The operation does not check if the key exists. +func (s *store) Remove(key string) error { + panic("TODO: implement me") +} + +// Each iterates over all key-value pairs in the store. +func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error { + panic("TODO: implement me") +} + +func (e entry) Decode(to interface{}) error { + return typeconv.Convert(to, e.value) +} diff --git a/libbeat/statestore/backend/memlog/util.go b/libbeat/statestore/backend/memlog/util.go new file mode 100644 index 00000000000..af2036b1b5e --- /dev/null +++ b/libbeat/statestore/backend/memlog/util.go @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + "syscall" +) + +// isTxIDLessEqual compares two IDs by checking that their distance is < 2^63. +// It always returns true if +// - a == b +// - a < b (mod 2^63) +// - b > a after an integer rollover that is still within the distance of <2^63-1 +func isTxIDLessEqual(a, b uint64) bool { + return int64(a-b) <= 0 +} + +func isRetryErr(err error) bool { + return err == syscall.EINTR || err == syscall.EAGAIN +} + +// trySyncPath provides a best-effort fsync on path (directory). The fsync is required by some +// filesystems, so to update the parents directory metadata to actually +// contain the new file being rotated in. +func trySyncPath(path string) { + f, err := os.Open(path) + if err != nil { + return // ignore error, sync on dir must not be necessarily supported by the FS + } + defer f.Close() + syncFile(f) +} diff --git a/libbeat/statestore/backend/memlog/util_darwin.go b/libbeat/statestore/backend/memlog/util_darwin.go new file mode 100644 index 00000000000..1aaf64ab42a --- /dev/null +++ b/libbeat/statestore/backend/memlog/util_darwin.go @@ -0,0 +1,46 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import ( + "os" + + "golang.org/x/sys/unix" +) + +// syncFile implements the fsync operation for darwin. On darwin fsync is not +// reliable, instead the fcntl syscall with F_FULLFSYNC must be used. +func syncFile(f *os.File) error { + for { + _, err := unix.FcntlInt(f.Fd(), unix.F_FULLFSYNC, 0) + err = normalizeIOError(err) + if err == nil || isIOError(err) { + return err + } + + if isRetryErr(err) { + continue + } + + err = f.Sync() + if isRetryErr(err) { + continue + } + return err + } +} diff --git a/libbeat/statestore/backend/memlog/util_other.go b/libbeat/statestore/backend/memlog/util_other.go new file mode 100644 index 00000000000..f975924853a --- /dev/null +++ b/libbeat/statestore/backend/memlog/util_other.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build linux dragonfly freebsd netbsd openbsd solaris + +package memlog + +import ( + "os" +) + +// syncFile implements the fsync operation for most *nix systems. +// The call is retried if EINTR or EAGAIN is returned. +func syncFile(f *os.File) error { + // best effort. + for { + err := f.Sync() + if err == nil || !isRetryErr(err) { + return err + } + } +} diff --git a/libbeat/statestore/backend/memlog/util_windows.go b/libbeat/statestore/backend/memlog/util_windows.go new file mode 100644 index 00000000000..ce0d1080b72 --- /dev/null +++ b/libbeat/statestore/backend/memlog/util_windows.go @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package memlog + +import "os" + +// syncFile implements the fsync operation for Windows. Internally +// FlushFileBuffers will be used. +func syncFile(f *os.File) error { + return f.Sync() // stdlib already uses FlushFileBuffers, yay +}