Skip to content

Commit

Permalink
Force ECS options in filebeat pipelines when filebeat version is over…
Browse files Browse the repository at this point in the history
… 7.0 and ES version is 6.7.X (elastic#10875)

When using the `user_agent` processor to ingest data from Filebeat 7.0
into Elasticsearch 6.X conflicts appear with ECS user_agent fields, this can
be solved by setting `ecs: true` when pipelines are being loaded into
Elasticsearch 6.7.0.

For minor versions where `ecs` option is not available, pipelines will fail
to load.

Fix elastic#10655

(cherry picked from commit 34eaf57)
  • Loading branch information
jsoriano committed Feb 25, 2019
1 parent 76e61da commit ef7742f
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di

*Filebeat*

- Set `ecs: true` in user_agent processors when loading pipelines with Filebeat 7.0.x into Elasticsearch 6.7.x. {issue}10655[10655] {pull}10875[10875]

*Heartbeat*

*Journalbeat*
Expand Down
40 changes: 40 additions & 0 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return nil
}
}

err := setECSProcessors(esClient.GetVersion(), pipelineID, content)
if err != nil {
return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err)
}

body, err := esClient.LoadJSON(path, content)
if err != nil {
return interpretError(err, body)
Expand All @@ -129,6 +135,40 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return nil
}

// setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0
// and ES is 6.7.X to ease migration to ECS.
func setECSProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error {
ecsVersion := common.MustNewVersion("7.0.0")
if !esVersion.LessThan(ecsVersion) {
return nil
}

p, ok := content["processors"]
if !ok {
return nil
}
processors, ok := p.([]interface{})
if !ok {
return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p)
}

minUserAgentVersion := common.MustNewVersion("6.7.0")
for _, p := range processors {
processor, ok := p.(map[string]interface{})
if !ok {
continue
}
if options, ok := processor["user_agent"].(map[string]interface{}); ok {
if esVersion.LessThan(minUserAgentVersion) {
return fmt.Errorf("user_agent processor requires option 'ecs: true', but Elasticsearch %v does not support this option (Elasticsearch %v or newer is required)", esVersion, minUserAgentVersion)
}
logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID)
options["ecs"] = true
}
}
return nil
}

func deletePipeline(esClient PipelineLoader, pipelineID string) error {
path := makeIngestPipelinePath(pipelineID)
_, _, err := esClient.Request("DELETE", path, "", nil, nil)
Expand Down
110 changes: 110 additions & 0 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

Expand Down Expand Up @@ -103,3 +104,112 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) {
})
}
}

func TestSetEcsProcessors(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 6.7.0",
esVersion: common.MustNewVersion("6.6.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
},
},
}},
isErrExpected: true,
},
{
name: "ES == 6.7.0",
esVersion: common.MustNewVersion("6.7.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
},
},
},
},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
"ecs": true,
},
},
},
},
isErrExpected: false,
},
{
name: "ES >= 7.0.0",
esVersion: common.MustNewVersion("7.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
},
},
},
},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"rename": map[string]interface{}{
"field": "foo.src_ip",
"target_field": "source.ip",
},
},
map[string]interface{}{
"user_agent": map[string]interface{}{
"field": "foo.http_user_agent",
},
},
},
},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := setECSProcessors(*test.esVersion, "foo-pipeline", test.content)
if test.isErrExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, test.content)
}
})
}
}

0 comments on commit ef7742f

Please sign in to comment.