From a15739eab4b99d657cebe2b46af73841bfdbb89a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 28 Jun 2021 17:58:24 +0200 Subject: [PATCH 1/9] Add custom suffix to identifiers in filestream input so container parser can be supported completely --- filebeat/input/filestream/environment_test.go | 2 +- filebeat/input/filestream/identifier.go | 34 +++++++++++++------ .../filestream/identifier_inode_deviceid.go | 11 ++++-- .../identifier_inode_deviceid_windows.go | 2 +- filebeat/input/filestream/identifier_test.go | 28 +++++++++++++-- .../input/filestream/prospector_creator.go | 23 ++++++++++++- filebeat/input/filestream/prospector_test.go | 2 +- 7 files changed, 82 insertions(+), 20 deletions(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 8e4fc3d799a..d1fd24f1921 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -281,7 +281,7 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e } func getIDFromPath(filepath string, fi os.FileInfo) string { - identifier, _ := newINodeDeviceIdentifier(nil) + identifier, _ := newINodeDeviceIdentifier(nil, "") src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath}) return "filestream::.global::" + src.Name() } diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 7b28a1d3cba..7def478ad05 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -49,7 +49,7 @@ var ( } ) -type identifierFactory func(*common.Config) (fileIdentifier, error) +type identifierFactory func(c *common.Config, suffix string) (fileIdentifier, error) type fileIdentifier interface { GetSource(loginp.FSEvent) fileSource @@ -76,9 +76,9 @@ func (f fileSource) Name() string { } // newFileIdentifier creates a new state identifier for a log input. -func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { +func newFileIdentifier(ns *common.ConfigNamespace, suffix string) (fileIdentifier, error) { if ns == nil { - return newINodeDeviceIdentifier(nil) + return newINodeDeviceIdentifier(nil, suffix) } identifierType := ns.Name() @@ -87,27 +87,33 @@ func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) } - return f(ns.Config()) + return f(ns.Config(), suffix) } type inodeDeviceIdentifier struct { - name string + name string + suffix string } -func newINodeDeviceIdentifier(_ *common.Config) (fileIdentifier, error) { +func newINodeDeviceIdentifier(_ *common.Config, suffix string) (fileIdentifier, error) { return &inodeDeviceIdentifier{ - name: nativeName, + name: nativeName, + suffix: suffix, }, nil } func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource { + name := i.name + identitySep + file.GetOSState(e.Info).String() + if i.suffix != "" { + name += "-" + i.suffix + } return fileSource{ info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: i.name + identitySep + file.GetOSState(e.Info).String(), + name: name, identifierGenerator: i.name, } } @@ -126,12 +132,14 @@ func (i *inodeDeviceIdentifier) Supports(f identifierFeature) bool { } type pathIdentifier struct { - name string + name string + suffix string } -func newPathIdentifier(_ *common.Config) (fileIdentifier, error) { +func newPathIdentifier(_ *common.Config, suffix string) (fileIdentifier, error) { return &pathIdentifier{ - name: pathName, + name: pathName, + suffix: suffix, }, nil } @@ -140,6 +148,10 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { if e.Op == loginp.OpDelete { path = e.OldPath } + name := p.name + identitySep + path + if p.suffix != "" { + name += "-" + p.suffix + } return fileSource{ info: e.Info, newPath: e.NewPath, diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index 291bc0ad357..25997e58714 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -35,13 +35,14 @@ import ( type inodeMarkerIdentifier struct { log *logp.Logger name string + suffix string markerPath string markerFileLastModifitaion time.Time markerTxt string } -func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { +func newINodeMarkerIdentifier(cfg *common.Config, suffix string) (fileIdentifier, error) { var config struct { MarkerPath string `config:"path" validate:"required"` } @@ -61,6 +62,7 @@ func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { return &inodeMarkerIdentifier{ log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), name: inodeMarkerName, + suffix: suffix, markerPath: config.MarkerPath, markerFileLastModifitaion: fi.ModTime(), markerTxt: string(markerContent), @@ -93,14 +95,17 @@ func (i *inodeMarkerIdentifier) markerContents() string { } func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { - osstate := file.GetOSState(e.Info) + name := i.name + identitySep + file.GetOSState(e.Info).String() + "-" + i.markerContents() + if i.suffix != "" { + name += name + "-" + i.suffix + } return fileSource{ info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(), + name: name, identifierGenerator: i.name, } } diff --git a/filebeat/input/filestream/identifier_inode_deviceid_windows.go b/filebeat/input/filestream/identifier_inode_deviceid_windows.go index 4ee8d866124..95c900bb25f 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid_windows.go +++ b/filebeat/input/filestream/identifier_inode_deviceid_windows.go @@ -25,6 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { +func newINodeMarkerIdentifier(cfg *common.Config, suffix string) (fileIdentifier, error) { return nil, fmt.Errorf("inode_deviceid is not supported on Windows") } diff --git a/filebeat/input/filestream/identifier_test.go b/filebeat/input/filestream/identifier_test.go index f5f6296516e..8b9cb4e5f40 100644 --- a/filebeat/input/filestream/identifier_test.go +++ b/filebeat/input/filestream/identifier_test.go @@ -36,7 +36,7 @@ type testFileIdentifierConfig struct { func TestFileIdentifier(t *testing.T) { t.Run("default file identifier", func(t *testing.T) { - identifier, err := newFileIdentifier(nil) + identifier, err := newFileIdentifier(nil, "") require.NoError(t, err) assert.Equal(t, DefaultIdentifierName, identifier.Name()) @@ -59,6 +59,30 @@ func TestFileIdentifier(t *testing.T) { assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name()) }) + t.Run("default file identifier with suffix", func(t *testing.T) { + identifier, err := newFileIdentifier(nil, "my-suffix") + require.NoError(t, err) + assert.Equal(t, DefaultIdentifierName, identifier.Name()) + + tmpFile, err := ioutil.TempFile("", "test_file_identifier_native") + if err != nil { + t.Fatalf("cannot create temporary file for test: %v", err) + } + defer os.Remove(tmpFile.Name()) + + fi, err := tmpFile.Stat() + if err != nil { + t.Fatalf("cannot stat temporary file for test: %v", err) + } + + src := identifier.GetSource(loginp.FSEvent{ + NewPath: tmpFile.Name(), + Info: fi, + }) + + assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String()+"-my-suffix", src.Name()) + }) + t.Run("path identifier", func(t *testing.T) { c := common.MustNewConfigFrom(map[string]interface{}{ "identifier": map[string]interface{}{ @@ -69,7 +93,7 @@ func TestFileIdentifier(t *testing.T) { err := c.Unpack(&cfg) require.NoError(t, err) - identifier, err := newFileIdentifier(cfg.Identifier) + identifier, err := newFileIdentifier(cfg.Identifier, "") require.NoError(t, err) assert.Equal(t, pathName, identifier.Name()) diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go index 59f86d1426a..bc48230e841 100644 --- a/filebeat/input/filestream/prospector_creator.go +++ b/filebeat/input/filestream/prospector_creator.go @@ -24,6 +24,7 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/libbeat/reader/readjson" ) const ( @@ -43,7 +44,7 @@ func newProspector(config config) (loginp.Prospector, error) { return nil, fmt.Errorf("error while creating filewatcher %v", err) } - identifier, err := newFileIdentifier(config.FileIdentity) + identifier, err := newFileIdentifier(config.FileIdentity, getIdentifierSuffix(config)) if err != nil { return nil, fmt.Errorf("error while creating file identifier: %v", err) } @@ -104,3 +105,23 @@ func newProspector(config config) (loginp.Prospector, error) { } return nil, fmt.Errorf("no such rotation method: %s", rotationMethod) } + +func getIdentifierSuffix(config config) string { + if config.Reader.Parsers == (parsers.Config{}) { + return "" + } + + for _, ns := range config.Reader.Parsers { + if ns.Name() == "container" { + var c readjson.ContainerJSONConfig + err := ns.Config().Unpack(&c) + if err != nil { + return "" + } + if c.Stream != readjson.All { + return c.Stream.String() + } + } + } + return "" +} diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index 83dc2055df0..8e84a2d3a64 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -545,7 +545,7 @@ type renamedPathIdentifier struct { func (p *renamedPathIdentifier) Supports(_ identifierFeature) bool { return true } func mustPathIdentifier(renamed bool) fileIdentifier { - pathIdentifier, err := newPathIdentifier(nil) + pathIdentifier, err := newPathIdentifier(nil, "") if err != nil { panic(err) } From d4182cb847321bf3f43f93dd9e58d890a2af4260 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 13 Jul 2021 12:34:06 +0200 Subject: [PATCH 2/9] pass suffix differently --- .../input/filestream/prospector_creator.go | 19 +------------------ libbeat/reader/parser/parser.go | 7 +++++++ 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go index bc48230e841..f792b075cf3 100644 --- a/filebeat/input/filestream/prospector_creator.go +++ b/filebeat/input/filestream/prospector_creator.go @@ -24,7 +24,6 @@ import ( loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/reader/readjson" ) const ( @@ -107,21 +106,5 @@ func newProspector(config config) (loginp.Prospector, error) { } func getIdentifierSuffix(config config) string { - if config.Reader.Parsers == (parsers.Config{}) { - return "" - } - - for _, ns := range config.Reader.Parsers { - if ns.Name() == "container" { - var c readjson.ContainerJSONConfig - err := ns.Config().Unpack(&c) - if err != nil { - return "" - } - if c.Stream != readjson.All { - return c.Stream.String() - } - } - } - return "" + return config.Reader.Parsers.Suffix } diff --git a/libbeat/reader/parser/parser.go b/libbeat/reader/parser/parser.go index 151e912416a..044384f2d31 100644 --- a/libbeat/reader/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -49,6 +49,8 @@ type CommonConfig struct { } type Config struct { + Suffix string + pCfg CommonConfig parsers []common.ConfigNamespace } @@ -79,6 +81,7 @@ func (c *Config) Unpack(cc *common.Config) error { } func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { + var suffix string for _, ns := range parsers { name := ns.Name() switch name { @@ -103,12 +106,16 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er if err != nil { return nil, fmt.Errorf("error while parsing container parser config: %+v", err) } + if config.Stream != readjson.All { + suffix = config.Stream.String() + } default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } } return &Config{ + Suffix: suffix, pCfg: pCfg, parsers: parsers, }, nil From 481b1aff11355b492eff3930b768c9edf92a9334 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 13 Jul 2021 15:01:59 +0200 Subject: [PATCH 3/9] address review notes --- filebeat/input/filestream/identifier.go | 66 ++++++++++++------- .../filestream/identifier_inode_deviceid.go | 10 +-- .../identifier_inode_deviceid_windows.go | 2 +- 3 files changed, 47 insertions(+), 31 deletions(-) diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 7def478ad05..23a0f2a304d 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -49,7 +49,7 @@ var ( } ) -type identifierFactory func(c *common.Config, suffix string) (fileIdentifier, error) +type identifierFactory func(c *common.Config) (fileIdentifier, error) type fileIdentifier interface { GetSource(loginp.FSEvent) fileSource @@ -78,7 +78,11 @@ func (f fileSource) Name() string { // newFileIdentifier creates a new state identifier for a log input. func newFileIdentifier(ns *common.ConfigNamespace, suffix string) (fileIdentifier, error) { if ns == nil { - return newINodeDeviceIdentifier(nil, suffix) + i, err := newINodeDeviceIdentifier(nil) + if err != nil { + return nil, err + } + return withSuffix(i, suffix), nil } identifierType := ns.Name() @@ -87,33 +91,31 @@ func newFileIdentifier(ns *common.ConfigNamespace, suffix string) (fileIdentifie return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) } - return f(ns.Config(), suffix) + i, err := f(ns.Config()) + if err != nil { + return nil, err + } + return withSuffix(i, suffix), nil } type inodeDeviceIdentifier struct { - name string - suffix string + name string } -func newINodeDeviceIdentifier(_ *common.Config, suffix string) (fileIdentifier, error) { +func newINodeDeviceIdentifier(_ *common.Config) (fileIdentifier, error) { return &inodeDeviceIdentifier{ - name: nativeName, - suffix: suffix, + name: nativeName, }, nil } func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource { - name := i.name + identitySep + file.GetOSState(e.Info).String() - if i.suffix != "" { - name += "-" + i.suffix - } return fileSource{ info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: name, + name: i.name + identitySep + file.GetOSState(e.Info).String(), identifierGenerator: i.name, } } @@ -132,14 +134,12 @@ func (i *inodeDeviceIdentifier) Supports(f identifierFeature) bool { } type pathIdentifier struct { - name string - suffix string + name string } -func newPathIdentifier(_ *common.Config, suffix string) (fileIdentifier, error) { +func newPathIdentifier(_ *common.Config) (fileIdentifier, error) { return &pathIdentifier{ - name: pathName, - suffix: suffix, + name: pathName, }, nil } @@ -148,10 +148,6 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource { if e.Op == loginp.OpDelete { path = e.OldPath } - name := p.name + identitySep + path - if p.suffix != "" { - name += "-" + p.suffix - } return fileSource{ info: e.Info, newPath: e.NewPath, @@ -171,6 +167,32 @@ func (p *pathIdentifier) Supports(f identifierFeature) bool { return false } +type suffixIdentifier struct { + i fileIdentifier + suffix string +} + +func withSuffix(inner fileIdentifier, suffix string) fileIdentifier { + if suffix == "" { + return inner + } + return &suffixIdentifier{i: inner, suffix: suffix} +} + +func (s *suffixIdentifier) GetSource(e loginp.FSEvent) fileSource { + fs := s.i.GetSource(e) + fs.name += "-" + s.suffix + return fs +} + +func (s *suffixIdentifier) Name() string { + return s.i.Name() +} + +func (s *suffixIdentifier) Supports(f identifierFeature) bool { + return s.i.Supports(f) +} + // mockIdentifier is used for testing type MockIdentifier struct{} diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index 25997e58714..174dddf4020 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -35,14 +35,13 @@ import ( type inodeMarkerIdentifier struct { log *logp.Logger name string - suffix string markerPath string markerFileLastModifitaion time.Time markerTxt string } -func newINodeMarkerIdentifier(cfg *common.Config, suffix string) (fileIdentifier, error) { +func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { var config struct { MarkerPath string `config:"path" validate:"required"` } @@ -62,7 +61,6 @@ func newINodeMarkerIdentifier(cfg *common.Config, suffix string) (fileIdentifier return &inodeMarkerIdentifier{ log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), name: inodeMarkerName, - suffix: suffix, markerPath: config.MarkerPath, markerFileLastModifitaion: fi.ModTime(), markerTxt: string(markerContent), @@ -95,17 +93,13 @@ func (i *inodeMarkerIdentifier) markerContents() string { } func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { - name := i.name + identitySep + file.GetOSState(e.Info).String() + "-" + i.markerContents() - if i.suffix != "" { - name += name + "-" + i.suffix - } return fileSource{ info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: name, + name: i.name + identitySep + file.GetOSState(e.Info).String() + "-" + i.markerContents(), identifierGenerator: i.name, } } diff --git a/filebeat/input/filestream/identifier_inode_deviceid_windows.go b/filebeat/input/filestream/identifier_inode_deviceid_windows.go index 95c900bb25f..4ee8d866124 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid_windows.go +++ b/filebeat/input/filestream/identifier_inode_deviceid_windows.go @@ -25,6 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -func newINodeMarkerIdentifier(cfg *common.Config, suffix string) (fileIdentifier, error) { +func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) { return nil, fmt.Errorf("inode_deviceid is not supported on Windows") } From 17ba1a792cd134576ef8b71cfeaf6356a7b54c89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 13 Jul 2021 15:27:03 +0200 Subject: [PATCH 4/9] fix test code --- filebeat/input/filestream/environment_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index d1fd24f1921..8e4fc3d799a 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -281,7 +281,7 @@ func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, e } func getIDFromPath(filepath string, fi os.FileInfo) string { - identifier, _ := newINodeDeviceIdentifier(nil, "") + identifier, _ := newINodeDeviceIdentifier(nil) src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath}) return "filestream::.global::" + src.Name() } From 61ed885a02c11f2c627c51d2d296f08691b05aa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 13 Jul 2021 15:30:12 +0200 Subject: [PATCH 5/9] minimize change --- filebeat/input/filestream/identifier.go | 2 +- filebeat/input/filestream/identifier_inode_deviceid.go | 3 ++- filebeat/input/filestream/prospector_test.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 23a0f2a304d..4e2e5643fda 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -49,7 +49,7 @@ var ( } ) -type identifierFactory func(c *common.Config) (fileIdentifier, error) +type identifierFactory func(*common.Config) (fileIdentifier, error) type fileIdentifier interface { GetSource(loginp.FSEvent) fileSource diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index 174dddf4020..a4bc8e9849e 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -93,13 +93,14 @@ func (i *inodeMarkerIdentifier) markerContents() string { } func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { + osstate := file.GetOSState(e.Info) return fileSource{ info: e.Info, newPath: e.NewPath, oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: i.name + identitySep + file.GetOSState(e.Info).String() + "-" + i.markerContents(), + name: i.name + identitySep + osstate.String() + "-" + i.markerContents(), identifierGenerator: i.name, } } diff --git a/filebeat/input/filestream/prospector_test.go b/filebeat/input/filestream/prospector_test.go index 8e84a2d3a64..83dc2055df0 100644 --- a/filebeat/input/filestream/prospector_test.go +++ b/filebeat/input/filestream/prospector_test.go @@ -545,7 +545,7 @@ type renamedPathIdentifier struct { func (p *renamedPathIdentifier) Supports(_ identifierFeature) bool { return true } func mustPathIdentifier(renamed bool) fileIdentifier { - pathIdentifier, err := newPathIdentifier(nil, "") + pathIdentifier, err := newPathIdentifier(nil) if err != nil { panic(err) } From 016031bcd952ad2cb431e426be7d5eca37acec78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 13 Jul 2021 15:31:19 +0200 Subject: [PATCH 6/9] more fix --- filebeat/input/filestream/identifier_inode_deviceid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/filestream/identifier_inode_deviceid.go b/filebeat/input/filestream/identifier_inode_deviceid.go index a4bc8e9849e..291bc0ad357 100644 --- a/filebeat/input/filestream/identifier_inode_deviceid.go +++ b/filebeat/input/filestream/identifier_inode_deviceid.go @@ -100,7 +100,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource { oldPath: e.OldPath, truncated: e.Op == loginp.OpTruncate, archived: e.Op == loginp.OpArchived, - name: i.name + identitySep + osstate.String() + "-" + i.markerContents(), + name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(), identifierGenerator: i.name, } } From 99479847fcb50c8d1a2944fafd0c053590c914a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 14 Jul 2021 13:19:10 +0200 Subject: [PATCH 7/9] add error if multiple suffix is configured --- libbeat/reader/parser/parser.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/reader/parser/parser.go b/libbeat/reader/parser/parser.go index 044384f2d31..2bc599d6de4 100644 --- a/libbeat/reader/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -107,6 +107,9 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er return nil, fmt.Errorf("error while parsing container parser config: %+v", err) } if config.Stream != readjson.All { + if suffix != "" { + return fmt.Errorf("only one stream selection is allowed") + } suffix = config.Stream.String() } default: From ba77a509a68417e22b1daba6cb2902abafae90a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 14 Jul 2021 14:14:55 +0200 Subject: [PATCH 8/9] fix error :facepalm: --- libbeat/reader/parser/parser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/reader/parser/parser.go b/libbeat/reader/parser/parser.go index 2bc599d6de4..f54c5b98dba 100644 --- a/libbeat/reader/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -108,7 +108,7 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er } if config.Stream != readjson.All { if suffix != "" { - return fmt.Errorf("only one stream selection is allowed") + return nil, fmt.Errorf("only one stream selection is allowed") } suffix = config.Stream.String() } From d14cfb188137aa76553f66ff3a6d809270db5bd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 15 Jul 2021 15:46:27 +0200 Subject: [PATCH 9/9] add tests for parser suffix --- libbeat/reader/parser/parser_test.go | 69 ++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/libbeat/reader/parser/parser_test.go b/libbeat/reader/parser/parser_test.go index 37eba5d15f9..1fdf09ef719 100644 --- a/libbeat/reader/parser/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -32,6 +32,75 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) +func TestParsersConfigSuffix(t *testing.T) { + tests := map[string]struct { + parsers map[string]interface{} + expectedSuffix string + expectedError string + }{ + "parsers with no suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "all", + }, + }, + }, + }, + }, + "parsers with correct suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stdout", + }, + }, + }, + }, + expectedSuffix: "stdout", + }, + "parsers with multiple suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stdout", + }, + }, + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stderr", + }, + }, + }, + }, + expectedError: "only one stream selection is allowed", + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + + if test.expectedError == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), test.expectedError) + return + } + require.Equal(t, c.Suffix, test.expectedSuffix) + }) + } + +} + func TestParsersConfigAndReading(t *testing.T) { tests := map[string]struct { lines string