From 5ba5d36d17dfe38f27cc080a3d7676f62d0eed09 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 18 Feb 2019 19:22:25 +0100 Subject: [PATCH 01/13] Separate event processing from pipeline This changes moves the generation of the event processing into it's distinct package, such that the actual publisher pipeline will not define any processors anymore. A new instance of a publisher pipeline must not add fields on it's own. This change converts the event processing pipline into the 'Supporter' pattern, which is already used for Index Management. As different beats ask for slightly different behavior in the event processing (e.g. normalize, default builtins and so on), the `processing.Supporter` can be used for customizations. --- filebeat/channel/factory.go | 16 +- heartbeat/monitors/task.go | 8 +- journalbeat/input/input.go | 10 +- libbeat/beat/pipeline.go | 50 +- libbeat/cmd/instance/beat.go | 11 + libbeat/cmd/instance/settings.go | 3 + libbeat/publisher/pipeline/module.go | 20 +- libbeat/publisher/pipeline/pipeline.go | 85 +--- libbeat/publisher/pipeline/processor_test.go | 433 ------------------ libbeat/publisher/processing/default.go | 261 +++++++++++ libbeat/publisher/processing/default_test.go | 361 +++++++++++++++ libbeat/publisher/processing/processing.go | 28 ++ .../processor.go => processing/processors.go} | 185 ++------ metricbeat/mb/module/connector.go | 8 +- packetbeat/beater/packetbeat.go | 6 +- packetbeat/publish/publish.go | 6 +- winlogbeat/beater/eventlogger.go | 10 +- 17 files changed, 774 insertions(+), 727 deletions(-) delete mode 100644 libbeat/publisher/pipeline/processor_test.go create mode 100644 libbeat/publisher/processing/default.go create mode 100644 libbeat/publisher/processing/default_test.go create mode 100644 libbeat/publisher/processing/processing.go rename libbeat/publisher/{pipeline/processor.go => processing/processors.go} (52%) diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index c938aafe8bd9..c295d1a40560 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c } client, err := p.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: config.EventMetadata, - DynamicFields: dynFields, - Meta: meta, - Fields: fields, - Processor: processors, - Events: f.eventer, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: config.EventMetadata, + DynamicFields: dynFields, + Meta: meta, + Fields: fields, + Processor: processors, + }, + Events: f.eventer, }) if err != nil { return nil, err diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 3ee62eff2357..43e4a8056fa1 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -139,9 +139,11 @@ func (t *configuredJob) Start() { } t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ - EventMetadata: t.config.EventMetadata, - Processor: t.processors, - Fields: fields, + Processing: beat.ProcessingConfig{ + EventMetadata: t.config.EventMetadata, + Processor: t.processors, + Fields: fields, + }, }) if err != nil { logp.Err("could not start monitor: %v", err) diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 70d9bcb6874d..7f1941271dcd 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -125,10 +125,12 @@ func New( func (i *Input) Run() { var err error i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: i.eventMeta, - Meta: nil, - Processor: i.processors, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: i.eventMeta, + Meta: nil, + Processor: i.processors, + }, ACKCount: func(n int) { i.logger.Infof("journalbeat successfully published %d events", n) }, diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 3db7baae6d27..5743e1cc92a4 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -46,22 +46,7 @@ type Client interface { type ClientConfig struct { PublishMode PublishMode - // EventMetadata configures additional fields/tags to be added to published events. - EventMetadata common.EventMetadata - - // Meta provides additional meta data to be added to the Meta field in the beat.Event - // structure. - Meta common.MapStr - - // Fields provides additional 'global' fields to be added to every event - Fields common.MapStr - - // DynamicFields provides additional fields to be added to every event, supporting live updates - DynamicFields *common.MapStrPointer - - // Processors passes additional processor to the client, to be executed before - // the pipeline processors. - Processor ProcessorList + Processing ProcessingConfig // WaitClose sets the maximum duration to wait on ACK, if client still has events // active non-acknowledged events in the publisher pipeline. @@ -72,14 +57,6 @@ type ClientConfig struct { // Events configures callbacks for common client callbacks Events ClientEventer - // By default events are normalized within processor pipeline, - // if the normalization step should be skipped set this to true. - SkipNormalization bool - - // By default events are decorated with agent metadata. - // To skip adding that metadata set this to true. - SkipAgentMetadata bool - // ACK handler strategies. // Note: ack handlers are run in another go-routine owned by the publisher pipeline. // They should not block for to long, to not block the internal buffers for @@ -101,6 +78,31 @@ type ClientConfig struct { ACKLastEvent func(interface{}) } +// ProcessingConfig provides additional event processing settings a client can +// pass to the publisher pipeline on Connect. +type ProcessingConfig struct { + // EventMetadata configures additional fields/tags to be added to published events. + EventMetadata common.EventMetadata + + // Meta provides additional meta data to be added to the Meta field in the beat.Event + // structure. + Meta common.MapStr + + // Fields provides additional 'global' fields to be added to every event + Fields common.MapStr + + // DynamicFields provides additional fields to be added to every event, supporting live updates + DynamicFields *common.MapStrPointer + + // Processors passes additional processor to the client, to be executed before + // the pipeline processors. + Processor ProcessorList + + // Private contains additional information to be passed to the processing + // pipeline builder. + Private interface{} +} + // ClientEventer provides access to internal client events. type ClientEventer interface { Closing() // Closing indicates the client is being shutdown next diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 3ab2507486ec..198dea7806c6 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -62,6 +62,7 @@ import ( "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/plugin" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/version" sysinfo "github.com/elastic/go-sysinfo" @@ -78,6 +79,8 @@ type Beat struct { keystore keystore.Keystore index idxmgmt.Supporter + + processing processing.Supporter } type beatConfig struct { @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Logger: logp.L().Named("publisher"), }, b.Config.Pipeline, + b.processing, b.makeOutputFactory(b.Config.Output), ) @@ -593,6 +597,13 @@ func (b *Beat) configure(settings Settings) error { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) } b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig) + + processingFactory := settings.Processing + if processingFactory == nil { + processingFactory = processing.NewBeatSupport() + } + b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) + return err } diff --git a/libbeat/cmd/instance/settings.go b/libbeat/cmd/instance/settings.go index 9b2bfd34a056..dfb8735f26a6 100644 --- a/libbeat/cmd/instance/settings.go +++ b/libbeat/cmd/instance/settings.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/idxmgmt" "github.com/elastic/beats/libbeat/idxmgmt/ilm" "github.com/elastic/beats/libbeat/monitoring/report" + "github.com/elastic/beats/libbeat/publisher/processing" ) // Settings contains basic settings for any beat to pass into GenRootCmd @@ -40,4 +41,6 @@ type Settings struct { // load custom index manager. The config object will be the Beats root configuration. IndexManagement idxmgmt.SupportFactory ILM ilm.SupportFactory + + Processing processing.SupporterFactory } diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 2c854c13676c..8b5a66c63c6d 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -60,6 +60,7 @@ func Load( beatInfo beat.Info, monitors Monitors, config Config, + processors processing.Supporter, makeOutput func(outputs.Observer) (string, outputs.Group, error), ) (*Pipeline, error) { log := monitors.Logger @@ -71,28 +72,11 @@ func Load( log.Info("Dry run mode. All output types except the file based one are disabled.") } - processors, err := processors.New(config.Processors) - if err != nil { - return nil, fmt.Errorf("error initializing processors: %v", err) - } - name := beatInfo.Name settings := Settings{ WaitClose: 0, WaitCloseMode: NoWaitOnClose, - Disabled: publishDisabled, Processors: processors, - Annotations: Annotations{ - Event: config.EventMetadata, - Builtin: common.MapStr{ - "host": common.MapStr{ - "name": name, - }, - "ecs": common.MapStr{ - "version": "1.0.0-beta2", - }, - }, - }, } queueBuilder, err := createQueueBuilder(config.Queue, monitors) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index fbe49510e0dc..de6602e886e3 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -31,8 +31,8 @@ import ( "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -73,21 +73,7 @@ type Pipeline struct { ackBuilder ackBuilder eventSema *sema - processors pipelineProcessors -} - -type pipelineProcessors struct { - // The pipeline its processor settings for - // constructing the clients complete processor - // pipeline on connect. - builtinMeta common.MapStr - fields common.MapStr - tags []string - - processors beat.Processor - - disabled bool // disabled is set if outputs have been disabled via CLI - alwaysCopy bool + processors processing.Supporter } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -98,19 +84,7 @@ type Settings struct { WaitCloseMode WaitCloseMode - Annotations Annotations - Processors *processors.Processors - - Disabled bool -} - -// Annotations configures additional metadata to be adde to every single event -// being published. The meta data will be added before executing the configured -// processors, so all processors configured with the pipeline or client will see -// the same/complete event. -type Annotations struct { - Event common.EventMetadata - Builtin common.MapStr + Processors processing.Supporter } // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. @@ -172,17 +146,13 @@ func New( monitors.Logger = logp.NewLogger("publish") } - annotations := settings.Annotations - processors := settings.Processors - log := monitors.Logger - disabledOutput := settings.Disabled p := &Pipeline{ beatInfo: beat, monitors: monitors, observer: nilObserver, waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, - processors: makePipelineProcessors(log, annotations, processors, disabledOutput), + processors: settings.Processors, } p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) @@ -347,7 +317,16 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - processors := newProcessorPipeline(p.beatInfo, p.monitors, p.processors, cfg) + var processors beat.Processor + if p.processors != nil { + proc, err := p.processors(cfg.Processing, publishDisabled) + if err != nil { + return nil, err + } + + processors = proc + } + acker := p.makeACKer(processors != nil, &cfg, waitClose) producerCfg := queue.ProducerConfig{ // Cancel events from queue if acker is configured @@ -414,42 +393,6 @@ func (e *waitCloser) wait() { e.events.Wait() } -func makePipelineProcessors( - log *logp.Logger, - annotations Annotations, - processors *processors.Processors, - disabled bool, -) pipelineProcessors { - p := pipelineProcessors{ - disabled: disabled, - } - - hasProcessors := processors != nil && len(processors.List) > 0 - if hasProcessors { - tmp := newProgram("global", log) - for _, p := range processors.List { - tmp.add(p) - } - p.processors = tmp - } - - if meta := annotations.Builtin; meta != nil { - p.builtinMeta = meta - } - - if em := annotations.Event; len(em.Fields) > 0 { - fields := common.MapStr{} - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - p.fields = fields - } - - if t := annotations.Event.Tags; len(t) > 0 { - p.tags = t - } - - return p -} - // OutputReloader returns a reloadable object for the output section of this pipeline func (p *Pipeline) OutputReloader() OutputReloader { return p.output diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go deleted file mode 100644 index 07b643d0c81c..000000000000 --- a/libbeat/publisher/pipeline/processor_test.go +++ /dev/null @@ -1,433 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package pipeline - -import ( - "sync" - "testing" - "time" - - "github.com/gofrs/uuid" - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" -) - -func TestProcessors(t *testing.T) { - defaultInfo := beat.Info{} - - type local struct { - config beat.ClientConfig - events []common.MapStr - expected []common.MapStr - includeAgentMetadata bool - } - - tests := []struct { - name string - global pipelineProcessors - local []local - info *beat.Info - }{ - { - name: "user global fields and tags", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - {"value": "abc", "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "no normalization", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{SkipNormalization: true}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - {"value": "abc", "user": nil, "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "add agent metadata", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1, "agent": common.MapStr{"foo": "bar"}}, - tags: []string{"tag"}, - }, - info: &beat.Info{ - Beat: "test", - EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), - Hostname: "test.host.name", - ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), - Name: "test.host.name", - Version: "0.1", - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - { - "agent": common.MapStr{ - "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", - "hostname": "test.host.name", - "id": "123e4567-e89b-12d3-a456-426655440001", - "type": "test", - "version": "0.1", - "foo": "bar", - }, - "value": "abc", "global": 1, "tags": []string{"tag"}, - }, - }, - includeAgentMetadata: true, - }, - }, - }, - { - name: "add agent metadata with custom host.name", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - info: &beat.Info{ - Beat: "test", - EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), - Hostname: "test.host.name", - ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), - Name: "other.test.host.name", - Version: "0.1", - }, - local: []local{ - { - config: beat.ClientConfig{}, - events: []common.MapStr{{"value": "abc", "user": nil}}, - expected: []common.MapStr{ - { - "agent": common.MapStr{ - "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", - "hostname": "test.host.name", - "id": "123e4567-e89b-12d3-a456-426655440001", - "name": "other.test.host.name", - "type": "test", - "version": "0.1", - }, - "value": "abc", "global": 1, "tags": []string{"tag"}, - }, - }, - includeAgentMetadata: true, - }, - }, - }, - { - name: "beat local fields", - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "local": 1}}, - }, - }, - }, - { - name: "beat local and user global fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "global": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user global fields overwrite beat local fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 1, "shared": "global"}, - tags: []string{"tag"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1, "shared": "local"}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "global": 1, "shared": "global", "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "beat local fields isolated", - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "local": 1}}, - }, - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 2}, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "local": 2}}, - }, - }, - }, - - { - name: "beat local fields + user global fields isolated", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0}, - }, - local: []local{ - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 1}, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, - }, - { - config: beat.ClientConfig{ - Fields: common.MapStr{"local": 2}, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, - }, - }, - }, - { - name: "user local fields and tags", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - Tags: []string{"tag"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "fields": common.MapStr{"local": 1}, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user local fields (under root) and tags", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - FieldsUnderRoot: true, - Tags: []string{"tag"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - {"value": "abc", "local": 1, "tags": []string{"tag"}}, - }, - }, - }, - }, - { - name: "user local fields overwrite user global fields", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0, "shared": "global"}, - tags: []string{"global"}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1, "shared": "local"}, - FieldsUnderRoot: true, - Tags: []string{"local"}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{ - { - "value": "abc", - "global": 0, "local": 1, "shared": "local", - "tags": []string{"global", "local"}, - }, - }, - }, - }, - }, - { - name: "user local fields isolated", - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"local": 1}}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"local": 2}}}, - }, - }, - }, - { - name: "user local + global fields isolated", - global: pipelineProcessors{ - fields: common.MapStr{"fields": common.MapStr{"global": 0}}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "fields": common.MapStr{"global": 0, "local": 1}}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "fields": common.MapStr{"global": 0, "local": 2}}}, - }, - }, - }, - { - name: "user local + global fields isolated (fields with root)", - global: pipelineProcessors{ - fields: common.MapStr{"global": 0}, - }, - local: []local{ - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 1}, - FieldsUnderRoot: true, - }, - }, - events: []common.MapStr{{"value": "abc"}}, - expected: []common.MapStr{{"value": "abc", "global": 0, "local": 1}}, - }, - { - config: beat.ClientConfig{ - EventMetadata: common.EventMetadata{ - Fields: common.MapStr{"local": 2}, - FieldsUnderRoot: true, - }, - }, - events: []common.MapStr{{"value": "def"}}, - expected: []common.MapStr{{"value": "def", "global": 0, "local": 2}}, - }, - }, - }, - } - - for _, test := range tests { - test := test - t.Run(test.name, func(t *testing.T) { - monitors := Monitors{ - Logger: logp.NewLogger("test processors"), - } - - // create processor pipelines - programs := make([]beat.Processor, len(test.local)) - info := defaultInfo - if test.info != nil { - info = *test.info - } - for i, local := range test.local { - local.config.SkipAgentMetadata = !local.includeAgentMetadata - programs[i] = newProcessorPipeline(info, monitors, test.global, local.config) - } - - // run processor pipelines in parallel - var ( - wg sync.WaitGroup - mux sync.Mutex - results = make([][]common.MapStr, len(programs)) - ) - for id, local := range test.local { - wg.Add(1) - id, program, local := id, programs[id], local - go func() { - defer wg.Done() - - actual := make([]common.MapStr, len(local.events)) - for i, event := range local.events { - out, _ := program.Run(&beat.Event{ - Timestamp: time.Now(), - Fields: event, - }) - actual[i] = out.Fields - } - - mux.Lock() - defer mux.Unlock() - results[id] = actual - }() - } - wg.Wait() - - // validate - for i, local := range test.local { - assert.Equal(t, local.expected, results[i]) - } - }) - } -} diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go new file mode 100644 index 000000000000..cda5e8021cd7 --- /dev/null +++ b/libbeat/publisher/processing/default.go @@ -0,0 +1,261 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processing + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/actions" +) + +type pipelineProcessors struct { + info beat.Info + log *logp.Logger + + skipNormalize bool + + // The pipeline its processor settings for + // constructing the clients complete processor + // pipeline on connect. + builtinMeta common.MapStr + fields common.MapStr + tags []string + + processors beat.Processor + + drop bool // disabled is set if outputs have been disabled via CLI + alwaysCopy bool +} + +const ecsVersion = "1.0.0-beta2" + +func NewBeatSupport() SupporterFactory { + return NewDefaultSupport(true, WithECS, WithHost, WithBeatMeta("agent")) +} + +func NewObserverSupport(normalize bool) SupporterFactory { + return NewDefaultSupport(normalize, WithECS, WithBeatMeta("observer")) +} + +func NewDefaultSupport( + normalize bool, + makeBuiltin ...func(beat.Info) common.MapStr, +) SupporterFactory { + return func(info beat.Info, log *logp.Logger, beatCfg *common.Config) (Supporter, error) { + cfg := struct { + common.EventMetadata `config:",inline"` // Fields and tags to add to each event. + Processors processors.PluginConfig `config:"processors"` + }{} + if err := beatCfg.Unpack(&cfg); err != nil { + return nil, err + } + + processors, err := processors.New(cfg.Processors) + if err != nil { + return nil, fmt.Errorf("error initializing processors: %v", err) + } + + p := pipelineProcessors{ + skipNormalize: !normalize, + log: log, + } + + hasProcessors := processors != nil && len(processors.List) > 0 + if hasProcessors { + tmp := newProgram("global", log) + for _, p := range processors.List { + tmp.add(p) + } + p.processors = tmp + } + + builtin := common.MapStr{} + for _, mk := range makeBuiltin { + builtin.DeepUpdate(mk(info).Clone()) + } + if len(builtin) > 0 { + p.builtinMeta = builtin + } + + if em := cfg.EventMetadata; len(em.Fields) > 0 { + fields := common.MapStr{} + common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) + p.fields = fields + } + + if t := cfg.EventMetadata.Tags; len(t) > 0 { + p.tags = t + } + + return p.build, nil + } +} + +func WithFields(fields common.MapStr) func(beat.Info) common.MapStr { + return func(_ beat.Info) common.MapStr { + return fields + } +} + +func WithSchema(key, version string) func(beat.Info) common.MapStr { + return WithFields(common.MapStr{key: version}) +} + +var WithECS = WithSchema("ecs", ecsVersion) + +func WithHost(info beat.Info) common.MapStr { + return common.MapStr{ + "host": common.MapStr{ + "name": info.Name, + }, + } +} + +func WithBeatMeta(key string) func(beat.Info) common.MapStr { + return func(info beat.Info) common.MapStr { + metadata := common.MapStr{ + "type": info.Beat, + "ephemeral_id": info.EphemeralID.String(), + "hostname": info.Hostname, + "id": info.ID.String(), + "version": info.Version, + } + if info.Name != info.Hostname { + metadata.Put("name", info.Name) + } + return common.MapStr{key: metadata} + } +} + +// build prepares the processor pipeline, merging +// post processing, event annotations and actual configured processors. +// The pipeline generated ensure the client and pipeline processors +// will see the complete events with all meta data applied. +// +// Pipeline (C=client, P=pipeline) +// +// 1. (P) generalize/normalize event +// 2. (C) add Meta from client Config to event.Meta +// 3. (C) add Fields from client config to event.Fields +// 4. (P) add pipeline fields + tags +// 5. (C) add client fields + tags +// 6. (C) client processors list +// 7. (P) add builtins +// 8. (P) pipeline processors list +// 9. (P) (if publish/debug enabled) log event +// 10. (P) (if output disabled) dropEvent +func (pp *pipelineProcessors) build( + cfg beat.ProcessingConfig, + drop bool, +) (beat.Processor, error) { + var ( + // pipeline processors + processors = &program{ + title: "processPipeline", + log: pp.log, + } + + // client fields and metadata + clientMeta = cfg.Meta + localProcessors = makeClientProcessors(pp.log, cfg) + ) + + needsCopy := pp.alwaysCopy || localProcessors != nil || pp.processors != nil + + if !pp.skipNormalize { + // setup 1: generalize/normalize output (P) + processors.add(generalizeProcessor) + } + + // setup 2: add Meta from client config (C) + if m := clientMeta; len(m) > 0 { + processors.add(clientEventMeta(m, needsCopy)) + } + + // setup 4, 5: pipeline tags + client tags + var tags []string + tags = append(tags, pp.tags...) + tags = append(tags, cfg.EventMetadata.Tags...) + if len(tags) > 0 { + processors.add(actions.NewAddTags("tags", tags)) + } + + // setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata + fields := cfg.Fields.Clone() + fields.DeepUpdate(pp.fields.Clone()) + if em := cfg.EventMetadata; len(em.Fields) > 0 { + common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) + } + + if len(fields) > 0 { + // Enforce a copy of fields if dynamic fields are configured or agent + // metadata will be merged into the fields. + // With dynamic fields potentially changing at any time, we need to copy, + // so we do not change shared structures be accident. + fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, pp.builtinMeta) + processors.add(actions.NewAddFields(fields, fieldsNeedsCopy)) + } + + if cfg.DynamicFields != nil { + checkCopy := func(m common.MapStr) bool { + return needsCopy || hasKeyAnyOf(m, pp.builtinMeta) + } + processors.add(makeAddDynMetaProcessor("dynamicFields", cfg.DynamicFields, checkCopy)) + } + + // setup 5: client processor list + processors.add(localProcessors) + + // setup 6: add beats and host metadata + if meta := pp.builtinMeta; len(meta) > 0 { + processors.add(actions.NewAddFields(meta, needsCopy)) + } + + // setup 8: pipeline processors list + processors.add(pp.processors) + + // setup 9: debug print final event (P) + if logp.IsDebug("publish") { + processors.add(debugPrintProcessor(pp.info, pp.log)) + } + + // setup 10: drop all events if outputs are disabled (P) + if drop { + processors.add(dropDisabledProcessor) + } + + return processors, nil +} + +func makeClientProcessors( + log *logp.Logger, + cfg beat.ProcessingConfig, +) processors.Processor { + procs := cfg.Processor + if procs == nil || len(procs.All()) == 0 { + return nil + } + + p := newProgram("client", log) + p.list = procs.All() + return p +} diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go new file mode 100644 index 000000000000..15e77d020939 --- /dev/null +++ b/libbeat/publisher/processing/default_test.go @@ -0,0 +1,361 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processing + +import ( + "encoding/json" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors/actions" +) + +func TestProcessorsConfigs(t *testing.T) { + defaultInfo := beat.Info{ + Beat: "test", + EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), + Hostname: "test.host.name", + ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), + Name: "test.host.name", + Version: "0.1", + } + + cases := map[string]struct { + factory SupporterFactory + global string + local beat.ProcessingConfig + drop bool + event string + want common.MapStr + wantMeta common.MapStr + infoMod func(beat.Info) beat.Info + }{ + "user global fields and tags": { + global: "{fields: {global: 1}, fields_under_root: true, tags: [tag]}", + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "global": uint64(1), + "tags": []string{"tag"}, + }, + }, + "beat local fields": { + global: "", + local: beat.ProcessingConfig{ + Fields: common.MapStr{"local": 1}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "local": 1, + }, + }, + "beat local and user global fields": { + global: "{fields: {global: 1}, fields_under_root: true, tags: [tag]}", + local: beat.ProcessingConfig{ + Fields: common.MapStr{"local": 1}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "global": uint64(1), + "local": 1, + "tags": []string{"tag"}, + }, + }, + "user global fields overwrite beat local fields": { + global: "{fields: {global: a, shared: global}, fields_under_root: true}", + local: beat.ProcessingConfig{ + Fields: common.MapStr{"local": "b", "shared": "local"}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "local": "b", + "global": "a", + "shared": "global", + }, + }, + "user local fields and tags": { + local: beat.ProcessingConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": "a"}, + Tags: []string{"tag"}, + }, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "fields": common.MapStr{ + "local": "a", + }, + "tags": []string{"tag"}, + }, + }, + "user local fields (under root) and tags": { + local: beat.ProcessingConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{"local": "a"}, + FieldsUnderRoot: true, + Tags: []string{"tag"}, + }, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "local": "a", + "tags": []string{"tag"}, + }, + }, + "user local fields overwrite user global fields": { + global: `{fields: {global: a, shared: global}, fields_under_root: true, tags: [global]}`, + local: beat.ProcessingConfig{ + EventMetadata: common.EventMetadata{ + Fields: common.MapStr{ + "local": "a", + "shared": "local", + }, + FieldsUnderRoot: true, + Tags: []string{"local"}, + }, + }, + event: `{"value": "abc"}`, + want: common.MapStr{ + "value": "abc", + "global": "a", + "local": "a", + "shared": "local", + "tags": []string{"global", "local"}, + }, + }, + "with client metadata": { + local: beat.ProcessingConfig{ + Meta: common.MapStr{"index": "test"}, + }, + event: `{"value": "abc"}`, + want: common.MapStr{"value": "abc"}, + wantMeta: common.MapStr{"index": "test"}, + }, + "with client processor": { + local: beat.ProcessingConfig{ + Processor: func() beat.ProcessorList { + p := newProgram("test", logp.L()) + p.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true)) + return p + }(), + }, + event: `{"value": "abc"}`, + want: common.MapStr{"value": "abc", "custom": "value"}, + }, + "with beat default fields": { + factory: NewBeatSupport(), + global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, + event: `{"value": "abc"}`, + want: common.MapStr{ + "ecs": ecsVersion, + "host": common.MapStr{ + "name": "test.host.name", + }, + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", + "global": "a", + "tags": []string{"tag"}, + }, + }, + "with beat default fields and custom name": { + factory: NewBeatSupport(), + global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, + event: `{"value": "abc"}`, + infoMod: func(info beat.Info) beat.Info { + info.Name = "other.test.host.name" + return info + }, + want: common.MapStr{ + "ecs": ecsVersion, + "host": common.MapStr{ + "name": "other.test.host.name", + }, + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "name": "other.test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", + "global": "a", + "tags": []string{"tag"}, + }, + }, + "with observer default fields": { + factory: NewObserverSupport(false), + global: `{fields: {global: a, observer.foo: bar}, fields_under_root: true, tags: [tag]}`, + event: `{"value": "abc"}`, + want: common.MapStr{ + "ecs": ecsVersion, + "observer": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", + "global": "a", + "tags": []string{"tag"}, + }, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + + cfg, err := common.NewConfigWithYAML([]byte(test.global), "test") + require.NoError(t, err) + + info := defaultInfo + if test.infoMod != nil { + info = test.infoMod(info) + } + + factory := test.factory + if factory == nil { + factory = NewDefaultSupport(true) + } + + support, err := factory(info, logp.L(), cfg) + require.NoError(t, err) + + prog, err := support(test.local, test.drop) + require.NoError(t, err) + + actual, err := prog.Run(&beat.Event{ + Timestamp: time.Now(), + Fields: fromJSON(test.event), + }) + require.NoError(t, err) + + // validate + assert.Equal(t, test.want, actual.Fields) + assert.Equal(t, test.wantMeta, actual.Meta) + }) + } +} + +func TestNormalization(t *testing.T) { + cases := map[string]struct { + normalize bool + in common.MapStr + mod common.MapStr + want common.MapStr + }{ + "no sharing if normalized": { + normalize: true, + in: common.MapStr{"a": "b"}, + mod: common.MapStr{"change": "x"}, + want: common.MapStr{"a": "b"}, + }, + "data sharing if not normalized": { + normalize: false, + in: common.MapStr{"a": "b"}, + mod: common.MapStr{"change": "x"}, + want: common.MapStr{"a": "b", "change": "x"}, + }, + } + + for name, test := range cases { + test := test + t.Run(name, func(t *testing.T) { + t.Parallel() + + s, err := NewDefaultSupport(test.normalize)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + prog, err := s(beat.ProcessingConfig{}, false) + require.NoError(t, err) + + fields := test.in.Clone() + actual, err := prog.Run(&beat.Event{Fields: fields}) + require.NoError(t, err) + require.NotNil(t, actual) + + fields.DeepUpdate(test.mod) + assert.Equal(t, test.want, actual.Fields) + }) + } +} + +func TestAlwaysDrop(t *testing.T) { + s, err := NewDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + prog, err := s(beat.ProcessingConfig{}, true) + require.NoError(t, err) + + actual, err := prog.Run(&beat.Event{}) + require.NoError(t, err) + assert.Nil(t, actual) +} + +func TestDynamicFields(t *testing.T) { + factory, err := NewDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + require.NoError(t, err) + + dynFields := common.NewMapStrPointer(common.MapStr{}) + prog, err := factory(beat.ProcessingConfig{ + DynamicFields: &dynFields, + }, false) + require.NoError(t, err) + + actual, err := prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}}) + require.NoError(t, err) + assert.Equal(t, common.MapStr{"hello": "world"}, actual.Fields) + + dynFields.Set(common.MapStr{"dyn": "field"}) + actual, err = prog.Run(&beat.Event{Fields: common.MapStr{"hello": "world"}}) + require.NoError(t, err) + assert.Equal(t, common.MapStr{"hello": "world", "dyn": "field"}, actual.Fields) +} + +func fromJSON(in string) common.MapStr { + var tmp common.MapStr + err := json.Unmarshal([]byte(in), &tmp) + if err != nil { + panic(err) + } + return tmp +} diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go new file mode 100644 index 000000000000..2c8d9feaba54 --- /dev/null +++ b/libbeat/publisher/processing/processing.go @@ -0,0 +1,28 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package processing + +import ( + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type SupporterFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) (Supporter, error) + +type Supporter func(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/processing/processors.go similarity index 52% rename from libbeat/publisher/pipeline/processor.go rename to libbeat/publisher/processing/processors.go index 81834f7bdff5..18d77dd68270 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/processing/processors.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package pipeline +package processing import ( "fmt" @@ -27,7 +27,6 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/codec/json" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/processors/actions" ) type program struct { @@ -41,113 +40,25 @@ type processorFn struct { fn func(event *beat.Event) (*beat.Event, error) } -// newProcessorPipeline prepares the processor pipeline, merging -// post processing, event annotations and actual configured processors. -// The pipeline generated ensure the client and pipeline processors -// will see the complete events with all meta data applied. -// -// Pipeline (C=client, P=pipeline) -// -// 1. (P) generalize/normalize event -// 2. (C) add Meta from client Config to event.Meta -// 3. (C) add Fields from client config to event.Fields -// 4. (P) add pipeline fields + tags -// 5. (C) add client fields + tags -// 6. (C) client processors list -// 7. (P) add beats metadata -// 8. (P) pipeline processors list -// 9. (P) (if publish/debug enabled) log event -// 10. (P) (if output disabled) dropEvent -func newProcessorPipeline( - info beat.Info, - monitors Monitors, - global pipelineProcessors, - config beat.ClientConfig, -) beat.Processor { - var ( - // pipeline processors - processors = &program{ - title: "processPipeline", - log: monitors.Logger, - } - - // client fields and metadata - clientMeta = config.Meta - localProcessors = makeClientProcessors(monitors, config) - ) - - needsCopy := global.alwaysCopy || localProcessors != nil || global.processors != nil - - if !config.SkipNormalization { - // setup 1: generalize/normalize output (P) - processors.add(generalizeProcessor) - } - - // setup 2: add Meta from client config (C) - if m := clientMeta; len(m) > 0 { - processors.add(clientEventMeta(m, needsCopy)) - } - - // setup 4, 5: pipeline tags + client tags - var tags []string - tags = append(tags, global.tags...) - tags = append(tags, config.EventMetadata.Tags...) - if len(tags) > 0 { - processors.add(actions.NewAddTags("tags", tags)) - } - - // setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata - fields := config.Fields.Clone() - fields.DeepUpdate(global.fields.Clone()) - if em := config.EventMetadata; len(em.Fields) > 0 { - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - } - - if len(fields) > 0 { - // Enforce a copy of fields if dynamic fields are configured or agent - // metadata will be merged into the fields. - // With dynamic fields potentially changing at any time, we need to copy, - // so we do not change shared structures be accident. - fieldsNeedsCopy := needsCopy || config.DynamicFields != nil || fields["agent"] != nil - processors.add(actions.NewAddFields(fields, fieldsNeedsCopy)) - } - - if config.DynamicFields != nil { - checkCopy := func(m common.MapStr) bool { - return needsCopy || hasKey(m, "agent") - } - processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, checkCopy)) - } - - // setup 5: client processor list - processors.add(localProcessors) - - // setup 6: add beats and host metadata - if meta := global.builtinMeta; len(meta) > 0 { - processors.add(actions.NewAddFields(meta, needsCopy)) - } - - // setup 7: add agent metadata - if !config.SkipAgentMetadata { - needsCopy := global.alwaysCopy || global.processors != nil - processors.add(actions.NewAddFields(createAgentFields(info), needsCopy)) +var generalizeProcessor = newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) { + // Filter out empty events. Empty events are still reported by ACK callbacks. + if len(event.Fields) == 0 { + return nil, nil } - // setup 8: pipeline processors list - processors.add(global.processors) - - // setup 9: debug print final event (P) - if logp.IsDebug("publish") { - processors.add(debugPrintProcessor(info, monitors)) + fields := common.ConvertToGenericEvent(event.Fields) + if fields == nil { + logp.Err("fail to convert to generic event") + return nil, nil } - // setup 10: drop all events if outputs are disabled (P) - if global.disabled { - processors.add(dropDisabledProcessor) - } + event.Fields = fields + return event, nil +}) - return processors -} +var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) (*beat.Event, error) { + return nil, nil +}) func newProgram(title string, log *logp.Logger) *program { return &program{ @@ -175,6 +86,10 @@ func (p *program) String() string { return fmt.Sprintf("%v{%v}", p.title, str) } +func (p *program) All() []beat.Processor { + return p.list +} + func (p *program) Run(event *beat.Event) (*beat.Event, error) { if p == nil || len(p.list) == 0 { return event, nil @@ -215,27 +130,6 @@ func newAnnotateProcessor(name string, fn func(*beat.Event)) *processorFn { func (p *processorFn) String() string { return p.name } func (p *processorFn) Run(e *beat.Event) (*beat.Event, error) { return p.fn(e) } -var generalizeProcessor = newProcessor("generalizeEvent", func(event *beat.Event) (*beat.Event, error) { - - // Filter out empty events. Empty events are still reported by ACK callbacks. - if len(event.Fields) == 0 { - return nil, nil - } - - fields := common.ConvertToGenericEvent(event.Fields) - if fields == nil { - logp.Err("fail to convert to generic event") - return nil, nil - } - - event.Fields = fields - return event, nil -}) - -var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) (*beat.Event, error) { - return nil, nil -}) - func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn { fn := func(event *beat.Event) { addMeta(event, meta) } if needsCopy { @@ -268,22 +162,7 @@ func makeAddDynMetaProcessor( }) } -func createAgentFields(info beat.Info) common.MapStr { - metadata := common.MapStr{ - "type": info.Beat, - "ephemeral_id": info.EphemeralID.String(), - "hostname": info.Hostname, - "id": info.ID.String(), - "version": info.Version, - } - if info.Name != info.Hostname { - metadata.Put("name", info.Name) - } - - return common.MapStr{"agent": metadata} -} - -func debugPrintProcessor(info beat.Info, monitors Monitors) *processorFn { +func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn { // ensure only one go-routine is using the encoder (in case // beat.Client is shared between multiple go-routines by accident) var mux sync.Mutex @@ -292,7 +171,6 @@ func debugPrintProcessor(info beat.Info, monitors Monitors) *processorFn { Pretty: true, EscapeHTML: false, }) - log := monitors.Logger return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) { mux.Lock() defer mux.Unlock() @@ -307,21 +185,16 @@ func debugPrintProcessor(info beat.Info, monitors Monitors) *processorFn { }) } -func makeClientProcessors( - monitors Monitors, - config beat.ClientConfig, -) processors.Processor { - procs := config.Processor - if procs == nil || len(procs.All()) == 0 { - return nil - } - - p := newProgram("client", monitors.Logger) - p.list = procs.All() - return p -} - func hasKey(m common.MapStr, key string) bool { _, exists := m[key] return exists } + +func hasKeyAnyOf(m, builtin common.MapStr) bool { + for k := range builtin { + if hasKey(m, k) { + return true + } + } + return false +} diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 2da153d98ea0..d00e524a5555 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -58,8 +58,10 @@ func NewConnector(pipeline beat.Pipeline, c *common.Config, dynFields *common.Ma func (c *Connector) Connect() (beat.Client, error) { return c.pipeline.ConnectWith(beat.ClientConfig{ - EventMetadata: c.eventMeta, - Processor: c.processors, - DynamicFields: c.dynamicFields, + Processing: beat.ProcessingConfig{ + EventMetadata: c.eventMeta, + Processor: c.processors, + DynamicFields: c.dynamicFields, + }, }) } diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index becc53cf65bd..a4e812ad66ea 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -178,8 +178,10 @@ func (pb *packetbeat) setupFlows() error { } client, err := pb.pipeline.ConnectWith(beat.ClientConfig{ - EventMetadata: config.Flows.EventMetadata, - Processor: processors, + Processing: beat.ProcessingConfig{ + EventMetadata: config.Flows.EventMetadata, + Processor: processors, + }, }) if err != nil { return err diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index 0b34715e1b9c..db2648520363 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -97,8 +97,10 @@ func (p *TransactionPublisher) CreateReporter( } clientConfig := beat.ClientConfig{ - EventMetadata: meta.Event, - Processor: processors, + Processing: beat.ProcessingConfig{ + EventMetadata: meta.Event, + Processor: processors, + }, } if p.canDrop { clientConfig.PublishMode = beat.DropIfFull diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go index 59c9ecb1f906..0eb9ec8c13bf 100644 --- a/winlogbeat/beater/eventlogger.go +++ b/winlogbeat/beater/eventlogger.go @@ -64,10 +64,12 @@ func newEventLogger( func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) { api := e.source.Name() return pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - EventMetadata: e.eventMeta, - Meta: nil, // TODO: configure modules/ES ingest pipeline? - Processor: e.processors, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + EventMetadata: e.eventMeta, + Meta: nil, // TODO: configure modules/ES ingest pipeline? + Processor: e.processors, + }, ACKCount: func(n int) { addPublished(api, n) logp.Info("EventLog[%s] successfully published %d events", api, n) From 4c80ef3286ee5d5bc6742d76a7adfd7fac77d639 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 18 Feb 2019 20:45:58 +0100 Subject: [PATCH 02/13] fix stress runner --- libbeat/publisher/pipeline/stress/run.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 3ebce0351f1e..b6e521073ce2 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" ) type config struct { @@ -58,13 +59,21 @@ func RunTests( return fmt.Errorf("unpacking config failed: %v", err) } + log := logp.L() + + processing, err := processing.NewDefaultSupport(false)(info, log, cfg) + if err != nil { + return err + } + pipeline, err := pipeline.Load(info, pipeline.Monitors{ Metrics: nil, Telemetry: nil, - Logger: logp.L(), + Logger: log, }, config.Pipeline, + processing, func(stat outputs.Observer) (string, outputs.Group, error) { cfg := config.Output out, err := outputs.Load(nil, info, stat, cfg.Name(), cfg.Config()) From ccfeeac99b984ad2cc9c35c3c6a75b0316a3cef8 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 18 Feb 2019 20:55:28 +0100 Subject: [PATCH 03/13] fix functionbeat build --- x-pack/functionbeat/beater/functionbeat.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/functionbeat/beater/functionbeat.go b/x-pack/functionbeat/beater/functionbeat.go index 61b61cf03442..b62e8cd704d9 100644 --- a/x-pack/functionbeat/beater/functionbeat.go +++ b/x-pack/functionbeat/beater/functionbeat.go @@ -143,9 +143,11 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea } client, err := core.NewSyncClient(log, pipeline, beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - Processor: processors, - EventMetadata: c.EventMetadata, + PublishMode: beat.GuaranteedSend, + Processing: beat.ProcessingConfig{ + Processor: processors, + EventMetadata: c.EventMetadata, + }, }) if err != nil { From 0c3f2930a2891795d96469e36b65afb29fe5419f Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 21 Feb 2019 13:28:05 +0100 Subject: [PATCH 04/13] fix default ecs fields --- libbeat/publisher/processing/default.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index cda5e8021cd7..050c7b1d1a4d 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -116,11 +116,11 @@ func WithFields(fields common.MapStr) func(beat.Info) common.MapStr { } } -func WithSchema(key, version string) func(beat.Info) common.MapStr { - return WithFields(common.MapStr{key: version}) -} - -var WithECS = WithSchema("ecs", ecsVersion) +var WithECS = WithFields(common.MapStr{ + "ecs": common.MapStr{ + "version": ecsVersion, + }, +}) func WithHost(info beat.Info) common.MapStr { return common.MapStr{ From 95dd496b14899e1396e3b8e0d811fd5b3b40e26c Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 21 Feb 2019 14:46:54 +0100 Subject: [PATCH 05/13] and unit tests --- libbeat/publisher/processing/default_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 15e77d020939..9e820d130f3b 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -42,6 +42,8 @@ func TestProcessorsConfigs(t *testing.T) { Version: "0.1", } + ecsFields := common.MapStr{"version": ecsVersion} + cases := map[string]struct { factory SupporterFactory global string @@ -174,7 +176,7 @@ func TestProcessorsConfigs(t *testing.T) { global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, event: `{"value": "abc"}`, want: common.MapStr{ - "ecs": ecsVersion, + "ecs": ecsFields, "host": common.MapStr{ "name": "test.host.name", }, @@ -200,7 +202,7 @@ func TestProcessorsConfigs(t *testing.T) { return info }, want: common.MapStr{ - "ecs": ecsVersion, + "ecs": ecsFields, "host": common.MapStr{ "name": "other.test.host.name", }, @@ -223,7 +225,7 @@ func TestProcessorsConfigs(t *testing.T) { global: `{fields: {global: a, observer.foo: bar}, fields_under_root: true, tags: [tag]}`, event: `{"value": "abc"}`, want: common.MapStr{ - "ecs": ecsVersion, + "ecs": ecsFields, "observer": common.MapStr{ "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", "hostname": "test.host.name", From d58a611079057b560ddfb1ee01326f7d8f046c86 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 22 Feb 2019 09:24:18 +0100 Subject: [PATCH 06/13] Fix debug printer to respect the logging selector --- libbeat/monitoring/report/elasticsearch/elasticsearch.go | 9 ++++++++- libbeat/publisher/processing/default.go | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index c53d62639620..fe68de83fc81 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/processing" "github.com/elastic/beats/libbeat/publisher/queue" "github.com/elastic/beats/libbeat/publisher/queue/memqueue" ) @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) outClient := outputs.NewFailoverClient(clients) outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) + processing, err := processing.NewDefaultSupport(true)(beat, log, common.NewConfig()) + if err != nil { + return nil, err + } + pipeline, err := pipeline.New( beat, pipeline.Monitors{ Metrics: monitoring, - Logger: logp.NewLogger(selector), + Logger: log, }, queueFactory, outputs.Group{ @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) pipeline.Settings{ WaitClose: 0, WaitCloseMode: pipeline.NoWaitOnClose, + Processors: processing, }) if err != nil { return nil, err diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 050c7b1d1a4d..893c6864a39c 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -234,7 +234,7 @@ func (pp *pipelineProcessors) build( processors.add(pp.processors) // setup 9: debug print final event (P) - if logp.IsDebug("publish") { + if pp.log.IsDebug() { processors.add(debugPrintProcessor(pp.info, pp.log)) } From 02e216a253a1adfd928fbcbf67db0cb2739e22d8 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 22 Feb 2019 13:40:08 +0100 Subject: [PATCH 07/13] fix error return --- libbeat/cmd/instance/beat.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 198dea7806c6..97b548a70723 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -597,6 +597,9 @@ func (b *Beat) configure(settings Settings) error { imFactory = idxmgmt.MakeDefaultSupport(settings.ILM) } b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig) + if err != nil { + return err + } processingFactory := settings.Processing if processingFactory == nil { From c6ecd82453b83d60ffd0a99e9e608997f81da9cb Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 22 Feb 2019 16:24:19 +0100 Subject: [PATCH 08/13] godoc --- libbeat/publisher/processing/default.go | 88 ++++++++++++++++++---- libbeat/publisher/processing/processing.go | 8 ++ 2 files changed, 81 insertions(+), 15 deletions(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 893c6864a39c..dbebaef39b94 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -36,6 +36,7 @@ type pipelineProcessors struct { // The pipeline its processor settings for // constructing the clients complete processor // pipeline on connect. + modifiers []modifier builtinMeta common.MapStr fields common.MapStr tags []string @@ -48,17 +49,41 @@ type pipelineProcessors struct { const ecsVersion = "1.0.0-beta2" +type modifier interface { + // BuiltinFields defines global fields to be added to every event. + BuiltinFields(beat.Info) common.MapStr + + // ClientFields defines connection local fields to be added to each event + // of a pipieline client. + ClientFields(beat.Info, beat.ProcessingConfig) common.MapStr +} + +type builtinModifier func(beat.Info) common.MapStr + +// NewObserverSupport creates a new SupporterFactory based on NewDefaultSupport. +// NewObserverSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields +// to each event. func NewBeatSupport() SupporterFactory { return NewDefaultSupport(true, WithECS, WithHost, WithBeatMeta("agent")) } +// NewObserverSupport creates a new SupporterFactory based on NewDefaultSupport. +// NewObserverSupport automatically adds the `ecs.version` and `observer.X` fields +// to each event. func NewObserverSupport(normalize bool) SupporterFactory { return NewDefaultSupport(normalize, WithECS, WithBeatMeta("observer")) } +// NewDefaultSupport creates a new SupporterFactory for use with the publisher pipeline. +// If normalize is set, events will be normalized first before being presented +// to the actual processors. +// The Supporter will apply the global `fields`, `fields_under_root`, `tags` +// and `processor` settings to the event processing pipeline to be generated. +// Use WithFields, WithBeatMeta, and other to declare the builtin fields to be added +// to each event. Builtin fields can be modified using global `processors`, and `fields` only. func NewDefaultSupport( normalize bool, - makeBuiltin ...func(beat.Info) common.MapStr, + modifiers ...modifier, ) SupporterFactory { return func(info beat.Info, log *logp.Logger, beatCfg *common.Config) (Supporter, error) { cfg := struct { @@ -89,8 +114,11 @@ func NewDefaultSupport( } builtin := common.MapStr{} - for _, mk := range makeBuiltin { - builtin.DeepUpdate(mk(info).Clone()) + for _, mod := range modifiers { + m := mod.BuiltinFields(info) + if len(m) > 0 { + builtin.DeepUpdate(m.Clone()) + } } if len(builtin) > 0 { p.builtinMeta = builtin @@ -110,28 +138,33 @@ func NewDefaultSupport( } } -func WithFields(fields common.MapStr) func(beat.Info) common.MapStr { - return func(_ beat.Info) common.MapStr { +// WithFields creates a modifier with the given default builtin fields. +func WithFields(fields common.MapStr) modifier { + return builtinModifier(func(_ beat.Info) common.MapStr { return fields - } + }) } -var WithECS = WithFields(common.MapStr{ +// WithECS modifier adds `ecs.version` builtin fields to a processing pipeline. +var WithECS modifier = WithFields(common.MapStr{ "ecs": common.MapStr{ "version": ecsVersion, }, }) -func WithHost(info beat.Info) common.MapStr { +// WithHost modifier adds `host.name` builtin fields to a processing pipeline +var WithHost modifier = builtinModifier(func(info beat.Info) common.MapStr { return common.MapStr{ "host": common.MapStr{ "name": info.Name, }, } -} +}) -func WithBeatMeta(key string) func(beat.Info) common.MapStr { - return func(info beat.Info) common.MapStr { +// WithBeatMeta adds beat meta information as builtin fields to a processing pipeline. +// The `key` parameter defines the field to be used. +func WithBeatMeta(key string) modifier { + return builtinModifier(func(info beat.Info) common.MapStr { metadata := common.MapStr{ "type": info.Beat, "ephemeral_id": info.EphemeralID.String(), @@ -143,7 +176,7 @@ func WithBeatMeta(key string) func(beat.Info) common.MapStr { metadata.Put("name", info.Name) } return common.MapStr{key: metadata} - } + }) } // build prepares the processor pipeline, merging @@ -181,6 +214,23 @@ func (pp *pipelineProcessors) build( needsCopy := pp.alwaysCopy || localProcessors != nil || pp.processors != nil + builtin := pp.builtinMeta + var clientFields common.MapStr + for _, mod := range pp.modifiers { + m := mod.ClientFields(pp.info, cfg) + if len(m) > 0 { + if clientFields == nil { + clientFields = common.MapStr{} + } + clientFields.DeepUpdate(m.Clone()) + } + } + if len(clientFields) > 0 { + tmp := builtin.Clone() + tmp.DeepUpdate(clientFields) + builtin = tmp + } + if !pp.skipNormalize { // setup 1: generalize/normalize output (P) processors.add(generalizeProcessor) @@ -211,13 +261,13 @@ func (pp *pipelineProcessors) build( // metadata will be merged into the fields. // With dynamic fields potentially changing at any time, we need to copy, // so we do not change shared structures be accident. - fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, pp.builtinMeta) + fieldsNeedsCopy := needsCopy || cfg.DynamicFields != nil || hasKeyAnyOf(fields, builtin) processors.add(actions.NewAddFields(fields, fieldsNeedsCopy)) } if cfg.DynamicFields != nil { checkCopy := func(m common.MapStr) bool { - return needsCopy || hasKeyAnyOf(m, pp.builtinMeta) + return needsCopy || hasKeyAnyOf(m, builtin) } processors.add(makeAddDynMetaProcessor("dynamicFields", cfg.DynamicFields, checkCopy)) } @@ -226,7 +276,7 @@ func (pp *pipelineProcessors) build( processors.add(localProcessors) // setup 6: add beats and host metadata - if meta := pp.builtinMeta; len(meta) > 0 { + if meta := builtin; len(meta) > 0 { processors.add(actions.NewAddFields(meta, needsCopy)) } @@ -259,3 +309,11 @@ func makeClientProcessors( p.list = procs.All() return p } + +func (b builtinModifier) BuiltinFields(info beat.Info) common.MapStr { + return b(info) +} + +func (b builtinModifier) ClientFields(_ beat.Info, _ beat.ProcessingConfig) common.MapStr { + return nil +} diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go index 2c8d9feaba54..f6a8f707cd61 100644 --- a/libbeat/publisher/processing/processing.go +++ b/libbeat/publisher/processing/processing.go @@ -23,6 +23,14 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +// SupporterFactory creates a new processing Supporter that can be used with +// the publisher pipeline. The factory gets the global configuration passed, +// in order to configure some shared global event processing. type SupporterFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) (Supporter, error) +// Supporter is used to create an event processing pipeline. It is used by the +// publisher pipeline when a client connects to the pipeline. The supporter +// will merge the global and local configurations into a common event +// processor. +// If `drop` is set, then the processor generated must always drop all events. type Supporter func(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) From 53202ace6f7c661ea9dc53d7395f96f3a28a44e7 Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 22 Feb 2019 16:27:02 +0100 Subject: [PATCH 09/13] typo --- libbeat/publisher/processing/default.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index dbebaef39b94..a32114951dc6 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -60,8 +60,8 @@ type modifier interface { type builtinModifier func(beat.Info) common.MapStr -// NewObserverSupport creates a new SupporterFactory based on NewDefaultSupport. -// NewObserverSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields +// NewBeatSupport creates a new SupporterFactory based on NewDefaultSupport. +// NewBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields // to each event. func NewBeatSupport() SupporterFactory { return NewDefaultSupport(true, WithECS, WithHost, WithBeatMeta("agent")) From 73895c8a8b855937732b79d78fb437b4e9f17fbe Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 25 Feb 2019 19:02:02 +0100 Subject: [PATCH 10/13] turn processing.Supporter into interface --- libbeat/publisher/pipeline/pipeline.go | 2 +- libbeat/publisher/processing/default.go | 6 +++--- libbeat/publisher/processing/default_test.go | 8 ++++---- libbeat/publisher/processing/processing.go | 4 +++- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index de6602e886e3..e9f1a39cd57d 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -319,7 +319,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { var processors beat.Processor if p.processors != nil { - proc, err := p.processors(cfg.Processing, publishDisabled) + proc, err := p.processors.Create(cfg.Processing, publishDisabled) if err != nil { return nil, err } diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index a32114951dc6..4d7232c59011 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -99,7 +99,7 @@ func NewDefaultSupport( return nil, fmt.Errorf("error initializing processors: %v", err) } - p := pipelineProcessors{ + p := &pipelineProcessors{ skipNormalize: !normalize, log: log, } @@ -134,7 +134,7 @@ func NewDefaultSupport( p.tags = t } - return p.build, nil + return p, nil } } @@ -196,7 +196,7 @@ func WithBeatMeta(key string) modifier { // 8. (P) pipeline processors list // 9. (P) (if publish/debug enabled) log event // 10. (P) (if output disabled) dropEvent -func (pp *pipelineProcessors) build( +func (pp *pipelineProcessors) Create( cfg beat.ProcessingConfig, drop bool, ) (beat.Processor, error) { diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 9e820d130f3b..44bb1610503f 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -262,7 +262,7 @@ func TestProcessorsConfigs(t *testing.T) { support, err := factory(info, logp.L(), cfg) require.NoError(t, err) - prog, err := support(test.local, test.drop) + prog, err := support.Create(test.local, test.drop) require.NoError(t, err) actual, err := prog.Run(&beat.Event{ @@ -307,7 +307,7 @@ func TestNormalization(t *testing.T) { s, err := NewDefaultSupport(test.normalize)(beat.Info{}, logp.L(), common.NewConfig()) require.NoError(t, err) - prog, err := s(beat.ProcessingConfig{}, false) + prog, err := s.Create(beat.ProcessingConfig{}, false) require.NoError(t, err) fields := test.in.Clone() @@ -325,7 +325,7 @@ func TestAlwaysDrop(t *testing.T) { s, err := NewDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) require.NoError(t, err) - prog, err := s(beat.ProcessingConfig{}, true) + prog, err := s.Create(beat.ProcessingConfig{}, true) require.NoError(t, err) actual, err := prog.Run(&beat.Event{}) @@ -338,7 +338,7 @@ func TestDynamicFields(t *testing.T) { require.NoError(t, err) dynFields := common.NewMapStrPointer(common.MapStr{}) - prog, err := factory(beat.ProcessingConfig{ + prog, err := factory.Create(beat.ProcessingConfig{ DynamicFields: &dynFields, }, false) require.NoError(t, err) diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go index f6a8f707cd61..c849a5a7af2a 100644 --- a/libbeat/publisher/processing/processing.go +++ b/libbeat/publisher/processing/processing.go @@ -33,4 +33,6 @@ type SupporterFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) // will merge the global and local configurations into a common event // processor. // If `drop` is set, then the processor generated must always drop all events. -type Supporter func(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) +type Supporter interface { + Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) +} From 7400c5243b32bba1aa8a86ccb1c06ccd4a4c23da Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 7 Mar 2019 04:16:13 +0100 Subject: [PATCH 11/13] review + minor cleanups --- libbeat/cmd/instance/beat.go | 2 +- libbeat/cmd/instance/settings.go | 2 +- .../report/elasticsearch/elasticsearch.go | 2 +- libbeat/publisher/pipeline/pipeline.go | 19 +- libbeat/publisher/pipeline/stress/run.go | 2 +- libbeat/publisher/processing/default.go | 170 +++++++++--------- libbeat/publisher/processing/default_test.go | 21 +-- libbeat/publisher/processing/processing.go | 4 +- libbeat/publisher/processing/processors.go | 14 +- 9 files changed, 122 insertions(+), 114 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 97b548a70723..520623a235da 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -603,7 +603,7 @@ func (b *Beat) configure(settings Settings) error { processingFactory := settings.Processing if processingFactory == nil { - processingFactory = processing.NewBeatSupport() + processingFactory = processing.MakeDefaultBeatSupport(true) } b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) diff --git a/libbeat/cmd/instance/settings.go b/libbeat/cmd/instance/settings.go index dfb8735f26a6..765206fac8ab 100644 --- a/libbeat/cmd/instance/settings.go +++ b/libbeat/cmd/instance/settings.go @@ -42,5 +42,5 @@ type Settings struct { IndexManagement idxmgmt.SupportFactory ILM ilm.SupportFactory - Processing processing.SupporterFactory + Processing processing.SupportFactory } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index fe68de83fc81..46bd2104f5cb 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -170,7 +170,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) outClient := outputs.NewFailoverClient(clients) outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) - processing, err := processing.NewDefaultSupport(true)(beat, log, common.NewConfig()) + processing, err := processing.MakeDefaultSupport(true)(beat, log, common.NewConfig()) if err != nil { return nil, err } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index e9f1a39cd57d..e8289e28ba52 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -317,16 +317,10 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { } } - var processors beat.Processor - if p.processors != nil { - proc, err := p.processors.Create(cfg.Processing, publishDisabled) - if err != nil { - return nil, err - } - - processors = proc + processors, err := p.createEventProcessing(cfg.Processing, publishDisabled) + if err != nil { + return nil, err } - acker := p.makeACKer(processors != nil, &cfg, waitClose) producerCfg := queue.ProducerConfig{ // Cancel events from queue if acker is configured @@ -368,6 +362,13 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return client, nil } +func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { + if p.processors == nil { + return nil, nil + } + return p.processors.Create(cfg, noPublish) +} + func (e *pipelineEventer) OnACK(n int) { e.observer.queueACKed(n) diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index b6e521073ce2..902d303e94f4 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -61,7 +61,7 @@ func RunTests( log := logp.L() - processing, err := processing.NewDefaultSupport(false)(info, log, cfg) + processing, err := processing.MakeDefaultSupport(false)(info, log, cfg) if err != nil { return err } diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 4d7232c59011..6162ef6c57e3 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -20,6 +20,8 @@ package processing import ( "fmt" + "github.com/elastic/ecs/code/go/ecs" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -27,64 +29,66 @@ import ( "github.com/elastic/beats/libbeat/processors/actions" ) -type pipelineProcessors struct { +// builder is used to create the event processing pipeline in Beats. The +// builder orders and merges global and local (per client) event annotation +// settings, with the configured event processors into one common event +// processor for use with the publisher pipeline. +// Also See: (*builder).Create +type builder struct { info beat.Info log *logp.Logger skipNormalize bool - // The pipeline its processor settings for - // constructing the clients complete processor - // pipeline on connect. + // global pipeline fields and tags configurations modifiers []modifier builtinMeta common.MapStr fields common.MapStr tags []string - processors beat.Processor + // global pipeline processors + processors *group drop bool // disabled is set if outputs have been disabled via CLI alwaysCopy bool } -const ecsVersion = "1.0.0-beta2" - type modifier interface { // BuiltinFields defines global fields to be added to every event. BuiltinFields(beat.Info) common.MapStr // ClientFields defines connection local fields to be added to each event - // of a pipieline client. + // of a pipeline client. ClientFields(beat.Info, beat.ProcessingConfig) common.MapStr } type builtinModifier func(beat.Info) common.MapStr -// NewBeatSupport creates a new SupporterFactory based on NewDefaultSupport. -// NewBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields +// MakeDefaultBeatSupport creates a new SupporterFactory based on NewDefaultSupport. +// MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields // to each event. -func NewBeatSupport() SupporterFactory { - return NewDefaultSupport(true, WithECS, WithHost, WithBeatMeta("agent")) +func MakeDefaultBeatSupport(normalize bool) SupportFactory { + return MakeDefaultSupport(normalize, WithECS, WithHost, WithBeatMeta("agent")) } -// NewObserverSupport creates a new SupporterFactory based on NewDefaultSupport. -// NewObserverSupport automatically adds the `ecs.version` and `observer.X` fields +// MakeDefaultObserverSupport creates a new SupporterFactory based on NewDefaultSupport. +// MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields // to each event. -func NewObserverSupport(normalize bool) SupporterFactory { - return NewDefaultSupport(normalize, WithECS, WithBeatMeta("observer")) +func MakeDefaultObserverSupport(normalize bool) SupportFactory { + return MakeDefaultSupport(normalize, WithECS, WithBeatMeta("observer")) } -// NewDefaultSupport creates a new SupporterFactory for use with the publisher pipeline. +// MakeDefaultSupport creates a new SupporterFactory for use with the publisher pipeline. // If normalize is set, events will be normalized first before being presented // to the actual processors. // The Supporter will apply the global `fields`, `fields_under_root`, `tags` // and `processor` settings to the event processing pipeline to be generated. // Use WithFields, WithBeatMeta, and other to declare the builtin fields to be added // to each event. Builtin fields can be modified using global `processors`, and `fields` only. -func NewDefaultSupport( +func MakeDefaultSupport( normalize bool, modifiers ...modifier, -) SupporterFactory { +) SupportFactory { return func(info beat.Info, log *logp.Logger, beatCfg *common.Config) (Supporter, error) { cfg := struct { common.EventMetadata `config:",inline"` // Fields and tags to add to each event. @@ -99,42 +103,7 @@ func NewDefaultSupport( return nil, fmt.Errorf("error initializing processors: %v", err) } - p := &pipelineProcessors{ - skipNormalize: !normalize, - log: log, - } - - hasProcessors := processors != nil && len(processors.List) > 0 - if hasProcessors { - tmp := newProgram("global", log) - for _, p := range processors.List { - tmp.add(p) - } - p.processors = tmp - } - - builtin := common.MapStr{} - for _, mod := range modifiers { - m := mod.BuiltinFields(info) - if len(m) > 0 { - builtin.DeepUpdate(m.Clone()) - } - } - if len(builtin) > 0 { - p.builtinMeta = builtin - } - - if em := cfg.EventMetadata; len(em.Fields) > 0 { - fields := common.MapStr{} - common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) - p.fields = fields - } - - if t := cfg.EventMetadata.Tags; len(t) > 0 { - p.tags = t - } - - return p, nil + return newBuilder(info, log, processors, cfg.EventMetadata, modifiers, !normalize), nil } } @@ -148,7 +117,7 @@ func WithFields(fields common.MapStr) modifier { // WithECS modifier adds `ecs.version` builtin fields to a processing pipeline. var WithECS modifier = WithFields(common.MapStr{ "ecs": common.MapStr{ - "version": ecsVersion, + "version": ecs.Version, }, }) @@ -179,13 +148,56 @@ func WithBeatMeta(key string) modifier { }) } -// build prepares the processor pipeline, merging -// post processing, event annotations and actual configured processors. -// The pipeline generated ensure the client and pipeline processors -// will see the complete events with all meta data applied. -// -// Pipeline (C=client, P=pipeline) +func newBuilder( + info beat.Info, + log *logp.Logger, + processors *processors.Processors, + eventMeta common.EventMetadata, + modifiers []modifier, + skipNormalize bool, +) *builder { + b := &builder{ + skipNormalize: skipNormalize, + modifiers: modifiers, + log: log, + } + + hasProcessors := processors != nil && len(processors.List) > 0 + if hasProcessors { + tmp := newGroup("global", log) + for _, p := range processors.List { + tmp.add(p) + } + b.processors = tmp + } + + builtin := common.MapStr{} + for _, mod := range modifiers { + m := mod.BuiltinFields(info) + if len(m) > 0 { + builtin.DeepUpdate(m.Clone()) + } + } + if len(builtin) > 0 { + b.builtinMeta = builtin + } + + if fields := eventMeta.Fields; len(fields) > 0 { + b.fields = common.MapStr{} + common.MergeFields(b.fields, fields.Clone(), eventMeta.FieldsUnderRoot) + } + + if t := eventMeta.Tags; len(t) > 0 { + b.tags = t + } + + return b +} + +// Create combines the builder configuration with the client settings +// in order to build the event processing pipeline. // +// Processing order (C=client, P=pipeline) // 1. (P) generalize/normalize event // 2. (C) add Meta from client Config to event.Meta // 3. (C) add Fields from client config to event.Fields @@ -196,28 +208,22 @@ func WithBeatMeta(key string) modifier { // 8. (P) pipeline processors list // 9. (P) (if publish/debug enabled) log event // 10. (P) (if output disabled) dropEvent -func (pp *pipelineProcessors) Create( - cfg beat.ProcessingConfig, - drop bool, -) (beat.Processor, error) { +func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) { var ( // pipeline processors - processors = &program{ - title: "processPipeline", - log: pp.log, - } + processors = newGroup("processPipeline", b.log) // client fields and metadata clientMeta = cfg.Meta - localProcessors = makeClientProcessors(pp.log, cfg) + localProcessors = makeClientProcessors(b.log, cfg) ) - needsCopy := pp.alwaysCopy || localProcessors != nil || pp.processors != nil + needsCopy := b.alwaysCopy || localProcessors != nil || b.processors != nil - builtin := pp.builtinMeta + builtin := b.builtinMeta var clientFields common.MapStr - for _, mod := range pp.modifiers { - m := mod.ClientFields(pp.info, cfg) + for _, mod := range b.modifiers { + m := mod.ClientFields(b.info, cfg) if len(m) > 0 { if clientFields == nil { clientFields = common.MapStr{} @@ -231,7 +237,7 @@ func (pp *pipelineProcessors) Create( builtin = tmp } - if !pp.skipNormalize { + if !b.skipNormalize { // setup 1: generalize/normalize output (P) processors.add(generalizeProcessor) } @@ -243,7 +249,7 @@ func (pp *pipelineProcessors) Create( // setup 4, 5: pipeline tags + client tags var tags []string - tags = append(tags, pp.tags...) + tags = append(tags, b.tags...) tags = append(tags, cfg.EventMetadata.Tags...) if len(tags) > 0 { processors.add(actions.NewAddTags("tags", tags)) @@ -251,7 +257,7 @@ func (pp *pipelineProcessors) Create( // setup 3, 4, 5: client config fields + pipeline fields + client fields + dyn metadata fields := cfg.Fields.Clone() - fields.DeepUpdate(pp.fields.Clone()) + fields.DeepUpdate(b.fields.Clone()) if em := cfg.EventMetadata; len(em.Fields) > 0 { common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) } @@ -281,11 +287,11 @@ func (pp *pipelineProcessors) Create( } // setup 8: pipeline processors list - processors.add(pp.processors) + processors.add(b.processors) // setup 9: debug print final event (P) - if pp.log.IsDebug() { - processors.add(debugPrintProcessor(pp.info, pp.log)) + if b.log.IsDebug() { + processors.add(debugPrintProcessor(b.info, b.log)) } // setup 10: drop all events if outputs are disabled (P) @@ -305,7 +311,7 @@ func makeClientProcessors( return nil } - p := newProgram("client", log) + p := newGroup("client", log) p.list = procs.All() return p } diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 44bb1610503f..0be5faa758c7 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors/actions" + "github.com/elastic/ecs/code/go/ecs" ) func TestProcessorsConfigs(t *testing.T) { @@ -42,10 +43,10 @@ func TestProcessorsConfigs(t *testing.T) { Version: "0.1", } - ecsFields := common.MapStr{"version": ecsVersion} + ecsFields := common.MapStr{"version": ecs.Version} cases := map[string]struct { - factory SupporterFactory + factory SupportFactory global string local beat.ProcessingConfig drop bool @@ -163,7 +164,7 @@ func TestProcessorsConfigs(t *testing.T) { "with client processor": { local: beat.ProcessingConfig{ Processor: func() beat.ProcessorList { - p := newProgram("test", logp.L()) + p := newGroup("test", logp.L()) p.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true)) return p }(), @@ -172,7 +173,7 @@ func TestProcessorsConfigs(t *testing.T) { want: common.MapStr{"value": "abc", "custom": "value"}, }, "with beat default fields": { - factory: NewBeatSupport(), + factory: MakeDefaultBeatSupport(true), global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, event: `{"value": "abc"}`, want: common.MapStr{ @@ -194,7 +195,7 @@ func TestProcessorsConfigs(t *testing.T) { }, }, "with beat default fields and custom name": { - factory: NewBeatSupport(), + factory: MakeDefaultBeatSupport(true), global: `{fields: {global: a, agent.foo: bar}, fields_under_root: true, tags: [tag]}`, event: `{"value": "abc"}`, infoMod: func(info beat.Info) beat.Info { @@ -221,7 +222,7 @@ func TestProcessorsConfigs(t *testing.T) { }, }, "with observer default fields": { - factory: NewObserverSupport(false), + factory: MakeDefaultObserverSupport(false), global: `{fields: {global: a, observer.foo: bar}, fields_under_root: true, tags: [tag]}`, event: `{"value": "abc"}`, want: common.MapStr{ @@ -256,7 +257,7 @@ func TestProcessorsConfigs(t *testing.T) { factory := test.factory if factory == nil { - factory = NewDefaultSupport(true) + factory = MakeDefaultSupport(true) } support, err := factory(info, logp.L(), cfg) @@ -304,7 +305,7 @@ func TestNormalization(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - s, err := NewDefaultSupport(test.normalize)(beat.Info{}, logp.L(), common.NewConfig()) + s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), common.NewConfig()) require.NoError(t, err) prog, err := s.Create(beat.ProcessingConfig{}, false) @@ -322,7 +323,7 @@ func TestNormalization(t *testing.T) { } func TestAlwaysDrop(t *testing.T) { - s, err := NewDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) require.NoError(t, err) prog, err := s.Create(beat.ProcessingConfig{}, true) @@ -334,7 +335,7 @@ func TestAlwaysDrop(t *testing.T) { } func TestDynamicFields(t *testing.T) { - factory, err := NewDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) + factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), common.NewConfig()) require.NoError(t, err) dynFields := common.NewMapStrPointer(common.MapStr{}) diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go index c849a5a7af2a..ff1be24fc453 100644 --- a/libbeat/publisher/processing/processing.go +++ b/libbeat/publisher/processing/processing.go @@ -23,10 +23,10 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -// SupporterFactory creates a new processing Supporter that can be used with +// SupportFactory creates a new processing Supporter that can be used with // the publisher pipeline. The factory gets the global configuration passed, // in order to configure some shared global event processing. -type SupporterFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) (Supporter, error) +type SupportFactory func(info beat.Info, log *logp.Logger, cfg *common.Config) (Supporter, error) // Supporter is used to create an event processing pipeline. It is used by the // publisher pipeline when a client connects to the pipeline. The supporter diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index 18d77dd68270..401b8849d2b0 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/beats/libbeat/processors" ) -type program struct { +type group struct { log *logp.Logger title string list []beat.Processor @@ -60,20 +60,20 @@ var dropDisabledProcessor = newProcessor("dropDisabled", func(event *beat.Event) return nil, nil }) -func newProgram(title string, log *logp.Logger) *program { - return &program{ +func newGroup(title string, log *logp.Logger) *group { + return &group{ title: title, log: log, } } -func (p *program) add(processor processors.Processor) { +func (p *group) add(processor processors.Processor) { if processor != nil { p.list = append(p.list, processor) } } -func (p *program) String() string { +func (p *group) String() string { var s []string for _, p := range p.list { s = append(s, p.String()) @@ -86,11 +86,11 @@ func (p *program) String() string { return fmt.Sprintf("%v{%v}", p.title, str) } -func (p *program) All() []beat.Processor { +func (p *group) All() []beat.Processor { return p.list } -func (p *program) Run(event *beat.Event) (*beat.Event, error) { +func (p *group) Run(event *beat.Event) (*beat.Event, error) { if p == nil || len(p.list) == 0 { return event, nil } From 73a6f0a7cf2d02b0df4782ec7242261ff5c96419 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 7 Mar 2019 16:07:00 +0100 Subject: [PATCH 12/13] missing renames --- libbeat/publisher/processing/default.go | 6 +++--- libbeat/publisher/processing/default_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 6162ef6c57e3..547593be95de 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -64,21 +64,21 @@ type modifier interface { type builtinModifier func(beat.Info) common.MapStr -// MakeDefaultBeatSupport creates a new SupporterFactory based on NewDefaultSupport. +// MakeDefaultBeatSupport creates a new SupportFactory based on NewDefaultSupport. // MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields // to each event. func MakeDefaultBeatSupport(normalize bool) SupportFactory { return MakeDefaultSupport(normalize, WithECS, WithHost, WithBeatMeta("agent")) } -// MakeDefaultObserverSupport creates a new SupporterFactory based on NewDefaultSupport. +// MakeDefaultObserverSupport creates a new SupportFactory based on NewDefaultSupport. // MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields // to each event. func MakeDefaultObserverSupport(normalize bool) SupportFactory { return MakeDefaultSupport(normalize, WithECS, WithBeatMeta("observer")) } -// MakeDefaultSupport creates a new SupporterFactory for use with the publisher pipeline. +// MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline. // If normalize is set, events will be normalized first before being presented // to the actual processors. // The Supporter will apply the global `fields`, `fields_under_root`, `tags` diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 0be5faa758c7..5f49b2b3829a 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -164,9 +164,9 @@ func TestProcessorsConfigs(t *testing.T) { "with client processor": { local: beat.ProcessingConfig{ Processor: func() beat.ProcessorList { - p := newGroup("test", logp.L()) - p.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true)) - return p + g := newGroup("test", logp.L()) + g.add(actions.NewAddFields(common.MapStr{"custom": "value"}, true)) + return g }(), }, event: `{"value": "abc"}`, From d5ea8b65121300da9a13ef192dfe6ef4d8f4b69c Mon Sep 17 00:00:00 2001 From: urso Date: Fri, 8 Mar 2019 15:23:49 +0100 Subject: [PATCH 13/13] typo --- libbeat/publisher/processing/processors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index 401b8849d2b0..38a77bdd948e 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -100,7 +100,7 @@ func (p *group) Run(event *beat.Event) (*beat.Event, error) { event, err = sub.Run(event) if err != nil { - // XXX: We don't drop the event, but continue filtering here iff the most + // XXX: We don't drop the event, but continue filtering here if the most // recent processor did return an event. // We want processors having this kind of implicit behavior // on errors?