diff --git a/filebeat/input/filestream/identifier.go b/filebeat/input/filestream/identifier.go index 7b28a1d3cba..4e2e5643fda 100644 --- a/filebeat/input/filestream/identifier.go +++ b/filebeat/input/filestream/identifier.go @@ -76,9 +76,13 @@ func (f fileSource) Name() string { } // newFileIdentifier creates a new state identifier for a log input. -func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { +func newFileIdentifier(ns *common.ConfigNamespace, suffix string) (fileIdentifier, error) { if ns == nil { - return newINodeDeviceIdentifier(nil) + i, err := newINodeDeviceIdentifier(nil) + if err != nil { + return nil, err + } + return withSuffix(i, suffix), nil } identifierType := ns.Name() @@ -87,7 +91,11 @@ func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) { return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) } - return f(ns.Config()) + i, err := f(ns.Config()) + if err != nil { + return nil, err + } + return withSuffix(i, suffix), nil } type inodeDeviceIdentifier struct { @@ -159,6 +167,32 @@ func (p *pathIdentifier) Supports(f identifierFeature) bool { return false } +type suffixIdentifier struct { + i fileIdentifier + suffix string +} + +func withSuffix(inner fileIdentifier, suffix string) fileIdentifier { + if suffix == "" { + return inner + } + return &suffixIdentifier{i: inner, suffix: suffix} +} + +func (s *suffixIdentifier) GetSource(e loginp.FSEvent) fileSource { + fs := s.i.GetSource(e) + fs.name += "-" + s.suffix + return fs +} + +func (s *suffixIdentifier) Name() string { + return s.i.Name() +} + +func (s *suffixIdentifier) Supports(f identifierFeature) bool { + return s.i.Supports(f) +} + // mockIdentifier is used for testing type MockIdentifier struct{} diff --git a/filebeat/input/filestream/identifier_test.go b/filebeat/input/filestream/identifier_test.go index f5f6296516e..8b9cb4e5f40 100644 --- a/filebeat/input/filestream/identifier_test.go +++ b/filebeat/input/filestream/identifier_test.go @@ -36,7 +36,7 @@ type testFileIdentifierConfig struct { func TestFileIdentifier(t *testing.T) { t.Run("default file identifier", func(t *testing.T) { - identifier, err := newFileIdentifier(nil) + identifier, err := newFileIdentifier(nil, "") require.NoError(t, err) assert.Equal(t, DefaultIdentifierName, identifier.Name()) @@ -59,6 +59,30 @@ func TestFileIdentifier(t *testing.T) { assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name()) }) + t.Run("default file identifier with suffix", func(t *testing.T) { + identifier, err := newFileIdentifier(nil, "my-suffix") + require.NoError(t, err) + assert.Equal(t, DefaultIdentifierName, identifier.Name()) + + tmpFile, err := ioutil.TempFile("", "test_file_identifier_native") + if err != nil { + t.Fatalf("cannot create temporary file for test: %v", err) + } + defer os.Remove(tmpFile.Name()) + + fi, err := tmpFile.Stat() + if err != nil { + t.Fatalf("cannot stat temporary file for test: %v", err) + } + + src := identifier.GetSource(loginp.FSEvent{ + NewPath: tmpFile.Name(), + Info: fi, + }) + + assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String()+"-my-suffix", src.Name()) + }) + t.Run("path identifier", func(t *testing.T) { c := common.MustNewConfigFrom(map[string]interface{}{ "identifier": map[string]interface{}{ @@ -69,7 +93,7 @@ func TestFileIdentifier(t *testing.T) { err := c.Unpack(&cfg) require.NoError(t, err) - identifier, err := newFileIdentifier(cfg.Identifier) + identifier, err := newFileIdentifier(cfg.Identifier, "") require.NoError(t, err) assert.Equal(t, pathName, identifier.Name()) diff --git a/filebeat/input/filestream/prospector_creator.go b/filebeat/input/filestream/prospector_creator.go index 59f86d1426a..f792b075cf3 100644 --- a/filebeat/input/filestream/prospector_creator.go +++ b/filebeat/input/filestream/prospector_creator.go @@ -43,7 +43,7 @@ func newProspector(config config) (loginp.Prospector, error) { return nil, fmt.Errorf("error while creating filewatcher %v", err) } - identifier, err := newFileIdentifier(config.FileIdentity) + identifier, err := newFileIdentifier(config.FileIdentity, getIdentifierSuffix(config)) if err != nil { return nil, fmt.Errorf("error while creating file identifier: %v", err) } @@ -104,3 +104,7 @@ func newProspector(config config) (loginp.Prospector, error) { } return nil, fmt.Errorf("no such rotation method: %s", rotationMethod) } + +func getIdentifierSuffix(config config) string { + return config.Reader.Parsers.Suffix +} diff --git a/libbeat/reader/parser/parser.go b/libbeat/reader/parser/parser.go index 151e912416a..f54c5b98dba 100644 --- a/libbeat/reader/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -49,6 +49,8 @@ type CommonConfig struct { } type Config struct { + Suffix string + pCfg CommonConfig parsers []common.ConfigNamespace } @@ -79,6 +81,7 @@ func (c *Config) Unpack(cc *common.Config) error { } func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { + var suffix string for _, ns := range parsers { name := ns.Name() switch name { @@ -103,12 +106,19 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er if err != nil { return nil, fmt.Errorf("error while parsing container parser config: %+v", err) } + if config.Stream != readjson.All { + if suffix != "" { + return nil, fmt.Errorf("only one stream selection is allowed") + } + suffix = config.Stream.String() + } default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } } return &Config{ + Suffix: suffix, pCfg: pCfg, parsers: parsers, }, nil diff --git a/libbeat/reader/parser/parser_test.go b/libbeat/reader/parser/parser_test.go index 37eba5d15f9..1fdf09ef719 100644 --- a/libbeat/reader/parser/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -32,6 +32,75 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) +func TestParsersConfigSuffix(t *testing.T) { + tests := map[string]struct { + parsers map[string]interface{} + expectedSuffix string + expectedError string + }{ + "parsers with no suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "all", + }, + }, + }, + }, + }, + "parsers with correct suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stdout", + }, + }, + }, + }, + expectedSuffix: "stdout", + }, + "parsers with multiple suffix config": { + parsers: map[string]interface{}{ + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stdout", + }, + }, + map[string]interface{}{ + "container": map[string]interface{}{ + "stream": "stderr", + }, + }, + }, + }, + expectedError: "only one stream selection is allowed", + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + + if test.expectedError == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), test.expectedError) + return + } + require.Equal(t, c.Suffix, test.expectedSuffix) + }) + } + +} + func TestParsersConfigAndReading(t *testing.T) { tests := map[string]struct { lines string