Skip to content

Commit

Permalink
Registry file fsync improvements (elastic#6988) (elastic#7353)
Browse files Browse the repository at this point in the history
* Registry file fsync improvements

- Return error if Sync fails
- Execute fsync on new parent directory
- improve metrics:
  - registrar.writes.total: total number of registry write attempts
  - registrar.writes.fail: total number of failed write attempts
  - registrar.writes.success: total number of successful write attempts

(cherry picked from commit f5179f9)
  • Loading branch information
Steffen Siering authored and ph committed Jun 18, 2018
1 parent 95ed596 commit c0b3780
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281]
- Fix offset field pointing at end of a line. {issue}6514[6514]
- Commit registry writes to stable storage to avoid corrupt registry files. {issue}6792[6792]
*Heartbeat*
Expand Down
87 changes: 54 additions & 33 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

type Registrar struct {
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
registryFilePermissions os.FileMode // Permissions to apply on the Registry File
wg sync.WaitGroup
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
fileMode os.FileMode // Permissions to apply on the Registry File
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write
Expand All @@ -35,16 +35,20 @@ type successLogger interface {
}

// Monitoring counters published by the registrar.
//
// NOTE(review): this listing is a diff view without +/- markers, so the
// declarations from before the change (e.g. "registrar.writes") appear next
// to the updated ones ("registrar.writes.total"). Confirm against the actual
// file which lines are current.
var (
statesUpdate = monitoring.NewInt(nil, "registrar.states.update")
statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup")
statesCurrent = monitoring.NewInt(nil, "registrar.states.current")
registryWrites = monitoring.NewInt(nil, "registrar.writes")
statesUpdate = monitoring.NewInt(nil, "registrar.states.update")
statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup")
// statesCurrent is set by writeRegistry to the number of states about to be written.
statesCurrent = monitoring.NewInt(nil, "registrar.states.current")
// registryWrites counts registry write attempts; writeRegistry increments
// it before the temporary file is written.
registryWrites = monitoring.NewInt(nil, "registrar.writes.total")
// registryFails counts failed write attempts; writeRegistry increments it
// on each of its error returns.
registryFails = monitoring.NewInt(nil, "registrar.writes.fail")
// registrySuccess counts successful write attempts; writeRegistry
// increments it once the file rotation has returned without error.
registrySuccess = monitoring.NewInt(nil, "registrar.writes.success")
)

func New(registryFile string, registryFilePermissions os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
// New creates a new Registrar instance, updating the registry file on
// `file.State` updates. New fails if the file can not be opened or created.
func New(registryFile string, fileMode os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
registryFilePermissions: registryFilePermissions,
registryFile: registryFile,
fileMode: fileMode,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []file.State, 1),
Expand Down Expand Up @@ -258,38 +262,55 @@ func (r *Registrar) flushRegistry() {

// writeRegistry writes the new json registry file to disk.
//
// The states are written to a temporary file first, which is then rotated
// over r.registryFile via helper.SafeFileRotate. The write counters
// (registryWrites, registryFails, registrySuccess) are updated along the way
// and any error from writing or rotating is returned to the caller.
//
// NOTE(review): this listing is a diff view without +/- markers, so
// statements of the previous implementation (inline os.OpenFile, json encoder,
// f.Sync/f.Close) are interleaved with the updated implementation (call to
// writeTmpFile). Do not rely on the statement order shown here; confirm
// against the actual file.
func (r *Registrar) writeRegistry() error {
// First clean up states
r.gcStates()
states := r.states.GetStates()
statesCurrent.Set(int64(len(states)))

logp.Debug("registrar", "Write registry file: %s", r.registryFile)
// Count the attempt before any I/O happens.
registryWrites.Inc()

tempfile := r.registryFile + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, r.registryFilePermissions)
tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
registryFails.Inc()
return err
}

// First clean up states
states := r.states.GetStates()

encoder := json.NewEncoder(f)
err = encoder.Encode(states)
// Move the temporary file over the registry file.
err = helper.SafeFileRotate(r.registryFile, tempfile)
if err != nil {
f.Close()
logp.Err("Error when encoding the states: %s", err)
registryFails.Inc()
return err
}

// Commit the changes to storage to avoid corrupt registry files
f.Sync()
// Directly close file because of windows
f.Close()
logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registrySuccess.Inc()

err = helper.SafeFileRotate(r.registryFile, tempfile)
return nil
}

logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registryWrites.Add(1)
statesCurrent.Set(int64(len(states)))
// writeTmpFile JSON-encodes states into the file baseName + ".new" and
// returns the path of that file.
//
// The file is opened with O_RDWR|O_CREATE|O_TRUNC|O_SYNC using perm as file
// mode, so existing content at that path is truncated. After
// encoding, the file is explicitly synced; a failure to open, encode or sync
// is logged and returned together with an empty path. The file handle is
// closed when the function returns.
func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
logp.Debug("registrar", "Write registry file: %s", baseName)

tempfile := baseName + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
return "", err
}

// Closing via defer guarantees the handle is released on every return path,
// i.e. before the caller gets the path back.
defer f.Close()

encoder := json.NewEncoder(f)

if err := encoder.Encode(states); err != nil {
logp.Err("Error when encoding the states: %s", err)
return "", err
}

// Commit the changes to storage to avoid corrupt registry files
if err = f.Sync(); err != nil {
logp.Err("Error when syncing new registry file contents: %s", err)
return "", err
}

// NOTE(review): the following `return err` does not match this function's
// (string, error) result list; it looks like a removed line of the previous
// writeRegistry that the diff view shows here — confirm against the actual file.
return err
return tempfile, nil
}
14 changes: 14 additions & 0 deletions libbeat/common/file/helper_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@ package file

import (
"os"
"path/filepath"
)

// SafeFileRotate moves tempfile over path, replacing whatever file is
// currently stored under path. An error from the rename is returned to the
// caller unchanged.
//
// After a successful rename the directory containing path is opened and
// fsynced. This step is best effort only: failing to open or sync the
// directory is ignored and the rotation is still reported as successful.
func SafeFileRotate(path, tempfile string) error {
	if err := os.Rename(tempfile, path); err != nil {
		return err
	}

	// Some filesystems only persist the updated directory entry (the file
	// that was just rotated in) once the parent directory itself has been
	// fsynced. Errors are deliberately not reported here.
	if dir, err := os.Open(filepath.Dir(path)); err == nil {
		_ = dir.Sync()
		_ = dir.Close()
	}

	return nil
}

0 comments on commit c0b3780

Please sign in to comment.