diff --git a/filebeat/Jenkinsfile.yml b/filebeat/Jenkinsfile.yml index 0cf61544a6f..55a296e9fdb 100644 --- a/filebeat/Jenkinsfile.yml +++ b/filebeat/Jenkinsfile.yml @@ -44,6 +44,14 @@ stages: pythonIntegTest: mage: "mage pythonIntegTest" ## run the ITs only if the changeset affects a specific module. stage: mandatory + module-compat-7.11: + mage: >- ## Run module integration tests under ES 7.11 to ensure ingest pipeline compatibility. + STACK_ENVIRONMENT=7.11 + TESTING_FILEBEAT_SKIP_DIFF=1 + PYTEST_ADDOPTS='-k test_modules' + mage pythonIntegTest + withModule: true + stage: mandatory macos: mage: "mage build unitTest" platforms: ## override default label in this specific stage. diff --git a/filebeat/docker-compose.yml b/filebeat/docker-compose.yml index 5a447d6cd66..19302ae1e6f 100644 --- a/filebeat/docker-compose.yml +++ b/filebeat/docker-compose.yml @@ -40,8 +40,6 @@ services: extends: file: ${ES_BEATS}/testing/environments/${TESTING_ENVIRONMENT}.yml service: elasticsearch - environment: - script.cache.max_size: "500" kafka: build: ${ES_BEATS}/testing/environments/docker/kafka diff --git a/filebeat/fileset/compatibility.go b/filebeat/fileset/compatibility.go index 5e9c0cd91f1..8fe4e64a4db 100644 --- a/filebeat/fileset/compatibility.go +++ b/filebeat/fileset/compatibility.go @@ -18,6 +18,7 @@ package fileset import ( + "encoding/json" "fmt" "strings" @@ -30,31 +31,9 @@ import ( // processorCompatibility defines a processor's minimum version requirements or // a transformation to make it compatible. type processorCompatibility struct { - checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. - procType string // Elasticsearch Ingest Node processor type. - adaptConfig func(processor map[string]interface{}, log *logp.Logger) compatAction // Adapt the configuration to make it compatible. 
-} - -type compatAction func(interface{}) (interface{}, error) - -func keepProcessor(original interface{}) (interface{}, error) { - return original, nil -} - -func dropProcessor(interface{}) (interface{}, error) { - return nil, nil -} - -func replaceProcessor(newProc interface{}) compatAction { - return func(interface{}) (interface{}, error) { - return newProc, nil - } -} - -func fail(err error) compatAction { - return func(interface{}) (interface{}, error) { - return nil, err - } + checkVersion func(esVersion *common.Version) bool // Version check returns true if this check applies. + procType string // Elasticsearch Ingest Node processor type. + adaptConfig func(processor Processor, log *logp.Logger) (Processor, error) // Adapt the configuration to make it compatible. } var processorCompatibilityChecks = []processorCompatibility{ @@ -92,9 +71,9 @@ var processorCompatibilityChecks = []processorCompatibility{ return esVersion.LessThan(common.MustNewVersion("7.0.0")) && !esVersion.LessThan(common.MustNewVersion("6.7.0")) }, - adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction { - config["ecs"] = true - return keepProcessor + adaptConfig: func(processor Processor, _ *logp.Logger) (Processor, error) { + processor.Set("ecs", true) + return processor, nil }, }, { @@ -102,8 +81,8 @@ var processorCompatibilityChecks = []processorCompatibility{ checkVersion: func(esVersion *common.Version) bool { return esVersion.LessThan(common.MustNewVersion("6.7.0")) }, - adaptConfig: func(config map[string]interface{}, _ *logp.Logger) compatAction { - return fail(errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required")) + adaptConfig: func(_ Processor, _ *logp.Logger) (Processor, error) { + return Processor{}, errors.New("user_agent processor requires option 'ecs: true', Elasticsearch 6.7 or newer required") }, }, { @@ -129,85 +108,220 @@ var processorCompatibilityChecks = []processorCompatibility{ }, } +// Processor 
represents an Ingest Node processor definition. +type Processor struct { + name string + config map[string]interface{} +} + +// NewProcessor returns the representation of an Ingest Node processor +// for the given configuration. +func NewProcessor(raw interface{}) (p Processor, err error) { + rawAsMap, ok := raw.(map[string]interface{}) + if !ok { + return p, fmt.Errorf("processor is not an object, got %T", raw) + } + + var keys []string + for k := range rawAsMap { + keys = append(keys, k) + } + if len(keys) != 1 { + return p, fmt.Errorf("processor doesn't have exactly 1 key, got %d: %v", len(keys), keys) + } + p.name = keys[0] + if p.config, ok = rawAsMap[p.name].(map[string]interface{}); !ok { + return p, fmt.Errorf("processor config is not an object, got %T", rawAsMap[p.name]) + } + return p, nil +} + +// Name of the processor. +func (p *Processor) Name() string { + return p.name +} + +// IsNil returns a boolean indicating if the processor is the zero value. +func (p *Processor) IsNil() bool { + return p.name == "" +} + +// Config returns the processor configuration as a map. +func (p *Processor) Config() map[string]interface{} { + return p.config +} + +// GetBool returns a boolean flag from the processor's configuration. +func (p *Processor) GetBool(key string) (value, ok bool) { + value, ok = p.config[key].(bool) + return +} + +// GetString returns a string flag from the processor's configuration. +func (p *Processor) GetString(key string) (value string, ok bool) { + value, ok = p.config[key].(string) + return +} + +// GetList returns an array from the processor's configuration. +func (p *Processor) GetList(key string) (value []interface{}, ok bool) { + value, ok = p.config[key].([]interface{}) + return +} + +// Set a flag in the processor's configuration. +func (p *Processor) Set(key string, value interface{}) { + p.config[key] = value +} + +// Get a flag from the processor's configuration. 
+func (p *Processor) Get(key string) (value interface{}, ok bool) { + value, ok = p.config[key] + return +} + +// Delete a configuration flag. +func (p *Processor) Delete(key string) { + delete(p.config, key) +} + +// ToMap returns the representation for the processor as a map. +func (p *Processor) ToMap() map[string]interface{} { + return map[string]interface{}{ + p.name: p.config, + } +} + +// String returns a string representation for the processor. +func (p *Processor) String() string { + b, err := json.Marshal(p.ToMap()) + if err != nil { + return fmt.Sprintf("/* encoding error: %v */", err) + } + return string(b) +} + // adaptPipelineForCompatibility iterates over all processors in the pipeline // and adapts them for version of Elasticsearch used. Adapt can mean modifying // processor options or removing the processor. func adaptPipelineForCompatibility(esVersion common.Version, pipelineID string, content map[string]interface{}, log *logp.Logger) (err error) { - p, ok := content["processors"] + log = log.With("pipeline_id", pipelineID) + // Adapt the main processors in the pipeline. + if err = adaptProcessorsForCompatibility(esVersion, content, "processors", false, log); err != nil { + return err + } + // Adapt any `on_failure` processors in the pipeline. 
+ return adaptProcessorsForCompatibility(esVersion, content, "on_failure", true, log) +} + +func adaptProcessorsForCompatibility(esVersion common.Version, content map[string]interface{}, section string, ignoreMissingsection bool, log *logp.Logger) (err error) { + p, ok := content[section] if !ok { - return errors.New("'processors' is missing from the pipeline definition") + if ignoreMissingsection { + return nil + } + return fmt.Errorf("'%s' is missing from the pipeline definition", section) } processors, ok := p.([]interface{}) if !ok { - return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + return fmt.Errorf("'%s' expected to be a list, found %T", section, p) } var filteredProcs []interface{} + log = log.With("processors_section", section) nextProcessor: for i, obj := range processors { - for _, proc := range processorCompatibilityChecks { - processor, ok := obj.(map[string]interface{}) - if !ok { - return fmt.Errorf("processor at index %d is not an object, got %T", i, obj) + processor, err := NewProcessor(obj) + if err != nil { + return errors.Wrapf(err, "cannot parse processor in section '%s' index %d body=%+v", section, i, obj) + } + + // Adapt any on_failure processors for this processor. + prevOnFailure, _ := processor.GetList("on_failure") + if err = adaptProcessorsForCompatibility(esVersion, processor.Config(), "on_failure", true, + log.With("parent_processor_type", processor.Name(), "parent_processor_index", i)); err != nil { + return errors.Wrapf(err, "cannot parse on_failure for processor in section '%s' index %d body=%+v", section, i, obj) + } + if onFailure, _ := processor.GetList("on_failure"); len(prevOnFailure) > 0 && len(onFailure) == 0 { + processor.Delete("on_failure") + } + + // Adapt inner processor in case of foreach. 
+ if inner, found := processor.Get("processor"); found && processor.Name() == "foreach" { + processor.Set("processor", []interface{}{inner}) + if err = adaptProcessorsForCompatibility(esVersion, processor.Config(), "processor", false, + log.With("parent_processor_type", processor.Name(), "parent_processor_index", i)); err != nil { + return errors.Wrapf(err, "cannot parse inner processor for foreach in section '%s' index %d", section, i) } + newList, _ := processor.GetList("processor") + switch len(newList) { + case 0: + // compatibility has removed the inner processor of a foreach processor, + // must also remove the foreach processor itself. + continue nextProcessor + case 1: + // replace existing processor with possibly modified one. + processor.Set("processor", newList[0]) + default: + // This is actually not possible as compatibility checks + // can't inject extra processors. + return fmt.Errorf("parsing inner processor for foreach in section '%s' index %d results in more than one processor, which is unsupported by foreach", section, i) + } + } - configIfc, found := processor[proc.procType] - if !found { + // Run compatibility checks on the processor. 
+ for _, proc := range processorCompatibilityChecks { + if processor.Name() != proc.procType { continue } - config, ok := configIfc.(map[string]interface{}) - if !ok { - return fmt.Errorf("processor config at index %d is not an object, got %T", i, obj) - } if !proc.checkVersion(&esVersion) { continue } - act := proc.adaptConfig(config, log.With("processor_type", proc.procType, "processor_index", i)) - obj, err = act(obj) + processor, err = proc.adaptConfig(processor, log.With("processor_type", proc.procType, "processor_index", i)) if err != nil { return fmt.Errorf("failed to adapt %q processor at index %d: %w", proc.procType, i, err) } - if obj == nil { + if processor.IsNil() { continue nextProcessor } } - filteredProcs = append(filteredProcs, obj) + filteredProcs = append(filteredProcs, processor.ToMap()) } - content["processors"] = filteredProcs + content[section] = filteredProcs return nil } // deleteProcessor returns true to indicate that the processor should be deleted // in order to adapt the pipeline for backwards compatibility to Elasticsearch. -func deleteProcessor(_ map[string]interface{}, _ *logp.Logger) compatAction { - return dropProcessor +func deleteProcessor(_ Processor, _ *logp.Logger) (Processor, error) { + return Processor{}, nil } // replaceSetIgnoreEmptyValue replaces ignore_empty_value option with an if // statement so ES less than 7.9 will work. 
-func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) compatAction { - _, ok := config["ignore_empty_value"].(bool) +func replaceSetIgnoreEmptyValue(processor Processor, log *logp.Logger) (Processor, error) { + _, ok := processor.GetBool("ignore_empty_value") if !ok { - return keepProcessor + return processor, nil } log.Debug("Removing unsupported 'ignore_empty_value' from set processor.") - delete(config, "ignore_empty_value") + processor.Delete("ignore_empty_value") - _, ok = config["if"].(string) + _, ok = processor.GetString("if") if ok { // assume if check is sufficient - return keepProcessor + return processor, nil } - val, ok := config["value"].(string) + val, ok := processor.GetString("value") if !ok { - return keepProcessor + return processor, nil } newIf := strings.TrimLeft(val, "{ ") @@ -215,39 +329,39 @@ func replaceSetIgnoreEmptyValue(config map[string]interface{}, log *logp.Logger) newIf = strings.ReplaceAll(newIf, ".", "?.") newIf = "ctx?." + newIf + " != null" - log.Debug("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf) - config["if"] = newIf - return keepProcessor + log.Debugf("Adding if %s to replace 'ignore_empty_value' in set processor.", newIf) + processor.Set("if", newIf) + return processor, nil } // replaceAppendAllowDuplicates replaces allow_duplicates option with an if statement // so ES less than 7.10 will work. -func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logger) compatAction { - allow, ok := config["allow_duplicates"].(bool) +func replaceAppendAllowDuplicates(processor Processor, log *logp.Logger) (Processor, error) { + allow, ok := processor.GetBool("allow_duplicates") if !ok { - return keepProcessor + return processor, nil } log.Debug("Removing unsupported 'allow_duplicates' from append processor.") - delete(config, "allow_duplicates") + processor.Delete("allow_duplicates") if allow { // It was set to true, nothing else to do after removing the option. 
- return keepProcessor + return processor, nil } - currIf, _ := config["if"].(string) + currIf, _ := processor.GetString("if") if strings.Contains(strings.ToLower(currIf), "contains") { // If it has a contains statement, we assume it is checking for duplicates already. - return keepProcessor + return processor, nil } - field, ok := config["field"].(string) + field, ok := processor.GetString("field") if !ok { - return keepProcessor + return processor, nil } - val, ok := config["value"].(string) + val, ok := processor.GetString("value") if !ok { - return keepProcessor + return processor, nil } field = strings.ReplaceAll(field, ".", "?.") @@ -263,36 +377,34 @@ func replaceAppendAllowDuplicates(config map[string]interface{}, log *logp.Logge newIf := fmt.Sprintf("%s && ((ctx?.%s instanceof List && !ctx?.%s.contains(ctx?.%s)) || ctx?.%s != ctx?.%s)", currIf, field, field, val, field, val) - log.Debug("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf) - config["if"] = newIf + log.Debugf("Adding if %s to replace 'allow_duplicates: false' in append processor.", newIf) + processor.Set("if", newIf) - return keepProcessor + return processor, nil } // replaceConvertIP replaces convert processors with type: ip with a grok expression that uses // the IP pattern. 
-func replaceConvertIP(config map[string]interface{}, log *logp.Logger) compatAction { - wantedType, found := config["type"] - if !found || wantedType != "ip" { - return keepProcessor +func replaceConvertIP(processor Processor, log *logp.Logger) (Processor, error) { + if wantedType, _ := processor.GetString("type"); wantedType != "ip" { + return processor, nil } - log.Debug("processor input=", config) - delete(config, "type") + log.Debug("processor input=", processor.String()) + processor.Delete("type") var srcIf, dstIf interface{} - if srcIf, found = config["field"]; !found { - return fail(errors.New("field option is required for convert processor")) + var found bool + if srcIf, found = processor.Get("field"); !found { + return Processor{}, errors.New("field option is required for convert processor") } - if dstIf, found = config["target_field"]; found { - delete(config, "target_field") + if dstIf, found = processor.Get("target_field"); found { + processor.Delete("target_field") } else { dstIf = srcIf } - config["patterns"] = []string{ + processor.Set("patterns", []string{ fmt.Sprintf("^%%{IP:%s}$", dstIf), - } - grok := map[string]interface{}{ - "grok": config, - } - log.Debug("processor output=", grok) - return replaceProcessor(grok) + }) + processor.name = "grok" + log.Debug("processor output=", processor.String()) + return processor, nil } diff --git a/filebeat/fileset/compatibility_test.go b/filebeat/fileset/compatibility_test.go index e7fa8a1267a..0d3b000d8b7 100644 --- a/filebeat/fileset/compatibility_test.go +++ b/filebeat/fileset/compatibility_test.go @@ -898,9 +898,15 @@ func TestReplaceConvertIPWithGrok(t *testing.T) { "ignore_failure": false, "tag": "myTag", "on_failure": []interface{}{ - "foo", map[string]interface{}{ - "bar": []int{1, 2, 3}, + "foo": map[string]interface{}{ + "baz": false, + }, + }, + map[string]interface{}{ + "bar": map[string]interface{}{ + "baz": true, + }, }, }, }, @@ -921,9 +927,15 @@ func TestReplaceConvertIPWithGrok(t 
*testing.T) { "ignore_failure": false, "tag": "myTag", "on_failure": []interface{}{ - "foo", map[string]interface{}{ - "bar": []int{1, 2, 3}, + "foo": map[string]interface{}{ + "baz": false, + }, + }, + map[string]interface{}{ + "bar": map[string]interface{}{ + "baz": true, + }, }, }, }, @@ -1068,3 +1080,264 @@ func TestRemoveRegisteredDomainProcessor(t *testing.T) { }) } } + +func TestReplaceAlternativeFlowProcessors(t *testing.T) { + logp.TestingSetup() + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "Replace in on_failure section", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "Replace in processor's on_failure", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + 
"bar": "baz", + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + }, + }, + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "something's wrong", + }, + }, + }, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "Remove empty on_failure key", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + "on_failure": []interface{}{ + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foo": map[string]interface{}{ + "bar": "baz", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "process foreach processor", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != 
null", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "Remove leftover foreach processor", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "foreach": map[string]interface{}{ + "field": "foo", + "processor": map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}(nil), + }, + isErrExpected: false, + }, + { + name: "nested", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "foreach": map[string]interface{}{ + "field": "foo", + "processor": map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "allow_duplicates": false, + "if": "ctx?.host?.hostname != null", + "on_failure": []interface{}{ + map[string]interface{}{ + "community_id": map[string]interface{}{}, + }, + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "panic", + }, + }, + }, + }, + }, + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}(nil), + "on_failure": []interface{}{ + map[string]interface{}{ + "foreach": map[string]interface{}{ + "field": "foo", + "processor": map[string]interface{}{ + "append": map[string]interface{}{ + "field": "related.hosts", + "value": "{{host.hostname}}", + "if": "ctx?.host?.hostname != null && ((ctx?.related?.hosts 
instanceof List && !ctx?.related?.hosts.contains(ctx?.host?.hostname)) || ctx?.related?.hosts != ctx?.host?.hostname)", + "on_failure": []interface{}{ + map[string]interface{}{ + "append": map[string]interface{}{ + "field": "error.message", + "value": "panic", + }, + }, + }, + }, + }, + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := adaptPipelineForCompatibility(*test.esVersion, "foo-pipeline", test.content, logp.NewLogger(logName)) + if test.isErrExpected { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +} diff --git a/filebeat/tests/system/test_modules.py b/filebeat/tests/system/test_modules.py index b2e6c03b1c5..e6adfdd2c63 100644 --- a/filebeat/tests/system/test_modules.py +++ b/filebeat/tests/system/test_modules.py @@ -75,14 +75,6 @@ def init(self): self.index_name = "test-filebeat-modules" - body = { - "transient": { - "script.max_compilations_rate": "2000/1m" - } - } - - self.es.transport.perform_request('PUT', "/_cluster/settings", body=body) - @parameterized.expand(load_fileset_test_cases) @unittest.skipIf(not INTEGRATION_TESTS, "integration tests are disabled, run with INTEGRATION_TESTS=1 to enable them.") @@ -214,6 +206,15 @@ def _test_expected_events(self, test_file, objects): assert len(expected) == len(objects), "expected {} events to compare but got {}".format( len(expected), len(objects)) + # Do not perform a comparison between the resulting and expected documents + # if the TESTING_FILEBEAT_SKIP_DIFF flag is set. + # + # This allows to run a basic check with older versions of ES that can lead + # to slightly different documents without maintaining multiple sets of + # golden files. 
+ if os.getenv("TESTING_FILEBEAT_SKIP_DIFF"): + return + for idx in range(len(expected)): ev = expected[idx] obj = objects[idx] diff --git a/filebeat/tests/system/test_pipeline.py b/filebeat/tests/system/test_pipeline.py index da6d357f8e8..afb3219e62d 100644 --- a/filebeat/tests/system/test_pipeline.py +++ b/filebeat/tests/system/test_pipeline.py @@ -45,14 +45,6 @@ def test_input_pipeline_config(self): pass self.wait_until(lambda: not self.es.indices.exists(index_name)) - body = { - "transient": { - "script.max_compilations_rate": "100/1m" - } - } - - self.es.transport.perform_request('PUT', "/_cluster/settings", body=body) - self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", elasticsearch=dict( diff --git a/testing/environments/7.11.yml b/testing/environments/7.11.yml new file mode 100644 index 00000000000..7f93445987c --- /dev/null +++ b/testing/environments/7.11.yml @@ -0,0 +1,38 @@ +# This is the latest 7.11 + +version: '2.3' +services: + elasticsearch: + image: docker.elastic.co/elasticsearch/elasticsearch:7.11.2 + healthcheck: + test: ["CMD-SHELL", "curl -s http://localhost:9200/_cat/health?h=status | grep -q green"] + retries: 300 + interval: 1s + environment: + - "ES_JAVA_OPTS=-Xms1g -Xmx1g" + - "network.host=" + - "transport.host=127.0.0.1" + - "http.host=0.0.0.0" + - "xpack.security.enabled=false" + - "script.context.template.max_compilations_rate=unlimited" + - "script.context.ingest.cache_max_size=2000" + - "script.context.processor_conditional.cache_max_size=2000" + - "script.context.template.cache_max_size=2000" + - "action.destructive_requires_name=false" + + logstash: + image: docker.elastic.co/logstash/logstash:7.11.2 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] + retries: 600 + interval: 1s + volumes: + - ./docker/logstash/pipeline:/usr/share/logstash/pipeline:ro + - ./docker/logstash/pki:/etc/pki:ro + + kibana: + image: docker.elastic.co/kibana/kibana:7.11.2 + healthcheck: + test: 
["CMD-SHELL", "curl -s http://localhost:5601/api/status | grep -q 'Looking good'"] + retries: 600 + interval: 1s diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index 574ea08263a..e2f452a7eed 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -14,6 +14,10 @@ services: - "transport.host=127.0.0.1" - "http.host=0.0.0.0" - "xpack.security.enabled=false" + - "script.context.template.max_compilations_rate=unlimited" + - "script.context.ingest.cache_max_size=2000" + - "script.context.processor_conditional.cache_max_size=2000" + - "script.context.template.cache_max_size=2000" - "action.destructive_requires_name=false" # Disable geoip updates to prevent golden file test failures when the database # changes and prevent race conditions between tests and database updates. diff --git a/x-pack/filebeat/Jenkinsfile.yml b/x-pack/filebeat/Jenkinsfile.yml index ef338263c6b..09c837bccf6 100644 --- a/x-pack/filebeat/Jenkinsfile.yml +++ b/x-pack/filebeat/Jenkinsfile.yml @@ -44,6 +44,14 @@ stages: pythonIntegTest: mage: "mage pythonIntegTest" ## run the ITs only if the changeset affects a specific module. stage: mandatory + module-compat-7.11: + mage: >- ## Run module integration tests under ES 7.11 to ensure ingest pipeline compatibility. + STACK_ENVIRONMENT=7.11 + TESTING_FILEBEAT_SKIP_DIFF=1 + PYTEST_ADDOPTS='-k test_xpack_modules' + mage pythonIntegTest + withModule: true + stage: mandatory macos: mage: "mage build unitTest" platforms: ## override default label in this specific stage. 
diff --git a/x-pack/filebeat/docker-compose.yml b/x-pack/filebeat/docker-compose.yml index af81ccb13fb..0c0b477a611 100644 --- a/x-pack/filebeat/docker-compose.yml +++ b/x-pack/filebeat/docker-compose.yml @@ -26,6 +26,4 @@ services: extends: file: ${ES_BEATS}/testing/environments/${STACK_ENVIRONMENT}.yml service: elasticsearch - environment: - script.cache.max_size: "500" diff --git a/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml b/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml index 9b2ea7b0b24..2955cd66b26 100644 --- a/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml +++ b/x-pack/filebeat/module/fortinet/firewall/ingest/pipeline.yml @@ -414,17 +414,18 @@ processors: - remove: field: - _temp + - host - syslog5424_sd - syslog5424_pri - - fortinet.firewall.tz + - fortinet.firewall.agent - fortinet.firewall.date - fortinet.firewall.devid - - fortinet.firewall.eventtime - - fortinet.firewall.time - fortinet.firewall.duration - - host + - fortinet.firewall.eventtime - fortinet.firewall.hostname - - fortinet.firewall.agent + - fortinet.firewall.time + - fortinet.firewall.tz + - fortinet.firewall.url ignore_missing: true - script: lang: painless