diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b8f198045ed..37cfb7dcd00 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -190,6 +190,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits] - Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877] - Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046] - Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202] +- Fix registry duplicates and log resending on upgrade. {issue}7634[7634] *Heartbeat* diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go index e270f7475dc..835f50bb811 100644 --- a/filebeat/input/docker/input.go +++ b/filebeat/input/docker/input.go @@ -52,6 +52,9 @@ func NewInput( // Add stream to meta to ensure different state per stream if config.Containers.Stream != "all" { + if context.Meta == nil { + context.Meta = map[string]string{} + } context.Meta["stream"] = config.Containers.Stream } diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index b6e41bacc2c..1628ab9a40e 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -26,6 +26,9 @@ type State struct { // NewState creates a new file state func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State { + if len(meta) == 0 { + meta = nil + } return State{ Fileinfo: fileInfo, Source: path, @@ -42,7 +45,7 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin func (s *State) ID() string { // Generate id on first request. 
This is needed as id is not set when converting back from json if s.Id == "" { - if s.Meta == nil { + if len(s.Meta) == 0 { s.Id = s.FileStateOS.String() } else { hashValue, _ := hashstructure.Hash(s.Meta, nil) @@ -66,6 +69,6 @@ func (s *State) IsEqual(c *State) bool { func (s *State) IsEmpty() bool { return s.FileStateOS == file.StateOS{} && s.Source == "" && - s.Meta == nil && + len(s.Meta) == 0 && s.Timestamp.IsZero() } diff --git a/filebeat/input/input.go b/filebeat/input/input.go index ce155ecd194..1d4ec5aef1a 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -70,7 +70,7 @@ func New( Done: input.done, BeatDone: input.beatDone, DynamicFields: dynFields, - Meta: map[string]string{}, + Meta: nil, } var ipt Input ipt, err = f(conf, outlet, context) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 17f5aad49ca..10a118dfabf 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -73,6 +73,11 @@ func NewInput( // can be forwarded correctly to the registrar. 
stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + meta := context.Meta + if len(meta) == 0 { + meta = nil + } + p := &Input{ config: defaultConfig, cfg: cfg, @@ -81,7 +86,7 @@ func NewInput( stateOutlet: stateOut, states: file.NewStates(), done: context.Done, - meta: context.Meta, + meta: meta, } if err := cfg.Unpack(&p.config); err != nil { @@ -663,6 +668,10 @@ func (p *Input) updateState(state file.State) error { state.TTL = p.config.CleanInactive } + if len(state.Meta) == 0 { + state.Meta = nil + } + // Update first internal state p.states.Update(state) diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index c884afdfe3f..80c1829f216 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -3,6 +3,7 @@ package registrar import ( "encoding/json" "fmt" + "io" "os" "path/filepath" "sync" @@ -115,20 +116,111 @@ func (r *Registrar) loadStates() error { logp.Info("Loading registrar data from %s", r.registryFile) - decoder := json.NewDecoder(f) - states := []file.State{} - err = decoder.Decode(&states) + states, err := readStatesFrom(f) if err != nil { - return fmt.Errorf("Error decoding states: %s", err) + return err } - - states = resetStates(states) r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) return nil } +func readStatesFrom(in io.Reader) ([]file.State, error) { + states := []file.State{} + decoder := json.NewDecoder(in) + if err := decoder.Decode(&states); err != nil { + return nil, fmt.Errorf("Error decoding states: %s", err) + } + + states = fixStates(states) + states = resetStates(states) + return states, nil +} + +// fixStates cleans up the registry states when updating from an older version +// of filebeat potentially writing invalid entries. +func fixStates(states []file.State) []file.State { + if len(states) == 0 { + return states + } + + // we use a map of states here, so as to identify and merge duplicate entries. 
+ idx := map[string]*file.State{} + for i := range states { + state := &states[i] + fixState(state) + + id := state.ID() + old, exists := idx[id] + if !exists { + idx[id] = state + } else { + mergeStates(old, state) // overwrite the entry in 'old' + } + } + + if len(idx) == len(states) { + return states + } + + i := 0 + newStates := make([]file.State, len(idx)) + for _, state := range idx { + newStates[i] = *state + i++ + } + return newStates +} + +// fixState updates a read state to fulfill required invariants: +// - "Meta" must be nil if len(Meta) == 0 +func fixState(st *file.State) { + if len(st.Meta) == 0 { + st.Meta = nil + } +} + +// mergeStates merges 2 states by trying to determine the 'newer' state. +// The st state is overwritten with the updated fields. +func mergeStates(st, other *file.State) { + st.Finished = st.Finished || other.Finished + if st.Offset < other.Offset { // always select the higher offset + st.Offset = other.Offset + } + + // update file meta-data. As these are updated concurrently by the + // prospectors, select the newer state based on the update timestamp. 
+ var meta, metaOld, metaNew map[string]string + if st.Timestamp.Before(other.Timestamp) { + st.Source = other.Source + st.Timestamp = other.Timestamp + st.TTL = other.TTL + st.FileStateOS = other.FileStateOS + + metaOld, metaNew = st.Meta, other.Meta + } else { + metaOld, metaNew = other.Meta, st.Meta + } + + if len(metaOld) == 0 || len(metaNew) == 0 { + meta = metaNew + } else { + meta = map[string]string{} + for k, v := range metaOld { + meta[k] = v + } + for k, v := range metaNew { + meta[k] = v + } + } + + if len(meta) == 0 { + meta = nil + } + st.Meta = meta +} + // resetStates sets all states to finished and disable TTL on restart // For all states covered by an input, TTL will be overwritten with the input value func resetStates(states []file.State) []file.State { diff --git a/filebeat/registrar/registrar_test.go b/filebeat/registrar/registrar_test.go new file mode 100644 index 00000000000..102b073dffa --- /dev/null +++ b/filebeat/registrar/registrar_test.go @@ -0,0 +1,189 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package registrar + +import ( + "reflect" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/input/file" +) + +func TestRegistrarRead(t *testing.T) { + type testCase struct { + input string + expected []file.State + } + + zone := time.FixedZone("+0000", 0) + + cases := map[string]testCase{ + "ok registry with one entry": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": null + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "load config without meta": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1 + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "load config with empty meta": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": {} + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, time.July, 16, 10, 45, 01, 0, zone), + Offset: 10, + TTL: -2, // loader always resets states + }, + }, + }, + + "requires merge without meta-data": testCase{ + input: `[ + { + "type": "log", + "source": "test.log", + "offset": 100, + "timestamp": "2018-07-16T10:45:01+00:00", + "ttl": -1, + "meta": {} + }, + { + "type": "log", + "source": "test.log", + "offset": 10, + "timestamp": "2018-07-16T10:45:10+00:00", + "ttl": -1, + "meta": null + } + ]`, + expected: []file.State{ + { + Type: "log", + Source: "test.log", + Timestamp: time.Date(2018, 
time.July, 16, 10, 45, 10, 0, zone), + Offset: 100, + TTL: -2, // loader always resets states + Meta: nil, + }, + }, + }, + } + + matchState := func(t *testing.T, i int, expected, actual file.State) { + check := func(name string, a, b interface{}) { + if !reflect.DeepEqual(a, b) { + t.Errorf("State %v: %v mismatch (expected=%v, actual=%v)", i, name, a, b) + } + } + + check("id", expected.ID(), actual.ID()) + check("source", expected.Source, actual.Source) + check("offset", expected.Offset, actual.Offset) + check("ttl", expected.TTL, actual.TTL) + check("meta", expected.Meta, actual.Meta) + check("type", expected.Type, actual.Type) + + if t1, t2 := expected.Timestamp, actual.Timestamp; !t1.Equal(t2) { + t.Errorf("State %v: timestamp mismatch (expected=%v, actual=%v)", i, t1, t2) + } + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + in := strings.NewReader(test.input) + + states, err := readStatesFrom(in) + if !assert.NoError(t, err) { + return + } + + actual := sortedStates(states) + expected := sortedStates(test.expected) + if len(actual) != len(expected) { + t.Errorf("expected %v state, but registrar did load %v states", + len(expected), len(actual)) + return + } + + for i := range expected { + matchState(t, i, expected[i], actual[i]) + } + }) + } +} + +func sortedStates(states []file.State) []file.State { + tmp := make([]file.State, len(states)) + copy(tmp, states) + sort.Slice(tmp, func(i, j int) bool { + return tmp[i].ID() < tmp[j].ID() + }) + return tmp +} diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.0 b/filebeat/tests/files/registry/test-2lines-registry-6.3.0 new file mode 100644 index 00000000000..5f7414b9cf3 --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.0 @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git 
a/filebeat/tests/files/registry/test-2lines-registry-6.3.1 b/filebeat/tests/files/registry/test-2lines-registry-6.3.1 new file mode 100644 index 00000000000..a4c2ccf126c --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.1 @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":{},"FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty b/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty new file mode 100644 index 00000000000..2606e69bbbc --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-6.3.1-faulty @@ -0,0 +1,4 @@ +[ + {"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":{},"FileStateOS":{"inode":8604592318,"device":16777220}}, + {"source":"test.log","offset":0,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":8604592318,"device":16777220}} +] diff --git a/filebeat/tests/files/registry/test-2lines-registry-latest b/filebeat/tests/files/registry/test-2lines-registry-latest new file mode 100644 index 00000000000..110dc1613d1 --- /dev/null +++ b/filebeat/tests/files/registry/test-2lines-registry-latest @@ -0,0 +1 @@ +[{"source":"test.log","offset":10,"timestamp":"2018-07-18T21:51:43.529893808+02:00","ttl":-1,"type":"log","meta":null,"FileStateOS":{"inode":8604592318,"device":16777220}}] diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index a9f094ced9c..8c95c9395a6 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -70,6 +70,7 @@ def test_registrar_file_content(self): "offset": iterations * line_len, }, record) self.assertTrue("FileStateOS" in record) + self.assertIsNone(record["meta"]) file_state_os = record["FileStateOS"] if os.name == "nt": diff --git 
a/filebeat/tests/system/test_registrar_upgrade.py b/filebeat/tests/system/test_registrar_upgrade.py new file mode 100644 index 00000000000..21569e9c384 --- /dev/null +++ b/filebeat/tests/system/test_registrar_upgrade.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +"""Test the registrar with old registry file formats""" + +import os +import json + +from nose.plugins.skip import Skip, SkipTest + +from filebeat import BaseTest + + +class Test(BaseTest): + def test_upgrade_from_6_3_0(self): + template = "test-2lines-registry-6.3.0" + self.run_with_single_registry_format(template) + + def test_upgrade_from_6_3_1(self): + template = "test-2lines-registry-6.3.1" + self.run_with_single_registry_format(template) + + def test_upgrade_from_faulty_6_3_1(self): + template = "test-2lines-registry-6.3.1-faulty" + self.run_with_single_registry_format(template) + + def test_upgrade_from_latest(self): + template = "test-2lines-registry-latest" + self.run_with_single_registry_format(template) + + def run_with_single_registry_format(self, template): + # prepare log file + testfile, file_state = self.prepare_log() + + # prepare registry file + self.apply_registry_template(template, testfile, file_state) + + self.run_and_validate() + + def apply_registry_template(self, template, testfile, file_state): + source = self.beat_path + "/tests/files/registry/" + template + with open(source) as f: + registry = json.loads(f.read()) + + for state in registry: + state["source"] = testfile + state["FileStateOS"] = file_state + with open(self.working_dir + "/registry", 'w') as f: + f.write(json.dumps(registry)) + + def prepare_log(self): + # test is currently skipped on windows, because FileStateOS must match the + # current OS format. 
+ if os.name == "nt": + raise SkipTest + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*" + ) + + os.mkdir(self.working_dir + "/log/") + + testfile_path = self.working_dir + "/log/test.log" + with open(testfile_path, 'w') as f: + f.write("123456789\n") + f.write("abcdefghi\n") + + st = os.stat(testfile_path) + file_state = {"inode": st.st_ino, "device": st.st_dev} + return testfile_path, file_state + + def run_and_validate(self): + filebeat = self.start_beat() + self.wait_until( + lambda: self.output_has(lines=1), + max_timeout=15) + + # stop filebeat and enforce one last registry update + filebeat.check_kill_and_wait() + + data = self.get_registry() + assert len(data) == 1 + assert data[0]["offset"] == 20 + + # check only second line has been written + output = self.read_output() + assert len(output) == 1 + assert output[0]["message"] == "abcdefghi"