Skip to content

Commit

Permalink
Fix filebeat registry meta being nil vs empty (#7632) (#7644)
Browse files Browse the repository at this point in the history
Filebeat introduced a meta field to registry entries in 6.3.1. The meta field is used to distinguish different log streams in docker files. For other input types the meta field must be null. Unfortunately, the input loader initialized the meta field with an empty dictionary. This causes matching of old and new registry entries to fail. Because the match fails, old entries are not removed, and filebeat handles all files as new files on startup (old logs are sent again).

Users will observe duplicate entries in the registry file: one entry with "meta": null and one entry with "meta": {}. The entry with "meta": {} will be used by filebeat. The null entry will not be used by filebeat, but is kept in the registry file because it has no active owner (yet).

Improvements provided by this PR:

* when matching states consider an empty map and a null-map to be equivalent
* update input loader to create a null map for old state -> registry entries will be compatible on upgrade
* Add checks in critical places replacing an empty map with a null-map
* Add support to fix registry entries on load. States from corrupted 6.3.1 files will be merged into a single state on load
* introduce unit tests for loading different registry formats
* introduce system tests validating output and registry when upgrading filebeat from an older version

Closes: #7634

(cherry picked from commit c558984)
  • Loading branch information
tsg authored and kvch committed Jul 19, 2018
1 parent 075f1f5 commit 45a9a9e
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Fix registry duplicates and log resending on upgrade. {issue}7634[7634]
*Heartbeat*
Expand Down
3 changes: 3 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func NewInput(

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
context.Meta = map[string]string{}
}
context.Meta["stream"] = config.Containers.Stream
}

Expand Down
7 changes: 5 additions & 2 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type State struct {

// NewState creates a new file state
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
if len(meta) == 0 {
meta = nil
}
return State{
Fileinfo: fileInfo,
Source: path,
Expand All @@ -42,7 +45,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin
func (s *State) ID() string {
// Generate id on first request. This is needed as id is not set when converting back from json
if s.Id == "" {
if s.Meta == nil {
if len(s.Meta) == 0 {
s.Id = s.FileStateOS.String()
} else {
hashValue, _ := hashstructure.Hash(s.Meta, nil)
Expand All @@ -66,6 +69,6 @@ func (s *State) IsEqual(c *State) bool {
func (s *State) IsEmpty() bool {
return s.FileStateOS == file.StateOS{} &&
s.Source == "" &&
s.Meta == nil &&
len(s.Meta) == 0 &&
s.Timestamp.IsZero()
}
2 changes: 1 addition & 1 deletion filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func New(
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: map[string]string{},
Meta: nil,
}
var ipt Input
ipt, err = f(conf, outlet, context)
Expand Down
11 changes: 10 additions & 1 deletion filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func NewInput(
// can be forwarded correctly to the registrar.
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone)

meta := context.Meta
if len(meta) == 0 {
meta = nil
}

p := &Input{
config: defaultConfig,
cfg: cfg,
Expand All @@ -81,7 +86,7 @@ func NewInput(
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: context.Meta,
meta: meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand Down Expand Up @@ -663,6 +668,10 @@ func (p *Input) updateState(state file.State) error {
state.TTL = p.config.CleanInactive
}

if len(state.Meta) == 0 {
state.Meta = nil
}

// Update first internal state
p.states.Update(state)

Expand Down
104 changes: 98 additions & 6 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package registrar
import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -115,20 +116,111 @@ func (r *Registrar) loadStates() error {

logp.Info("Loading registrar data from %s", r.registryFile)

decoder := json.NewDecoder(f)
states := []file.State{}
err = decoder.Decode(&states)
states, err := readStatesFrom(f)
if err != nil {
return fmt.Errorf("Error decoding states: %s", err)
return err
}

states = resetStates(states)
r.states.SetStates(states)
logp.Info("States Loaded from registrar: %+v", len(states))

return nil
}

// readStatesFrom decodes a JSON-encoded list of registry states from in.
//
// The decoded states are first passed through fixStates and then through
// resetStates before being returned. If decoding fails, no states are
// returned and the error describes the decoding failure.
func readStatesFrom(in io.Reader) ([]file.State, error) {
	loaded := make([]file.State, 0)
	err := json.NewDecoder(in).Decode(&loaded)
	if err != nil {
		return nil, fmt.Errorf("Error decoding states: %s", err)
	}

	// Repair the entries first, then reset them.
	return resetStates(fixStates(loaded)), nil
}

// fixStates cleans up the registry states when updating from an older version
// of filebeat potentially writing invalid entries.
//
// Every state is normalized with fixState. States that report the same ID are
// merged into the first state seen with that ID (see mergeStates). If no
// duplicates are found, the input slice is returned (its entries normalized in
// place). Otherwise a new slice is returned that holds one entry per ID, in
// the order in which each ID first appeared in the input, so the result is
// deterministic for a given input.
func fixStates(states []file.State) []file.State {
	if len(states) == 0 {
		return states
	}

	// idx maps a state ID to the first state seen with that ID, so duplicate
	// entries can be identified and merged. unique records those first
	// occurrences in input order; ranging over the map instead would yield a
	// random order on every run.
	idx := make(map[string]*file.State, len(states))
	unique := make([]*file.State, 0, len(states))
	for i := range states {
		state := &states[i]
		fixState(state)

		id := state.ID()
		if old, exists := idx[id]; exists {
			mergeStates(old, state) // overwrite the entry in 'old'
			continue
		}
		idx[id] = state
		unique = append(unique, state)
	}

	// No duplicates: the (normalized) input can be returned as is.
	if len(unique) == len(states) {
		return states
	}

	newStates := make([]file.State, len(unique))
	for i, state := range unique {
		newStates[i] = *state
	}
	return newStates
}

// fixState updates a freshly read state so it fulfills the required
// invariants:
// - "Meta" must be nil if len(Meta) == 0
func fixState(st *file.State) {
	if len(st.Meta) > 0 {
		return
	}
	// Replace an empty map with a nil map.
	st.Meta = nil
}

// mergeStates merges two states by trying to determine the 'newer' one.
// The result is written into st; other is only read.
//
//   - Finished is set if either state is finished.
//   - Offset becomes the higher of the two offsets.
//   - Source, Timestamp, TTL and FileStateOS are taken from other only if
//     other has the later Timestamp.
//   - Meta is taken from the newer state if either Meta is empty. Otherwise
//     both are combined into a new map, with the newer state's values winning
//     on key conflicts. An empty result is stored as nil.
func mergeStates(st, other *file.State) {
	if other.Finished {
		st.Finished = true
	}
	if other.Offset > st.Offset { // always select the higher offset
		st.Offset = other.Offset
	}

	// File meta-data is updated concurrently by the prospectors, so the
	// update timestamp decides which of the two states is the newer one.
	older, newer := other.Meta, st.Meta
	if st.Timestamp.Before(other.Timestamp) {
		older, newer = st.Meta, other.Meta

		st.Source = other.Source
		st.Timestamp = other.Timestamp
		st.TTL = other.TTL
		st.FileStateOS = other.FileStateOS
	}

	merged := newer
	if len(older) != 0 && len(newer) != 0 {
		merged = make(map[string]string, len(older)+len(newer))
		for key, value := range older {
			merged[key] = value
		}
		// Copy the newer entries last so they override the older ones.
		for key, value := range newer {
			merged[key] = value
		}
	}

	// Keep the invariant that an empty Meta is always stored as nil.
	if len(merged) == 0 {
		merged = nil
	}
	st.Meta = merged
}

// resetStates sets all states to finished and disable TTL on restart
// For all states covered by an input, TTL will be overwritten with the input value
func resetStates(states []file.State) []file.State {
Expand Down
Loading

0 comments on commit 45a9a9e

Please sign in to comment.