From 1f27b6f37b9a428b4a9635d8143cbfcb62da5a75 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Tue, 10 Jan 2017 15:58:49 +0100 Subject: [PATCH] Fix bug when migrating old states, 2 restarts are required (#3322) (#3325) The state of migrated states was not properly updated in the registry file. This led to the issue that after the first restart, the states were migrated but the prospector assumed the states were not finished and didn't start harvesting. A second restart resolved the problem. Discussion started here: https://discuss.elastic.co/t/filebeat-upgrade-requiring-multiple-restarts/70414/8 (cherry picked from commit 61432ab0ada7b033225a35b7f1793d385b486a1f) --- CHANGELOG.asciidoc | 1 + filebeat/registrar/registrar.go | 24 ++-- filebeat/tests/system/test_migration.py | 178 ++++++++++++++++++++++++ filebeat/tests/system/test_registrar.py | 108 -------------- 4 files changed, 194 insertions(+), 117 deletions(-) create mode 100644 filebeat/tests/system/test_migration.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c2afb4f65969..4aef718b2d81 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -41,6 +41,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] *Filebeat* - Fix registry cleanup issue when files falling under ignore_older after restart. {issue}2818[2818] +- Fix registry migration issue from old states where files were only harvested after second restart. {pull}3322[3322] *Winlogbeat* - Fix for "The array bounds are invalid" error when reading large events. 
{issue}3076[3076] diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index bc2a98c82aa8..5ac9ff8bebc9 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -118,15 +118,7 @@ func (r *Registrar) loadStates() error { return fmt.Errorf("Error decoding states: %s", err) } - // Set all states to finished and disable TTL on restart - // For all states covered by a prospector, TTL will be overwritten with the prospector value - for key, state := range states { - state.Finished = true - // Set ttl to -2 to easily spot which states are not managed by a prospector - state.TTL = -2 - states[key] = state - } - + states = resetStates(states) r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) @@ -176,6 +168,7 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool { // Convert old states to new states logp.Info("Old registry states found: %v", len(oldStates)) states := convertOldStates(oldStates) + states = resetStates(states) r.states.SetStates(states) // Rewrite registry in new format @@ -186,6 +179,19 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool { return true } +// resetStates sets all states to finished and disable TTL on restart +// For all states covered by a prospector, TTL will be overwritten with the prospector value +func resetStates(states []file.State) []file.State { + + for key, state := range states { + state.Finished = true + // Set ttl to -2 to easily spot which states are not managed by a prospector + state.TTL = -2 + states[key] = state + } + return states +} + func convertOldStates(oldStates map[string]file.State) []file.State { // Convert old states to new states states := []file.State{} diff --git a/filebeat/tests/system/test_migration.py b/filebeat/tests/system/test_migration.py new file mode 100644 index 000000000000..62fe99c0198b --- /dev/null +++ b/filebeat/tests/system/test_migration.py @@ -0,0 +1,178 @@ +from filebeat import BaseTest 
+ +import os +import platform +import time +import shutil +import json +import stat +from nose.plugins.skip import Skip, SkipTest + + +class Test(BaseTest): + + def test_migration_non_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name == "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + clean_removed="false", + clean_inactive="0", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 + + def test_migration_windows(self): + 
""" + Tests if migration from old filebeat registry to new format works + """ + + if os.name != "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 + + def test_migration_continue_reading(self): + """ + Tests if after the migration filebeat keeps reading the file + """ + + os.mkdir(self.working_dir + "/log/") + testfile1 = self.working_dir + "/log/test.log" + + with open(testfile1, 'w') as f: + f.write("entry10\n") + + registry_file = 
self.working_dir + '/registry' + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + output_file_filename="filebeat_1", + ) + + # Run filebeat to create a registry + filebeat = self.start_beat(output="filebeat1.log") + self.wait_until( + lambda: self.output_has(lines=1, output_file="output/filebeat_1"), + max_timeout=10) + filebeat.check_kill_and_wait() + + # Create old registry file out of the new one + r = self.get_registry() + registry_entry = r[0] + del registry_entry["timestamp"] + del registry_entry["ttl"] + old_registry = {registry_entry["source"]: registry_entry} + + # Overwrite registry + with open(registry_file, 'w') as f: + json.dump(old_registry, f) + + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + output_file_filename="filebeat_2", + ) + + filebeat = self.start_beat(output="filebeat2.log") + + # Wait until state is migrated + self.wait_until( + lambda: self.log_contains( + "Old states converted to new states and written to registrar: 1", "filebeat2.log"), + max_timeout=10) + + with open(testfile1, 'a') as f: + f.write("entry12\n") + + # After restart new output file is created -> only 1 new entry + self.wait_until( + lambda: self.output_has(lines=1, output_file="output/filebeat_2"), + max_timeout=10) + + filebeat.check_kill_and_wait() diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 6bdd8ed5fe44..04984139858d 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -624,114 +624,6 @@ def test_state_after_rotation_ignore_older(self): assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 - - def test_migration_non_windows(self): - """ - Tests if migration from old filebeat registry to new format works - """ - - if os.name == "nt": - raise SkipTest - - registry_file = 
self.working_dir + '/registry' - - # Write old registry file - with open(registry_file, 'w') as f: - f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/input*", - clean_removed="false", - clean_inactive="0", - ) - - filebeat = self.start_beat() - - self.wait_until( - lambda: self.log_contains("Old registry states found: 2"), - max_timeout=15) - - self.wait_until( - lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), - max_timeout=15) - - filebeat.check_kill_and_wait() - - # Check if content is same as above - assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 - assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 - - # Compare first entry - oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') - newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Compare second entry - oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') - newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Make sure the right number of entries is in - data = self.get_registry() - assert len(data) == 2 - - def test_migration_windows(self): - """ - Tests if migration from old filebeat registry to new format works - """ - - if os.name != "nt": - raise SkipTest - - registry_file = self.working_dir + '/registry' - - # Write old registry file - with open(registry_file, 'w') as f: - 
f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/input*", - ) - - filebeat = self.start_beat() - - self.wait_until( - lambda: self.log_contains("Old registry states found: 2"), - max_timeout=15) - - self.wait_until( - lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), - max_timeout=15) - - filebeat.check_kill_and_wait() - - # Check if content is same as above - assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 - assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 - - # Compare first entry - oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') - newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Compare second entry - oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') - newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Make sure the right number of entries is in - data = self.get_registry() - assert len(data) == 2 - - def test_clean_inactive(self): """ Checks that states are properly removed after clean_inactive