Skip to content

Commit

Permalink
[Filebeat] Fix s3 input parsing json file without expand_event_list_f…
Browse files Browse the repository at this point in the history
…rom_field (elastic#19962)

* Fix s3 input parsing json file without expand_event_list_from_field
  • Loading branch information
kaiyan-sheng authored Jul 22, 2020
1 parent 5445670 commit 9cf6b12
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix S3 input to trim delimiter /n from each log line. {pull}19972[19972]
- Ignore missing in Zeek module when dropping unecessary fields. {pull}19984[19984]
- Fix Filebeat OOMs on very long lines {issue}19500[19500], {pull}19552[19552]
- Fix s3 input parsing json file without expand_event_list_from_field. {issue}19902[19902] {pull}19962[19962]

*Heartbeat*

Expand Down
79 changes: 40 additions & 39 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,17 +455,10 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
gzipReader.Close()
}

// Check if expand_event_list_from_field is given with document content-type = "application/json"
if resp.ContentType != nil && *resp.ContentType == "application/json" && p.config.ExpandEventListFromField == "" {
err := errors.New("expand_event_list_from_field parameter is missing in config for application/json content-type file")
p.logger.Error(err)
return err
}

// Decode JSON documents when expand_event_list_from_field is given in config
if p.config.ExpandEventListFromField != "" {
// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
if resp.ContentType != nil && *resp.ContentType == "application/json" || p.config.ExpandEventListFromField != "" {
decoder := json.NewDecoder(reader)
err := p.decodeJSONWithKey(decoder, objectHash, info, s3Ctx)
err := p.decodeJSON(decoder, objectHash, info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "decodeJSONWithKey failed for '%s' from S3 bucket '%s'", info.key, info.name)
p.logger.Error(err)
Expand Down Expand Up @@ -512,59 +505,67 @@ func (p *s3Input) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info, s3C
return nil
}

func (p *s3Input) decodeJSONWithKey(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
func (p *s3Input) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
offset := 0
for {
var jsonFields map[string][]interface{}
var jsonFields interface{}
err := decoder.Decode(&jsonFields)
if jsonFields == nil {
return nil
}

if err == io.EOF {
// create event for last line
// get logs from expand_event_list_from_field
textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, "key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
if err != nil {
return err
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return err
}
}
} else if err != nil {
// decode json failed, skip this log file
err = errors.Wrapf(err, "decode json failed for '%s' from S3 bucket '%s', skipping this file", s3Info.key, s3Info.name)
p.logger.Warn(err)
return nil
}

textValues, ok := jsonFields[p.config.ExpandEventListFromField]
if !ok {
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
offset, err = p.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
if err != nil {
return err
}
}
}

for _, v := range textValues {
err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "Key '%s' not found", p.config.ExpandEventListFromField)
func (p *s3Input) jsonFieldsType(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
switch f := jsonFields.(type) {
case map[string][]interface{}:
if p.config.ExpandEventListFromField != "" {
textValues, ok := f[p.config.ExpandEventListFromField]
if !ok {
err := errors.Errorf("key '%s' not found", p.config.ExpandEventListFromField)
p.logger.Error(err)
return err
return offset, err
}
for _, v := range textValues {
offset, err := p.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return offset, err
}
}
return offset, nil
}
case map[string]interface{}:
offset, err := p.convertJSONToEvent(f, offset, objectHash, s3Info, s3Ctx)
if err != nil {
err = errors.Wrapf(err, "convertJSONToEvent failed for '%s' from S3 bucket '%s'", s3Info.key, s3Info.name)
p.logger.Error(err)
return offset, err
}
return offset, nil
}
return offset, nil
}

func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int, error) {
vJSON, err := json.Marshal(jsonFields)
logOriginal := string(vJSON)
log := trimLogDelimiter(logOriginal)
Expand All @@ -575,9 +576,9 @@ func (p *s3Input) convertJSONToEvent(jsonFields interface{}, offset int, objectH
if err != nil {
err = errors.Wrap(err, "forwardEvent failed")
p.logger.Error(err)
return err
return offset, err
}
return nil
return offset, nil
}

func (p *s3Input) forwardEvent(event beat.Event) error {
Expand Down

0 comments on commit 9cf6b12

Please sign in to comment.