Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move parsers outside of filestream input so others can use them as well #26541

Merged
merged 5 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

Expand Down Expand Up @@ -136,7 +137,7 @@ func (c *config) Validate() error {
return fmt.Errorf("no path is configured")
}

if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
if _, err := parser.NewConfig(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
return fmt.Errorf("cannot parse parser configuration: %+v", err)
}

Expand Down
20 changes: 15 additions & 5 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/parser"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
Expand All @@ -57,7 +58,7 @@ type filestream struct {
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
closerConfig closerConfig
parserConfig []common.ConfigNamespace
parserConfig *parser.Config
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down Expand Up @@ -93,10 +94,22 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

parsers, err := parser.NewConfig(
parser.CommonConfig{
MaxBytes: config.Reader.MaxBytes,
LineTerminator: config.Reader.LineTerminator,
},
config.Reader.Parsers,
)
if err != nil {
return nil, nil, fmt.Errorf("cannot create parsers: %v", err)
}

filestream := &filestream{
readerConfig: config.Reader,
encodingFactory: encodingFactory,
closerConfig: config.Close,
parserConfig: parsers,
}

return prospector, filestream, nil
Expand Down Expand Up @@ -219,10 +232,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

r = readfile.NewFilemeta(r, fs.newPath)

r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers)
if err != nil {
return nil, err
}
r = inp.parserConfig.Create(r)

r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

Expand Down
54 changes: 31 additions & 23 deletions filebeat/input/filestream/parser.go → libbeat/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package filestream
package parser
kvch marked this conversation as resolved.
Show resolved Hide resolved

import (
"errors"
Expand All @@ -35,20 +35,23 @@ var (

// parser transforms or translates the Content attribute of a Message.
// They are able to aggregate two or more Messages into a single one.
type parser interface {
type Parser interface {
io.Closer
Next() (reader.Message, error)
}

type parserConfig struct {
maxBytes int
lineTerminator readfile.LineTerminator
type CommonConfig struct {
MaxBytes int
LineTerminator readfile.LineTerminator
}

func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) {
p := in
type Config struct {
pCfg CommonConfig
parsers []common.ConfigNamespace
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe consider to implement Unpack on the config. Then it can be used directly with go-ucfg unpack like this (we have a similar pattern for TLS and I just introduced one for http):

type myConfig struct {

  Parser parser.Config `config:",inline"`
}

with

func (c *Config) Unpack(cfg *common.Config) error {
  tmp := struct {
    Common CommonConfig `config:",inline"`
    Parsers []common.ConfigNamespace `config:"parser"`
  }{}

  if err := cfg.Unpack(&tmp); err != nil {
    return err
  }

  newC, err := NewConfig(tmp.Common, tmp.Parsers)
  if err != nil {
    return err
  }

  *c = *newC
  return nil
}

That way if we have an error, the full name of the setting that failed to parse will be pointed out.

Plus users can use it like:

var settings myConfig
if err := cfg.Unpack(&settings); err != nil {
  return err
}

reader := ...
contentsReader := settings.Parser.Create(reader)


for _, ns := range c {
func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) {
for _, ns := range parsers {
name := ns.Name()
switch name {
case "multiline":
Expand All @@ -58,63 +61,68 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace)
if err != nil {
return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err)
}
p, err = multiline.New(p, "\n", pCfg.maxBytes, &config)
if err != nil {
return nil, fmt.Errorf("error while creating multiline parser: %+v", err)
}
case "ndjson":
var config readjson.ParserConfig
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
}
p = readjson.NewJSONParser(p, &config)
case "container":
config := readjson.DefaultContainerConfig()
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while parsing container parser config: %+v", err)
}
p = readjson.NewContainerParser(p, &config)
default:
return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
}
}

return p, nil
return &Config{
pCfg: pCfg,
parsers: parsers,
}, nil

}

func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
for _, ns := range c {
func (c *Config) Create(in reader.Reader) Parser {
p := in
for _, ns := range c.parsers {
name := ns.Name()
switch name {
case "multiline":
var config multiline.Config
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing multiline parser config: %+v", err)
return p
}
p, err = multiline.New(p, "\n", c.pCfg.MaxBytes, &config)
if err != nil {
return p
}
case "ndjson":
var config readjson.Config
var config readjson.ParserConfig
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing ndjson parser config: %+v", err)
return p
}
p = readjson.NewJSONParser(p, &config)
case "container":
config := readjson.DefaultContainerConfig()
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing container parser config: %+v", err)
return p
}
p = readjson.NewContainerParser(p, &config)
default:
return fmt.Errorf("%s: %s", ErrNoSuchParser, name)
return p
}
}

return nil
return p
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package filestream
package parser

import (
"io"
Expand All @@ -40,16 +40,13 @@ func TestParsersConfigAndReading(t *testing.T) {
expectedError string
}{
"no parser, no error": {
lines: "line 1\nline 2\n",
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
},
lines: "line 1\nline 2\n",
parsers: map[string]interface{}{},
expectedMessages: []string{"line 1\n", "line 2\n"},
},
"correct multiline parser": {
lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n",
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"multiline": map[string]interface{}{
Expand All @@ -72,7 +69,6 @@ func TestParsersConfigAndReading(t *testing.T) {
{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
`,
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
Expand All @@ -97,7 +93,6 @@ func TestParsersConfigAndReading(t *testing.T) {
},
"non existent parser configuration": {
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"no_such_parser": nil,
Expand All @@ -108,7 +103,6 @@ func TestParsersConfigAndReading(t *testing.T) {
},
"invalid multiline parser configuration is caught before parser creation": {
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"multiline": map[string]interface{}{
Expand All @@ -124,17 +118,19 @@ func TestParsersConfigAndReading(t *testing.T) {
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
cfg := defaultConfig()
parsersConfig := common.MustNewConfigFrom(test.parsers)
err := parsersConfig.Unpack(&cfg)
cfg := common.MustNewConfigFrom(test.parsers)
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
if test.expectedError == "" {
require.NoError(t, err)
} else {
require.Contains(t, err.Error(), test.expectedError)
return
}

p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers)
p := c.Create(testReader(test.lines))

i := 0
msg, err := p.Next()
Expand All @@ -157,9 +153,7 @@ func TestJSONParsersWithFields(t *testing.T) {
message: reader.Message{
Content: []byte("line 1"),
},
config: map[string]interface{}{
"paths": []string{"dummy_path"},
},
config: map[string]interface{}{},
expectedMessage: reader.Message{
Content: []byte("line 1"),
},
Expand All @@ -170,7 +164,6 @@ func TestJSONParsersWithFields(t *testing.T) {
Fields: common.MapStr{},
},
config: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
Expand All @@ -192,7 +185,6 @@ func TestJSONParsersWithFields(t *testing.T) {
Fields: common.MapStr{},
},
config: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
Expand Down Expand Up @@ -221,7 +213,6 @@ func TestJSONParsersWithFields(t *testing.T) {
},
},
config: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
Expand All @@ -244,12 +235,13 @@ func TestJSONParsersWithFields(t *testing.T) {
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
cfg := defaultConfig()
common.MustNewConfigFrom(test.config).Unpack(&cfg)
p, err := newParsers(msgReader(test.message), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers)
if err != nil {
t.Fatalf("failed to init parser: %+v", err)
}
cfg := common.MustNewConfigFrom(test.config)
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
require.NoError(t, err)
p := c.Create(msgReader(test.message))

msg, _ := p.Next()
require.Equal(t, test.expectedMessage, msg)
Expand All @@ -271,7 +263,6 @@ func TestContainerParser(t *testing.T) {
{"log":"patching file vendor/github.com/tsg/gopacket/pcap/pcap.go\n","stream":"stdout","time":"2016-03-02T22:59:04.626534779Z"}
`,
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"container": map[string]interface{}{},
Expand Down Expand Up @@ -309,7 +300,6 @@ func TestContainerParser(t *testing.T) {
lines: `2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
`,
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"container": map[string]interface{}{
Expand All @@ -333,7 +323,6 @@ func TestContainerParser(t *testing.T) {
{"log":"Execute /scripts/packetbeat_before_build.sh\n","stream":"stdout","time":"2016-03-02T22:59:04.617434682Z"}
`,
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"container": map[string]interface{}{},
Expand All @@ -360,12 +349,13 @@ func TestContainerParser(t *testing.T) {
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
cfg := defaultConfig()
parsersConfig := common.MustNewConfigFrom(test.parsers)
err := parsersConfig.Unpack(&cfg)
cfg := common.MustNewConfigFrom(test.parsers)
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)

p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 1024}, cfg.Reader.Parsers)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
require.NoError(t, err)
p := c.Create(testReader(test.lines))

i := 0
msg, err := p.Next()
Expand All @@ -378,6 +368,11 @@ func TestContainerParser(t *testing.T) {
})
}
}

type testParsersConfig struct {
Parsers []common.ConfigNamespace `struct:"parsers`
}

func testReader(lines string) reader.Reader {
encF, _ := encoding.FindEncoding("")
reader := strings.NewReader(lines)
Expand Down