Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beats event processing and default fields #10801

Merged
merged 13 commits into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c
}

client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
Events: f.eventer,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
},
Events: f.eventer,
})
if err != nil {
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (t *configuredJob) Start() {
}

t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
Processing: beat.ProcessingConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
},
})
if err != nil {
logp.Err("could not start monitor: %v", err)
Expand Down
10 changes: 6 additions & 4 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ func New(
func (i *Input) Run() {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
i.logger.Infof("journalbeat successfully published %d events", n)
},
Expand Down
50 changes: 26 additions & 24 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,7 @@ type Client interface {
type ClientConfig struct {
PublishMode PublishMode

// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates
DynamicFields *common.MapStrPointer

// Processors passes additional processor to the client, to be executed before
// the pipeline processors.
Processor ProcessorList
Processing ProcessingConfig

// WaitClose sets the maximum duration to wait on ACK, if the client still has
// active non-acknowledged events in the publisher pipeline.
Expand All @@ -72,14 +57,6 @@ type ClientConfig struct {
// Events configures callbacks for common client callbacks
Events ClientEventer

// By default events are normalized within processor pipeline,
// if the normalization step should be skipped set this to true.
SkipNormalization bool

// By default events are decorated with agent metadata.
// To skip adding that metadata set this to true.
SkipAgentMetadata bool

// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for too long, to not block the internal buffers for
Expand All @@ -101,6 +78,31 @@ type ClientConfig struct {
ACKLastEvent func(interface{})
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect.
//
// It is carried by ClientConfig in its Processing field, so the settings are
// supplied per client connection (e.g. via ConnectWith) rather than once for
// the whole pipeline.
type ProcessingConfig struct {
// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event.
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates.
DynamicFields *common.MapStrPointer

// Processor passes additional processors to the client, to be executed before
// the pipeline processors.
Processor ProcessorList

// Private contains additional information to be passed to the processing
// pipeline builder.
// NOTE(review): the value is untyped (interface{}); presumably its meaning
// is defined by the processing builder in use — confirm against the
// builder implementation before relying on a particular type.
Private interface{}
}

// ClientEventer provides access to internal client events.
type ClientEventer interface {
Closing() // Closing indicates the client is being shutdown next
Expand Down
14 changes: 14 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
Expand All @@ -78,6 +79,8 @@ type Beat struct {

keystore keystore.Keystore
index idxmgmt.Supporter

processing processing.Supporter
}

type beatConfig struct {
Expand Down Expand Up @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.processing,
b.makeOutputFactory(b.Config.Output),
)

Expand Down Expand Up @@ -593,6 +597,16 @@ func (b *Beat) configure(settings Settings) error {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
}
b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig)
if err != nil {
return err
}

processingFactory := settings.Processing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the ProcessorFactory defined on the root cmd level, but the ProcessingConfig on the pipeline level? Would it make sense to move the processorFactory also to the processing config level?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beats do not directly publish events to the queue in the publisher pipeline, but do so via a Client.
The pipeline, including global processors and fields settings, queue, and outputs, is created on startup. At that point there are no go-routines collecting data yet.
Go-routines are supposed to use Connect to connect to a publisher pipeline. This returns a beat.Client (Client instances are not guaranteed to be thread safe). Each client/go-routine is allowed to configure local processors/fields/tags, which get merged with the global settings. The factory is the global entity: it loads, checks, and prepares global configurations on startup. The ProcessingConfig specifies the per-go-routine local processing, which might eventually be established by the Beat's module initialization or much later via autodiscovery.

if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

return err
}

Expand Down
3 changes: 3 additions & 0 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/monitoring/report"
"github.com/elastic/beats/libbeat/publisher/processing"
)

// Settings contains basic settings for any beat to pass into GenRootCmd
Expand All @@ -40,4 +41,6 @@ type Settings struct {
// load custom index manager. The config object will be the Beats root configuration.
IndexManagement idxmgmt.SupportFactory
ILM ilm.SupportFactory

Processing processing.SupportFactory
}
9 changes: 8 additions & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/beats/libbeat/publisher/queue/memqueue"
)
Expand Down Expand Up @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, common.NewConfig())
if err != nil {
return nil, err
}

pipeline, err := pipeline.New(
beat,
pipeline.Monitors{
Metrics: monitoring,
Logger: logp.NewLogger(selector),
Logger: log,
},
queueFactory,
outputs.Group{
Expand All @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
Processors: processing,
})
if err != nil {
return nil, err
Expand Down
20 changes: 2 additions & 18 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -60,6 +60,7 @@ func Load(
beatInfo beat.Info,
monitors Monitors,
config Config,
processors processing.Supporter,
makeOutput func(outputs.Observer) (string, outputs.Group, error),
) (*Pipeline, error) {
log := monitors.Logger
Expand All @@ -71,28 +72,11 @@ func Load(
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

name := beatInfo.Name
settings := Settings{
WaitClose: 0,
WaitCloseMode: NoWaitOnClose,
Disabled: publishDisabled,
Processors: processors,
Annotations: Annotations{
Event: config.EventMetadata,
Builtin: common.MapStr{
"host": common.MapStr{
"name": name,
},
"ecs": common.MapStr{
"version": "1.0.0-beta2",
},
},
},
}

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
Expand Down
Loading