From 952c65fc26732be429142d129f0d458ce75afe71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 29 Jun 2021 16:40:26 +0200 Subject: [PATCH] Move parsers outside of filestream input so others can use them as well (#26541) ## What does this PR do? The object has its own `Unpack` function, so it is enough for you to add it as an attribute to your configuration. ```golang config parser.Config ``` Then create the parser based on the configuration ```golang p = inp.parserConfig.Create(r) ``` Example configuration accepted by the code above ```yaml parsers: - multiline: type: count lines_count: 3 ``` (cherry picked from commit 2a56cd7fe026c5b59e76e47e397041f765125700) --- filebeat/input/filestream/config.go | 8 +- filebeat/input/filestream/input.go | 9 +- .../reader/parser}/parser.go | 81 +++++++++++----- libbeat/reader/parser/parser_example_test.go | 94 +++++++++++++++++++ .../reader/parser}/parser_test.go | 61 ++++++------ 5 files changed, 186 insertions(+), 67 deletions(-) rename {filebeat/input/filestream => libbeat/reader/parser}/parser.go (68%) create mode 100644 libbeat/reader/parser/parser_example_test.go rename {filebeat/input/filestream => libbeat/reader/parser}/parser_test.go (88%) diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 007da10a045..9020093ba4a 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" ) @@ -71,7 +72,7 @@ type readerConfig struct { MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"` Tail bool `config:"seek_to_tail"` - Parsers []common.ConfigNamespace `config:"parsers"` + Parsers parser.Config `config:",inline"` } type backoffConfig struct { @@ -127,7 +128,6 @@ func defaultReaderConfig() readerConfig { LineTerminator: readfile.AutoLineTerminator, MaxBytes: 10 * humanize.MiByte, Tail: false, - Parsers: make([]common.ConfigNamespace, 0), } } @@ -136,9 +136,5 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } - if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil { - return fmt.Errorf("cannot parse parser configuration: %+v", err) - } - return nil } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index e143280e5b9..48d5f6dca77 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/debug" + "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" ) @@ -57,7 +58,7 @@ type filestream struct { encodingFactory encoding.EncodingFactory encoding encoding.Encoding closerConfig closerConfig - parserConfig []common.ConfigNamespace + parsers parser.Config } // Plugin creates a new filestream input plugin for creating a stateful input. @@ -97,6 +98,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error) readerConfig: config.Reader, encodingFactory: encodingFactory, closerConfig: config.Close, + parsers: config.Reader.Parsers, } return prospector, filestream, nil @@ -219,10 +221,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo r = readfile.NewFilemeta(r, fs.newPath) - r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers) - if err != nil { - return nil, err - } + r = inp.parsers.Create(r) r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) diff --git a/filebeat/input/filestream/parser.go b/libbeat/reader/parser/parser.go similarity index 68% rename from filebeat/input/filestream/parser.go rename to libbeat/reader/parser/parser.go index c64b8981ae4..fa01181c2aa 100644 --- a/filebeat/input/filestream/parser.go +++ b/libbeat/reader/parser/parser.go @@ -15,13 +15,15 @@ // specific language governing permissions and limitations // under the License. -package filestream +package parser import ( "errors" "fmt" "io" + "github.com/dustin/go-humanize" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/multiline" @@ -35,20 +37,48 @@ var ( // parser transforms or translates the Content attribute of a Message. // They are able to aggregate two or more Messages into a single one. -type parser interface { +type Parser interface { io.Closer Next() (reader.Message, error) } -type parserConfig struct { - maxBytes int - lineTerminator readfile.LineTerminator +type CommonConfig struct { + MaxBytes int `config:"max_bytes"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` } -func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) { - p := in +type Config struct { + pCfg CommonConfig + parsers []common.ConfigNamespace +} + +func (c *Config) Unpack(cc *common.Config) error { + tmp := struct { + Common CommonConfig `config:",inline"` + Parsers []common.ConfigNamespace `config:"parsers"` + }{ + CommonConfig{ + MaxBytes: 10 * humanize.MiByte, + LineTerminator: readfile.AutoLineTerminator, + }, + nil, + } + err := cc.Unpack(&tmp) + if err != nil { + return err + } - for _, ns := range c { + newC, err := NewConfig(tmp.Common, tmp.Parsers) + if err != nil { + return err + } + *c = *newC + + return nil +} + +func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) { + for _, ns := range parsers { name := ns.Name() switch name { case "multiline": @@ -58,10 +88,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) if err != nil { return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err) } - p, err = multiline.New(p, "\n", pCfg.maxBytes, &config) - if err != nil { - return nil, fmt.Errorf("error while creating multiline parser: %+v", err) - } case "ndjson": var config readjson.ParserConfig cfg := ns.Config() @@ -69,7 +95,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) if err != nil { return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err) } - p = readjson.NewJSONParser(p, &config) case "container": config := readjson.DefaultContainerConfig() cfg := ns.Config() @@ -77,17 +102,21 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) if err != nil { return nil, fmt.Errorf("error while parsing container parser config: %+v", err) } - p = readjson.NewContainerParser(p, &config) default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } } - return p, nil + return &Config{ + pCfg: pCfg, + parsers: parsers, + }, nil + } -func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { - for _, ns := range c { +func (c *Config) Create(in reader.Reader) Parser { + p := in + for _, ns := range c.parsers { name := ns.Name() switch name { case "multiline": @@ -95,26 +124,32 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { cfg := ns.Config() err := cfg.Unpack(&config) if err != nil { - return fmt.Errorf("error while parsing multiline parser config: %+v", err) + return p + } + p, err = multiline.New(p, "\n", c.pCfg.MaxBytes, &config) + if err != nil { + return p } case "ndjson": - var config readjson.Config + var config readjson.ParserConfig cfg := ns.Config() err := cfg.Unpack(&config) if err != nil { - return fmt.Errorf("error while parsing ndjson parser config: %+v", err) + return p } + p = readjson.NewJSONParser(p, &config) case "container": config := readjson.DefaultContainerConfig() cfg := ns.Config() err := cfg.Unpack(&config) if err != nil { - return fmt.Errorf("error while parsing container parser config: %+v", err) + return p } + p = readjson.NewContainerParser(p, &config) default: - return fmt.Errorf("%s: %s", ErrNoSuchParser, name) + return p } } - return nil + return p } diff --git a/libbeat/reader/parser/parser_example_test.go b/libbeat/reader/parser/parser_example_test.go new file mode 100644 index 00000000000..ed8c12e2146 --- /dev/null +++ b/libbeat/reader/parser/parser_example_test.go @@ -0,0 +1,94 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package parser + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/reader/readfile" +) + +type inputParsersConfig struct { + MaxBytes int `config:"max_bytes"` + LineTerminator readfile.LineTerminator `config:"line_terminator"` + Parsers Config `config:",inline"` +} + +func TestParsersExampleInline(t *testing.T) { + tests := map[string]struct { + lines string + parsers map[string]interface{} + expectedMessages []string + }{ + "multiline docker logs parser": { + lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +`, + parsers: map[string]interface{}{ + "max_bytes": 1024, + "line_terminator": "auto", + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "message_key": "log", + }, + }, + map[string]interface{}{ + "multiline": map[string]interface{}{ + "match": "after", + "negate": true, + "pattern": "^\\[log\\]", + }, + }, + }, + }, + expectedMessages: []string{ + "[log] The following are log messages\n", + "[log] This one is\n\n on multiple\n\n lines", + "[log] In total there should be 3 events\n", + }, + }, + } + + for name, test := range tests { + test := test + t.Run(name, func(t *testing.T) { + cfg := common.MustNewConfigFrom(test.parsers) + var c inputParsersConfig + err := cfg.Unpack(&c) + require.NoError(t, err) + + p := c.Parsers.Create(testReader(test.lines)) + + i := 0 + msg, err := p.Next() + for err == nil { + require.Equal(t, test.expectedMessages[i], string(msg.Content)) + i++ + msg, err = p.Next() + } + }) + } +} diff --git a/filebeat/input/filestream/parser_test.go b/libbeat/reader/parser/parser_test.go similarity index 88% rename from filebeat/input/filestream/parser_test.go rename to libbeat/reader/parser/parser_test.go index 696729d1e31..37eba5d15f9 100644 --- a/filebeat/input/filestream/parser_test.go +++ b/libbeat/reader/parser/parser_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package filestream +package parser import ( "io" @@ -40,16 +40,13 @@ func TestParsersConfigAndReading(t *testing.T) { expectedError string }{ "no parser, no error": { - lines: "line 1\nline 2\n", - parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, - }, + lines: "line 1\nline 2\n", + parsers: map[string]interface{}{}, expectedMessages: []string{"line 1\n", "line 2\n"}, }, "correct multiline parser": { lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n", parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "multiline": map[string]interface{}{ @@ -72,7 +69,6 @@ func TestParsersConfigAndReading(t *testing.T) { {"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -97,7 +93,6 @@ func TestParsersConfigAndReading(t *testing.T) { }, "non existent parser configuration": { parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "no_such_parser": nil, @@ -108,7 +103,6 @@ func TestParsersConfigAndReading(t *testing.T) { }, "invalid multiline parser configuration is caught before parser creation": { parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "multiline": map[string]interface{}{ @@ -124,9 +118,11 @@ func TestParsersConfigAndReading(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - parsersConfig := common.MustNewConfigFrom(test.parsers) - err := parsersConfig.Unpack(&cfg) + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) if test.expectedError == "" { require.NoError(t, err) } else { @@ -134,7 +130,7 @@ func TestParsersConfigAndReading(t *testing.T) { return } - p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers) + p := c.Create(testReader(test.lines)) i := 0 msg, err := p.Next() @@ -157,9 +153,7 @@ func TestJSONParsersWithFields(t *testing.T) { message: reader.Message{ Content: []byte("line 1"), }, - config: map[string]interface{}{ - "paths": []string{"dummy_path"}, - }, + config: map[string]interface{}{}, expectedMessage: reader.Message{ Content: []byte("line 1"), }, @@ -170,7 +164,6 @@ func TestJSONParsersWithFields(t *testing.T) { Fields: common.MapStr{}, }, config: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -192,7 +185,6 @@ func TestJSONParsersWithFields(t *testing.T) { Fields: common.MapStr{}, }, config: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -221,7 +213,6 @@ func TestJSONParsersWithFields(t *testing.T) { }, }, config: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "ndjson": map[string]interface{}{ @@ -244,12 +235,13 @@ func TestJSONParsersWithFields(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - common.MustNewConfigFrom(test.config).Unpack(&cfg) - p, err := newParsers(msgReader(test.message), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers) - if err != nil { - t.Fatalf("failed to init parser: %+v", err) - } + cfg := common.MustNewConfigFrom(test.config) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) + require.NoError(t, err) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + require.NoError(t, err) + p := c.Create(msgReader(test.message)) msg, _ := p.Next() require.Equal(t, test.expectedMessage, msg) @@ -271,7 +263,6 @@ func TestContainerParser(t *testing.T) { {"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"} `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "container": map[string]interface{}{}, @@ -309,7 +300,6 @@ func TestContainerParser(t *testing.T) { lines: `2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "container": map[string]interface{}{ @@ -333,7 +323,6 @@ func TestContainerParser(t *testing.T) { {"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"} `, parsers: map[string]interface{}{ - "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ "container": map[string]interface{}{}, @@ -360,12 +349,13 @@ func TestContainerParser(t *testing.T) { for name, test := range tests { test := test t.Run(name, func(t *testing.T) { - cfg := defaultConfig() - parsersConfig := common.MustNewConfigFrom(test.parsers) - err := parsersConfig.Unpack(&cfg) + cfg := common.MustNewConfigFrom(test.parsers) + var parsersConfig testParsersConfig + err := cfg.Unpack(&parsersConfig) require.NoError(t, err) - - p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 1024}, cfg.Reader.Parsers) + c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers) + require.NoError(t, err) + p := c.Create(testReader(test.lines)) i := 0 msg, err := p.Next() @@ -378,6 +368,11 @@ func TestContainerParser(t *testing.T) { }) } } + +type testParsersConfig struct { + Parsers []common.ConfigNamespace `struct:"parsers"` +} + func testReader(lines string) reader.Reader { encF, _ := encoding.FindEncoding("") reader := strings.NewReader(lines)