Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move handling of json fields to harvester #4159

Merged
merged 2 commits into from
May 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,15 @@ func (h *Harvester) open() error {
func (h *Harvester) forwardEvent(event *input.Event) error {

// Add additional prospector meta data to the event
event.EventMetadata = h.config.EventMetadata
event.InputType = h.config.InputType
event.DocumentType = h.config.DocumentType
event.JSONConfig = h.config.JSON
event.Pipeline = h.config.Pipeline
event.Module = h.config.Module
event.Fileset = h.config.Fileset

if event.Data != nil {
event.Data[common.EventMetadataKey] = h.config.EventMetadata
}

eventHolder := event.GetData()
//run the filters before sending to spooler
if event.Bytes > 0 {
Expand Down
147 changes: 147 additions & 0 deletions filebeat/harvester/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package harvester

import (
"testing"
"time"

"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"
)

func TestAddJSONFields(t *testing.T) {
type io struct {
Data common.MapStr
Text *string
JSONConfig reader.JSONConfig
ExpectedItems common.MapStr
}

text := "hello"

now := time.Now()

tests := []io{
{
// by default, don't overwrite keys
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true},
ExpectedItems: common.MapStr{
"type": "test_type",
"text": "hello",
},
},
{
// overwrite keys if asked
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"type": "test",
"text": "hello",
},
},
{
// without keys_under_root, put everything in a json key
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hello"}},
Text: &text,
JSONConfig: reader.JSONConfig{},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
},
},
{
// when MessageKey is defined, the Text overwrites the value of that key
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": "test", "text": "hi"}},
Text: &text,
JSONConfig: reader.JSONConfig{MessageKey: "text"},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
"type": "test_type",
},
},
{
// when @timestamp is in JSON and overwrite_keys is true, parse it
// in a common.Time
Data: common.MapStr{"@timestamp": now, "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"),
"type": "test",
},
},
{
// when the parsing on @timestamp fails, leave the existing value and add an error key
// in a common.Time
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
"type": "test",
"json_error": "@timestamp not overwritten (parse error on 2016-04-05T18:47:18.44XX4Z)",
},
},
{
// when the @timestamp has the wrong type, leave the existing value and add an error key
// in a common.Time
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "test", "@timestamp": 42}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
"type": "test",
"json_error": "@timestamp not overwritten (not string)",
},
},
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": 42}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"type": "test_type",
"json_error": "type not overwritten (not string)",
},
},
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Data: common.MapStr{"type": "test_type", "json": common.MapStr{"type": ""}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"type": "test_type",
"json_error": "type not overwritten (invalid value [])",
},
},
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Data: common.MapStr{"@timestamp": common.Time(now), "type": "test_type", "json": common.MapStr{"type": "_type"}},
Text: &text,
JSONConfig: reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"type": "test_type",
"json_error": "type not overwritten (invalid value [_type])",
},
},
}

for _, test := range tests {
h := Harvester{}
h.config.JSON = &test.JSONConfig

var jsonFields common.MapStr
if fields, ok := test.Data["json"]; ok {
jsonFields = fields.(common.MapStr)
}

h.mergeJSONFields(test.Data, jsonFields, test.Text)

t.Log("Executing test:", test)
for k, v := range test.ExpectedItems {
assert.Equal(t, v, test.Data[k])
}
}
}
48 changes: 45 additions & 3 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"

Expand Down Expand Up @@ -138,10 +140,31 @@ func (h *Harvester) Harvest(r reader.Reader) {

// Check if data should be added to event. Only export non empty events.
if !message.IsEmpty() && h.shouldExportLine(text) {
event.ReadTime = message.Ts
event.Bytes = message.Bytes
event.Text = &text
event.Data = message.Fields

event.Data = common.MapStr{
"@timestamp": common.Time(message.Ts),
"source": h.state.Source,
"offset": h.state.Offset, // Offset here is the offset before the starting char.
"type": h.config.DocumentType,
"input_type": h.config.InputType,
}
event.Data.DeepUpdate(message.Fields)

// Check if json fields exist
var jsonFields common.MapStr
if fields, ok := event.Data["json"]; ok {
jsonFields = fields.(common.MapStr)
}

if h.config.JSON != nil && len(jsonFields) > 0 {
h.mergeJSONFields(event.Data, jsonFields, &text)
} else if &text != nil {
if event.Data == nil {
event.Data = common.MapStr{}
}
event.Data["message"] = text
}
}

// Always send event to update state, also if lines was skipped
Expand Down Expand Up @@ -383,3 +406,22 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

return reader.NewLimit(r, h.config.MaxBytes), nil
}

// mergeJSONFields writes the JSON fields in the event map,
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func (h *Harvester) mergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string) {

// The message key might have been modified by multiline
if len(h.config.JSON.MessageKey) > 0 && text != nil {
jsonFields[h.config.JSON.MessageKey] = *text
}

if h.config.JSON.KeysUnderRoot {
// Delete existing json key
delete(data, "json")

jsontransform.WriteJSONKeys(data, jsonFields, h.config.JSON.OverwriteKeys, reader.JsonErrorKey)
}
}
83 changes: 16 additions & 67 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
package input

import (
"time"

"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/jsontransform"
)

// Event is sent to the output and must contain all relevant information
type Event struct {
EventMeta
Text *string
JSONConfig *reader.JSONConfig
Data common.MapStr // Use in readers to add data to the event

Data common.MapStr // Use in readers to add data to the event
}

type EventMeta struct {
common.EventMetadata
Pipeline string
Fileset string
Module string
InputType string
DocumentType string
ReadTime time.Time
Bytes int
State file.State
Pipeline string
Fileset string
Module string
InputType string
Bytes int
State file.State
}

type Data struct {
Expand All @@ -45,13 +35,10 @@ func NewEvent(state file.State) *Event {
}

func (e *Event) ToMapStr() common.MapStr {
event := common.MapStr{
common.EventMetadataKey: e.EventMetadata,
"@timestamp": common.Time(e.ReadTime),
"source": e.State.Source,
"offset": e.State.Offset, // Offset here is the offset before the starting char.
"type": e.DocumentType,
"input_type": e.InputType,

event := e.Data
if event == nil {
event = common.MapStr{}
}

if e.Fileset != "" && e.Module != "" {
Expand All @@ -61,37 +48,18 @@ func (e *Event) ToMapStr() common.MapStr {
}
}

// Add data fields which are added by the readers
for key, value := range e.Data {
event[key] = value
}

// Check if json fields exist
var jsonFields common.MapStr
if fields, ok := event["json"]; ok {
jsonFields = fields.(common.MapStr)
}

if e.JSONConfig != nil && len(jsonFields) > 0 {
mergeJSONFields(e, event, jsonFields)
} else if e.Text != nil {
event["message"] = *e.Text
}

return event
}

func (e *Event) GetData() Data {
return Data{
Event: e.ToMapStr(),
Metadata: EventMeta{
Pipeline: e.Pipeline,
Bytes: e.Bytes,
State: e.State,
Fileset: e.Fileset,
Module: e.Module,
ReadTime: e.ReadTime,
EventMetadata: e.EventMetadata,
Pipeline: e.Pipeline,
Bytes: e.Bytes,
State: e.State,
Fileset: e.Fileset,
Module: e.Module,
},
}
}
Expand All @@ -112,22 +80,3 @@ func (eh *Data) GetMetadata() common.MapStr {
func (eh *Data) HasData() bool {
return eh.Metadata.Bytes > 0
}

// mergeJSONFields writes the JSON fields in the event map,
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func mergeJSONFields(e *Event, event common.MapStr, jsonFields common.MapStr) {

// The message key might have been modified by multiline
if len(e.JSONConfig.MessageKey) > 0 && e.Text != nil {
jsonFields[e.JSONConfig.MessageKey] = *e.Text
}

if e.JSONConfig.KeysUnderRoot {
// Delete existing json key
delete(event, "json")

jsontransform.WriteJSONKeys(event, jsonFields, e.JSONConfig.OverwriteKeys, reader.JsonErrorKey)
}
}
Loading