From 8b6678f2754b28538ab45da74e0b6ee862318697 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 28 Jun 2021 17:50:47 +0200 Subject: [PATCH 1/5] Move parsers outside of filestream input so others can use them as well --- filebeat/input/filestream/config.go | 3 +- filebeat/input/filestream/input.go | 21 ++++-- .../filestream => libbeat/parser}/parser.go | 71 +++++++++++++------ .../parser}/parser_test.go | 61 ++++++++-------- 4 files changed, 94 insertions(+), 62 deletions(-) rename {filebeat/input/filestream => libbeat/parser}/parser.go (71%) rename {filebeat/input/filestream => libbeat/parser}/parser_test.go (88%) diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 007da10a045..7aff3328e8f 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" ) @@ -136,7 +137,7 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } - if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { + if _, err := parser.NewConfig(nil, parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { return fmt.Errorf("cannot parse parser configuration: %+v", err) } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index e143280e5b9..f5b5e73e363 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/parser" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/debug" "github.com/elastic/beats/v7/libbeat/reader/readfile" @@ -57,7 +58,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory encoding encoding.Encoding closerConfig closerConfig - parserConfig []common.ConfigNamespace + parserConfig *parser.Config } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -93,10 +94,23 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding) } + parsers, err := parser.NewConfig( + nil, + parser.CommonConfig{ + MaxBytes: config.Reader.MaxBytes, + LineTerminator: config.Reader.LineTerminator, + }, + config.Reader.Parsers, + ) + if err != nil { + return nil, nil, fmt.Errorf("cannot create parsers: %v", err) + } + filestream := &filestream{ readerConfig: config.Reader, encodingFactory: encodingFactory, closerConfig: config.Close, + parserConfig: parsers, } return prospector, filestream, nil @@ -219,10 +233,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo r = readfile.NewFilemeta(r, fs.newPath) - r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers) - if err != nil { - return nil, err - } + r = inp.parserConfig.Create(r) r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) diff --git a/filebeat/input/filestream/parser.go b/libbeat/parser/parser.go similarity index 71% rename from filebeat/input/filestream/parser.go rename to libbeat/parser/parser.go index c64b8981ae4..770fc33832d 100644 --- a/filebeat/input/filestream/parser.go +++ b/libbeat/parser/parser.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package filestream +package parser import ( "errors" @@ -35,21 +35,28 @@ var ( // parser transforms or translates the Content attribute of a Message. // They are able to aggregate two or more Messages into a single one. -type parser interface { +type Parser interface { io.Closer Next() (reader.Message, error) } -type parserConfig struct { - maxBytes int - lineTerminator readfile.LineTerminator +type CommonConfig struct { + MaxBytes int + LineTerminator readfile.LineTerminator } -func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) { - p := in +type Config struct { + pCfg CommonConfig + parsers []common.ConfigNamespace + allowedParsers []string +} - for _, ns := range c { +func NewConfig(allowedParsers []string, pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { + for _, ns := range parsers { name := ns.Name() + if !isParserAllowed(name, allowedParsers) { + return nil, fmt.Errorf("parser is not allowed: %s", name) + } switch name { case "multiline": var config multiline.Config @@ -58,10 +65,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) if err != nil { return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err) } - p, err = multiline.New(p, "\n", pCfg.maxBytes, &config) - if err != nil { - return nil, fmt.Errorf("error while creating multiline parser: %+v", err) - } case "ndjson": var config readjson.ParserConfig cfg := ns.Config() @@ -69,7 +72,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) if err != nil { return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err) } - p = readjson.NewJSONParser(p, &config) case "container": config := readjson.DefaultContainerConfig() cfg := ns.Config() @@ -77,17 +79,34 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) if err != nil { return nil, fmt.Errorf("error while parsing container parser config: %+v", err) } - p = readjson.NewContainerParser(p, &config) default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } } - return p, nil + return &Config{ + pCfg: pCfg, + parsers: parsers, + allowedParsers: allowedParsers, + }, nil + } -func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { - for _, ns := range c { +func isParserAllowed(parser string, allowedParsers []string) bool { + if len(allowedParsers) == 0 { + return true + } + for _, p := range allowedParsers { + if p == parser { + return true + } + } + return false +} + +func (c *Config) Create(in reader.Reader) Parser { + p := in + for _, ns := range c.parsers { name := ns.Name() switch name { case "multiline": @@ -95,26 +114,32 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { cfg := ns.Config() err := cfg.Unpack(&config) if err != nil { - return fmt.Errorf("error while parsing multiline parser config: %+v", err) + return p + } + p, err = multiline.New(p, "\n", c.pCfg.MaxBytes, &config) + if err != nil { + return p } case "ndjson": - var config readjson.Config + var config readjson.ParserConfig cfg := ns.Config() err := cfg.Unpack(&config) if err != nil { - return fmt.Errorf("error while parsing ndjson parser config: %+v", err) + return p } + p = readjson.NewJSONParser(p, &config) case "container": config := readjson.DefaultContainerConfig() cfg := ns.Config() err := cfg.Unpack(&config) if err != nil { - return fmt.Errorf("error while parsing container parser config: %+v", err) + return p } + p = readjson.NewContainerParser(p, &config) default: - return fmt.Errorf("%s: %s", ErrNoSuchParser, name) + return p } } - return nil + return p } diff --git a/filebeat/input/filestream/parser_test.go b/libbeat/parser/parser_test.go similarity index 88% rename from filebeat/input/filestream/parser_test.go rename to libbeat/parser/parser_test.go index 696729d1e31..058fdfc3d09 100644 --- a/filebeat/input/filestream/parser_test.go +++ b/libbeat/parser/parser_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package filestream +package parser import ( "io" @@ -40,16 +40,13 @@ func TestParsersConfigAndReading(t *testing.T) { expectedError string }{ "no parser, no error": { - lines: "line 1\nline 2\n", - parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, - }, + lines: "line 1\nline 2\n", + parsers: map[string]interface{}{}, expectedMessages: []string{"line 1\n", "line 2\n"}, }, "correct multiline parser": { lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n", parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "multiline": map[string]interface{}{ @@ -72,7 +69,6 @@ func TestParsersConfigAndReading(t *testing.T) { {"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -97,7 +93,6 @@ func TestParsersConfigAndReading(t *testing.T) { }, "non existent parser configuration": { parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "no_such_parser": nil, @@ -108,7 +103,6 @@ func TestParsersConfigAndReading(t *testing.T) { }, "invalid multiline parser configuration is caught before parser creation": { parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "multiline": map[string]interface{}{ @@ -124,9 +118,11 @@ func TestParsersConfigAndReading(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - parsersConfig := common.MustNewConfigFrom(test.parsers) - err := parsersConfig.Unpack(&cfg) + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(nil, CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) if test.expectedError == "" { require.NoError(t, err) } else { @@ -134,7 +130,7 @@ func TestParsersConfigAndReading(t *testing.T) { return } - p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers) + p := c.Create(testReader(test.lines)) i := 0 msg, err := p.Next() @@ -157,9 +153,7 @@ func TestJSONParsersWithFields(t *testing.T) { message: reader.Message{ Content: []byte("line 1"), }, - config: map[string]interface{}{ - "paths": []string{"dummy_path"}, - }, + config: map[string]interface{}{}, expectedMessage: reader.Message{ Content: []byte("line 1"), }, @@ -170,7 +164,6 @@ func TestJSONParsersWithFields(t *testing.T) { Fields: common.MapStr{}, }, config: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -192,7 +185,6 @@ func TestJSONParsersWithFields(t *testing.T) { Fields: common.MapStr{}, }, config: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -221,7 +213,6 @@ func TestJSONParsersWithFields(t *testing.T) { }, }, config: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -244,12 +235,13 @@ func TestJSONParsersWithFields(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - common.MustNewConfigFrom(test.config).Unpack(&cfg) - p, err := newParsers(msgReader(test.message), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers) - if err != nil { - t.Fatalf("failed to init parser: %+v", err) - } + cfg := common.MustNewConfigFrom(test.config) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(nil, CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + require.NoError(t, err) + p := c.Create(msgReader(test.message)) msg, _ := p.Next() require.Equal(t, test.expectedMessage, msg) @@ -271,7 +263,6 @@ func TestContainerParser(t *testing.T) { {"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"} `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "container": map[string]interface{}{}, @@ -309,7 +300,6 @@ func TestContainerParser(t *testing.T) { lines: `2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "container": map[string]interface{}{ @@ -333,7 +323,6 @@ func TestContainerParser(t *testing.T) { {"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "container": map[string]interface{}{}, @@ -360,12 +349,13 @@ func TestContainerParser(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - parsersConfig := common.MustNewConfigFrom(test.parsers) - err := parsersConfig.Unpack(&cfg) + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - - p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 1024}, cfg.Reader.Parsers) + c, err := NewConfig(nil, CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + require.NoError(t, err) + p := c.Create(testReader(test.lines)) i := 0 msg, err := p.Next() @@ -378,6 +368,11 @@ func TestContainerParser(t *testing.T) { }) } } + +type testParsersConfig struct { + Parsers []common.ConfigNamespace `struct:"parsers` +} + func testReader(lines string) reader.Reader { encF, _ := encoding.FindEncoding("") reader := strings.NewReader(lines) From 984e710223452c77356dc42b57965d8e476d6d92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Mon, 28 Jun 2021 20:13:58 +0200 Subject: [PATCH 2/5] remove allowed parsers --- filebeat/input/filestream/config.go | 2 +- filebeat/input/filestream/input.go | 1 - libbeat/parser/parser.go | 27 +++++---------------------- libbeat/parser/parser_test.go | 6 +++--- 4 files changed, 9 insertions(+), 27 deletions(-) diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 7aff3328e8f..8a2824528e3 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -137,7 +137,7 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } - if _, err := parser.NewConfig(nil, parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { + if _, err := parser.NewConfig(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { return fmt.Errorf("cannot parse parser configuration: %+v", err) } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index f5b5e73e363..50812d6ab27 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -95,7 +95,6 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) } parsers, err := parser.NewConfig( - nil, parser.CommonConfig{ MaxBytes: config.Reader.MaxBytes, LineTerminator: config.Reader.LineTerminator, diff --git a/libbeat/parser/parser.go b/libbeat/parser/parser.go index 770fc33832d..d84679a2a1e 100644 --- a/libbeat/parser/parser.go +++ b/libbeat/parser/parser.go @@ -46,17 +46,13 @@ type CommonConfig struct { } type Config struct { - pCfg CommonConfig - parsers []common.ConfigNamespace - allowedParsers []string + pCfg CommonConfig + parsers []common.ConfigNamespace } -func NewConfig(allowedParsers []string, pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { +func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { for _, ns := range parsers { name := ns.Name() - if !isParserAllowed(name, allowedParsers) { - return nil, fmt.Errorf("parser is not allowed: %s", name) - } switch name { case "multiline": var config multiline.Config @@ -85,25 +81,12 @@ func NewConfig(allowedParsers []string, pCfg CommonConfig, parsers []common.Conf } return &Config{ - pCfg: pCfg, - parsers: parsers, - allowedParsers: allowedParsers, + pCfg: pCfg, + parsers: parsers, }, nil } -func isParserAllowed(parser string, allowedParsers []string) bool { - if len(allowedParsers) == 0 { - return true - } - for _, p := range allowedParsers { - if p == parser { - return true - } - } - return false -} - func (c *Config) Create(in reader.Reader) Parser { p := in for _, ns := range c.parsers { diff --git a/libbeat/parser/parser_test.go b/libbeat/parser/parser_test.go index 058fdfc3d09..120d6abd7e1 100644 --- a/libbeat/parser/parser_test.go +++ b/libbeat/parser/parser_test.go @@ -122,7 +122,7 @@ func TestParsersConfigAndReading(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewConfig(nil, CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) if test.expectedError == "" { require.NoError(t, err) } else { @@ -239,7 +239,7 @@ func TestJSONParsersWithFields(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewConfig(nil, CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) require.NoError(t, err) p := c.Create(msgReader(test.message)) @@ -353,7 +353,7 @@ func TestContainerParser(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewConfig(nil, CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) require.NoError(t, err) p := c.Create(testReader(test.lines)) From f01f7977bdba30bc9a358b3b867741870a6571f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 29 Jun 2021 14:03:13 +0200 Subject: [PATCH 3/5] rename config to creator better reflect what it is --- filebeat/input/filestream/config.go | 4 +- filebeat/input/filestream/input.go | 10 +- libbeat/{ => reader}/parser/parser.go | 8 +- libbeat/reader/parser/parser_example_test.go | 102 +++++++++++++++++++ libbeat/{ => reader}/parser/parser_test.go | 6 +- 5 files changed, 116 insertions(+), 14 deletions(-) rename libbeat/{ => reader}/parser/parser.go (94%) create mode 100644 libbeat/reader/parser/parser_example_test.go rename libbeat/{ => reader}/parser/parser_test.go (96%) diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 8a2824528e3..3293b79b454 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -25,7 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" - "github.com/elastic/beats/v7/libbeat/parser" + "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" ) @@ -137,7 +137,7 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } - if _, err := parser.NewConfig(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { + if _, err := parser.NewCreator(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { return fmt.Errorf("cannot parse parser configuration: %+v", err) } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 50812d6ab27..4084fab6aac 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -33,9 +33,9 @@ import ( "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/libbeat/parser" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/debug" + "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) @@ -58,7 +58,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory encoding encoding.Encoding closerConfig closerConfig - parserConfig *parser.Config + parsers *parser.Creator } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -94,7 +94,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding) } - parsers, err := parser.NewConfig( + parsers, err := parser.NewCreator( parser.CommonConfig{ MaxBytes: config.Reader.MaxBytes, LineTerminator: config.Reader.LineTerminator, @@ -109,7 +109,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) readerConfig: config.Reader, encodingFactory: encodingFactory, closerConfig: config.Close, - parserConfig: parsers, + parsers: parsers, } return prospector, filestream, nil @@ -232,7 +232,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo r = readfile.NewFilemeta(r, fs.newPath) - r = inp.parserConfig.Create(r) + r = inp.parsers.Create(r) r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) diff --git a/libbeat/parser/parser.go b/libbeat/reader/parser/parser.go similarity index 94% rename from libbeat/parser/parser.go rename to libbeat/reader/parser/parser.go index d84679a2a1e..61dab276165 100644 --- a/libbeat/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -45,12 +45,12 @@ type CommonConfig struct { LineTerminator readfile.LineTerminator } -type Config struct { +type Creator struct { pCfg CommonConfig parsers []common.ConfigNamespace } -func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { +func NewCreator(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Creator, error) { for _, ns := range parsers { name := ns.Name() switch name { @@ -80,14 +80,14 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er } } - return &Config{ + return &Creator{ pCfg: pCfg, parsers: parsers, }, nil } -func (c *Config) Create(in reader.Reader) Parser { +func (c *Creator) Create(in reader.Reader) Parser { p := in for _, ns := range c.parsers { name := ns.Name() diff --git a/libbeat/reader/parser/parser_example_test.go b/libbeat/reader/parser/parser_example_test.go new file mode 100644 index 00000000000..695091f8bd3 --- /dev/null +++ b/libbeat/reader/parser/parser_example_test.go @@ -0,0 +1,102 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package parser + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader/readfile" +) + +type inputParsersConfig struct { + MaxBytes int `config:"max_bytes"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + Parsers []common.ConfigNamespace `config:"parsers"` +} + +func TestParsersExampleInline(t *testing.T) { + tests := map[string]struct { + lines string + parsers map[string]interface{} + expectedMessages []string + }{ + "multiline docker logs parser": { + lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +`, + parsers: map[string]interface{}{ + "max_bytes": 1024, + "line_terminator": "auto", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "message_key": "log", + }, + }, + map[string]interface{}{ + "multiline": map[string]interface{}{ + "match": "after", + "negate": true, + "pattern": "^\\[log\\]", + }, + }, + }, + }, + expectedMessages: []string{ + "[log] The following are log messages\n", + "[log] This one is\n\n on multiple\n\n lines", + "[log] In total there should be 3 events\n", + }, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.parsers) + var c inputParsersConfig + err := cfg.Unpack(&c) + require.NoError(t, err) + + creator, err := NewCreator( + CommonConfig{ + MaxBytes: c.MaxBytes, + LineTerminator: c.LineTerminator, + }, + c.Parsers, + ) + require.NoError(t, err) + p := creator.Create(testReader(test.lines)) + + i := 0 + msg, err := p.Next() + for err == nil { + require.Equal(t, test.expectedMessages[i], string(msg.Content)) + i++ + msg, err = p.Next() + } + }) + } +} diff --git a/libbeat/parser/parser_test.go b/libbeat/reader/parser/parser_test.go similarity index 96% rename from libbeat/parser/parser_test.go rename to libbeat/reader/parser/parser_test.go index 120d6abd7e1..187ac077c29 100644 --- a/libbeat/parser/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -122,7 +122,7 @@ func TestParsersConfigAndReading(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) if test.expectedError == "" { require.NoError(t, err) } else { @@ -239,7 +239,7 @@ func TestJSONParsersWithFields(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) require.NoError(t, err) p := c.Create(msgReader(test.message)) @@ -353,7 +353,7 @@ func TestContainerParser(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) require.NoError(t, err) p := c.Create(testReader(test.lines)) From 8394f290133dd9e8eaf9d557f7f11ae1ad7e3a56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 29 Jun 2021 14:52:49 +0200 Subject: [PATCH 4/5] fix unpack --- filebeat/input/filestream/config.go | 7 +--- filebeat/input/filestream/input.go | 15 +------- libbeat/reader/parser/parser.go | 39 +++++++++++++++++--- libbeat/reader/parser/parser_example_test.go | 16 ++------ libbeat/reader/parser/parser_test.go | 6 +-- 5 files changed, 43 insertions(+), 40 deletions(-) diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 3293b79b454..9020093ba4a 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -72,7 +72,7 @@ type readerConfig struct { MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"` Tail bool `config:"seek_to_tail"` - Parsers []common.ConfigNamespace `config:"parsers"` + Parsers parser.Config `config:",inline"` } type backoffConfig struct { @@ -128,7 +128,6 @@ func defaultReaderConfig() readerConfig { LineTerminator: readfile.AutoLineTerminator, MaxBytes: 10 * humanize.MiByte, Tail: false, - Parsers: make([]common.ConfigNamespace, 0), } } @@ -137,9 +136,5 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } - if _, err := parser.NewCreator(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { - return fmt.Errorf("cannot parse parser configuration: %+v", err) - } - return nil } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 4084fab6aac..48d5f6dca77 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -58,7 +58,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory encoding encoding.Encoding closerConfig closerConfig - parsers *parser.Creator + parsers parser.Config } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -94,22 +94,11 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding) } - parsers, err := parser.NewCreator( - parser.CommonConfig{ - MaxBytes: config.Reader.MaxBytes, - LineTerminator: config.Reader.LineTerminator, - }, - config.Reader.Parsers, - ) - if err != nil { - return nil, nil, fmt.Errorf("cannot create parsers: %v", err) - } - filestream := &filestream{ readerConfig: config.Reader, encodingFactory: encodingFactory, closerConfig: config.Close, - parsers: parsers, + parsers: config.Reader.Parsers, } return prospector, filestream, nil diff --git a/libbeat/reader/parser/parser.go b/libbeat/reader/parser/parser.go index 61dab276165..fa01181c2aa 100644 --- a/libbeat/reader/parser/parser.go +++ b/libbeat/reader/parser/parser.go @@ -22,6 +22,8 @@ import ( "fmt" "io" + "github.com/dustin/go-humanize" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/multiline" @@ -41,16 +43,41 @@ type Parser interface { } type CommonConfig struct { - MaxBytes int - LineTerminator readfile.LineTerminator + MaxBytes int `config:"max_bytes"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` } -type Creator struct { +type Config struct { pCfg CommonConfig parsers []common.ConfigNamespace } -func NewCreator(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Creator, error) { +func (c *Config) Unpack(cc *common.Config) error { + tmp := struct { + Common CommonConfig `config:",inline"` + Parsers []common.ConfigNamespace `config:"parsers"` + }{ + CommonConfig{ + MaxBytes: 10 * humanize.MiByte, + LineTerminator: readfile.AutoLineTerminator, + }, + nil, + } + err := cc.Unpack(&tmp) + if err != nil { + return err + } + + newC, err := NewConfig(tmp.Common, tmp.Parsers) + if err != nil { + return err + } + *c = *newC + + return nil +} + +func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { for _, ns := range parsers { name := ns.Name() switch name { @@ -80,14 +107,14 @@ func NewCreator(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Creator, } } - return &Creator{ + return &Config{ pCfg: pCfg, parsers: parsers, }, nil } -func (c *Creator) Create(in reader.Reader) Parser { +func (c *Config) Create(in reader.Reader) Parser { p := in for _, ns := range c.parsers { name := ns.Name() diff --git a/libbeat/reader/parser/parser_example_test.go b/libbeat/reader/parser/parser_example_test.go index 695091f8bd3..ed8c12e2146 100644 --- a/libbeat/reader/parser/parser_example_test.go +++ b/libbeat/reader/parser/parser_example_test.go @@ -27,9 +27,9 @@ import ( ) type inputParsersConfig struct { - MaxBytes int `config:"max_bytes"` - LineTerminator readfile.LineTerminator `config:"line_terminator"` - Parsers []common.ConfigNamespace `config:"parsers"` + MaxBytes int `config:"max_bytes"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + Parsers Config `config:",inline"` } func TestParsersExampleInline(t *testing.T) { @@ -80,15 +80,7 @@ func TestParsersExampleInline(t *testing.T) { err := cfg.Unpack(&c) require.NoError(t, err) - creator, err := NewCreator( - CommonConfig{ - MaxBytes: c.MaxBytes, - LineTerminator: c.LineTerminator, - }, - c.Parsers, - ) - require.NoError(t, err) - p := creator.Create(testReader(test.lines)) + p := c.Parsers.Create(testReader(test.lines)) i := 0 msg, err := p.Next() diff --git a/libbeat/reader/parser/parser_test.go b/libbeat/reader/parser/parser_test.go index 187ac077c29..120d6abd7e1 100644 --- a/libbeat/reader/parser/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -122,7 +122,7 @@ func TestParsersConfigAndReading(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) if test.expectedError == "" { require.NoError(t, err) } else { @@ -239,7 +239,7 @@ func TestJSONParsersWithFields(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) require.NoError(t, err) p := c.Create(msgReader(test.message)) @@ -353,7 +353,7 @@ func TestContainerParser(t *testing.T) { var parsersConfig testParsersConfig err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) require.NoError(t, err) p := c.Create(testReader(test.lines)) From eefea28451f917f5dc9651a71f282be14d8ac71f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 29 Jun 2021 15:22:02 +0200 Subject: [PATCH 5/5] fix test --- libbeat/reader/parser/parser_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/reader/parser/parser_test.go b/libbeat/reader/parser/parser_test.go index 120d6abd7e1..37eba5d15f9 100644 --- a/libbeat/reader/parser/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -370,7 +370,7 @@ func TestContainerParser(t *testing.T) { } type testParsersConfig struct { - Parsers []common.ConfigNamespace `struct:"parsers` + Parsers []common.ConfigNamespace `struct:"parsers"` } func testReader(lines string) reader.Reader {