From 682a6056d12ae92e7e2d34dda3c6d6448906c823 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 25 Feb 2019 11:00:37 +0100 Subject: [PATCH 1/2] Remove ECS options in filebeat pipelines when not available Similar to #10875, but removing ecs flags when using pipelines on Elasticsearch versions below 6.7.0, that don't have them. --- CHANGELOG.next.asciidoc | 1 + filebeat/fileset/pipelines.go | 35 +++++++++ filebeat/fileset/pipelines_test.go | 122 +++++++++++++++++++++++++++++ 3 files changed, 158 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fcb54660ac0..b1f3b7f092e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -28,6 +28,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - Filesets with multiple ingest pipelines added in {pull}8914[8914] only work with Elasticsearch >= 6.5.0 {pull}10001[10001] - Add grok pattern to support redis 5.0.3 log timestamp. {issue}9819[9819] {pull}10033[10033] - Ingesting Elasticsearch audit logs is only supported with Elasticsearch 6.5.0 and above {pull}8852[8852] +- Set `ecs: true` in user_agent processors when loading pipelines with Filebeat 7.0.x into Elasticsearch 6.7.x. {issue}10655[10655] {pull}10875[10875] *Heartbeat* diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 07e520d64f8..2073da4863a 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -121,6 +121,12 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } } + + err := setECSProcessors(esClient.GetVersion(), pipelineID, content) + if err != nil { + return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) + } + body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) @@ -129,6 +135,35 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } +// setECSProcessors removes ECS-specific versions from processors in versions not supporting them +func setECSProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + ecsFlagVersion := common.MustNewVersion("6.7.0") + if !esVersion.LessThan(ecsFlagVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + for _, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } + if options, ok := processor["user_agent"].(map[string]interface{}); ok { + logp.Debug("modules", "Removing 'ecs' option in user_agent processor for field '%v' in pipeline '%s' as it is not supported in Elasticsearch %v", options["field"], pipelineID, esVersion) + delete(options, "ecs") + } + } + return nil +} + func deletePipeline(esClient PipelineLoader, pipelineID string) error { path := makeIngestPipelinePath(pipelineID) _, _, err := esClient.Request("DELETE", path, "", nil, nil) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 194df5e9f14..0e46bcb156a 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs/elasticsearch" ) @@ -103,3 +104,124 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }) } } + +func TestSetECSProcessors(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 6.7.0", + esVersion: common.MustNewVersion("6.6.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ES == 6.7.0", + esVersion: common.MustNewVersion("6.7.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": false, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": false, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES >= 7.0.0", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": false, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": false, + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := setECSProcessors(*test.esVersion, "foo-pipeline", test.content) + if test.isErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, test.content) + } + }) + } +} From 4203078b9aa54840223214d039fb50f73c045c92 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 21 Mar 2019 10:07:25 +0100 Subject: [PATCH 2/2] Fix changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b1f3b7f092e..7510d7861f8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -28,7 +28,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff] - Filesets with multiple ingest pipelines added in {pull}8914[8914] only work with Elasticsearch >= 6.5.0 {pull}10001[10001] - Add grok pattern to support redis 5.0.3 log timestamp. {issue}9819[9819] {pull}10033[10033] - Ingesting Elasticsearch audit logs is only supported with Elasticsearch 6.5.0 and above {pull}8852[8852] -- Set `ecs: true` in user_agent processors when loading pipelines with Filebeat 7.0.x into Elasticsearch 6.7.x. {issue}10655[10655] {pull}10875[10875] +- Remove `ecs` option from user_agent processors when loading pipelines with Filebeat 6.7.x into Elasticsearch < 6.7.0. {issue}10655[10655] {pull}11362[11362] *Heartbeat*