From cf09452e64206460c749c226fa944f2e79418f7c Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Mon, 22 Oct 2018 13:53:06 +0200 Subject: [PATCH] Publisher pipeline: pass logger and metrics registry (#8091) (#8147) We should strive to not have dependencies on globals in beats. The publisher pipeline rewrite made sure we don't work with globals internally. Yet some globals have been introduced since, and even though the library didn't use globals internally, initialization still did use globals at some points. This change removes globals for logging/metrics/telemetry, by requiring the beat instance to pass down required instances. (cherry picked from commit 8c15e6effbae3efa1ce910489d12b6ac75ffdffe) --- CHANGELOG.asciidoc | 1 + libbeat/cmd/instance/beat.go | 9 ++- .../report/elasticsearch/elasticsearch.go | 40 +++++++---- libbeat/publisher/pipeline/controller.go | 10 +-- libbeat/publisher/pipeline/module.go | 72 ++++++++++++------- libbeat/publisher/pipeline/pipeline.go | 5 +- libbeat/publisher/pipeline/stress/run.go | 9 ++- libbeat/publisher/queue/memqueue/broker.go | 15 +++- .../publisher/queue/memqueue/queue_test.go | 2 +- libbeat/publisher/queue/queue.go | 3 +- libbeat/publisher/queue/spool/module.go | 10 ++- 11 files changed, 119 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d664a720b54..bca8a50833c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -107,6 +107,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Report number of open file handles on Windows. {pull}8329[8329] - Added the `add_process_metadata` processor to enrich events with process information. {pull}6789[6789] - Add Beats Central Management {pull}8559[8559] +- Report configured queue type. {pull}8091[8091] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index a6b44f67df1..3752a005ecc 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -309,7 +309,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } debugf("Initializing output plugins") - pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output) + pipeline, err := pipeline.Load(b.Info, + pipeline.Monitors{ + Metrics: reg, + Telemetry: monitoring.GetNamespace("state").GetRegistry(), + Logger: logp.L().Named("publisher"), + }, + b.Config.Pipeline, + b.Config.Output) if err != nil { return nil, fmt.Errorf("error initializing publisher: %+v", err) } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index fb9772ed67f..a4be956afee 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -43,7 +43,8 @@ import ( ) type reporter struct { - done *stopper + done *stopper + logger *logp.Logger checkRetry time.Duration @@ -58,7 +59,9 @@ type reporter struct { out []outputs.NetworkClient } -var debugf = logp.MakeDebug("monitoring") +const selector = "monitoring" + +var debugf = logp.MakeDebug(selector) var errNoMonitoring = errors.New("xpack monitoring not available") @@ -104,7 +107,9 @@ func defaultConfig(settings report.Settings) config { } func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) (report.Reporter, error) { + log := logp.L().Named(selector) config := defaultConfig(settings) + if err := cfg.Unpack(&config); err != nil { return nil, err } @@ -121,7 +126,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) return nil, err } if proxyURL != nil { - logp.Info("Using proxy URL: %s", proxyURL) + log.Infof("Using proxy URL: %s", proxyURL) } tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS) if err != nil { @@ -154,10 +159,11 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) } queueFactory := func(e queue.Eventer) (queue.Queue, error) { - return memqueue.NewBroker(memqueue.Settings{ - Eventer: e, - Events: 20, - }), nil + return memqueue.NewBroker(log, + memqueue.Settings{ + Eventer: e, + Events: 20, + }), nil } monitoring := monitoring.Default.GetRegistry("xpack.monitoring") @@ -167,6 +173,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) pipeline, err := pipeline.New( beat, + pipeline.Monitors{}, monitoring, queueFactory, outputs.Group{ @@ -189,6 +196,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) } r := &reporter{ + logger: log, done: newStopper(), beatMeta: makeMeta(beat), tags: config.Tags, @@ -211,6 +219,8 @@ func (r *reporter) initLoop(c config) { debugf("Start monitoring endpoint init loop.") defer debugf("Finish monitoring endpoint init loop.") + log := r.logger + logged := false for { @@ -218,11 +228,11 @@ func (r *reporter) initLoop(c config) { client := r.out[rand.Intn(len(r.out))] err := client.Connect() if err == nil { - closing(client) + closing(log, client) break } else { if !logged { - logp.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.") + log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.") logged = true } debugf("Monitoring could not connect to elasticsearch, failed with %v", err) @@ -235,7 +245,7 @@ func (r *reporter) initLoop(c config) { } } - logp.Info("Successfully connected to X-Pack Monitoring endpoint.") + log.Info("Successfully connected to X-Pack Monitoring endpoint.") // Start collector and send loop if monitoring endpoint has been found. go r.snapshotLoop("state", "state", c.StatePeriod) @@ -247,8 +257,10 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration) ticker := time.NewTicker(period) defer ticker.Stop() - logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period) - defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace) + log := r.logger + + log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period) + defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace) for { var ts time.Time @@ -317,9 +329,9 @@ func makeClient( return newPublishClient(esClient, params), nil } -func closing(c io.Closer) { +func closing(log *logp.Logger, c io.Closer) { if err := c.Close(); err != nil { - logp.Warn("Closed failed with: %v", err) + log.Warnf("Closed failed with: %v", err) } } diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index b19b84df4c3..0aaef9cf997 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -22,7 +22,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/reload" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/publisher/queue" ) @@ -33,9 +32,10 @@ import ( // - reload type outputController struct { beat beat.Info + monitors Monitors + logger *logp.Logger observer outputObserver - reg *monitoring.Registry queue queue.Queue @@ -63,14 +63,14 @@ type outputWorker interface { func newOutputController( beat beat.Info, - reg *monitoring.Registry, + monitors Monitors, log *logp.Logger, observer outputObserver, b queue.Queue, ) *outputController { c := &outputController{ beat: beat, - reg: reg, + monitors: monitors, logger: log, observer: observer, queue: b, @@ -158,7 +158,7 @@ func (c *outputController) Reload(cfg *reload.ConfigWithMeta) error { return err } - output, err := loadOutput(c.beat, c.reg, outputCfg) + output, err := loadOutput(c.beat, c.monitors, outputCfg) if err != nil { return err } diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index 4e0c318057f..e0b7d42a2a3 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -38,6 +38,14 @@ var publishDisabled = false const defaultQueueType = "mem" +// Monitors configures visibility for observing state and progress of the +// pipeline. +type Monitors struct { + Metrics *monitoring.Registry + Telemetry *monitoring.Registry + Logger *logp.Logger +} + func init() { flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing") } @@ -46,12 +54,17 @@ func init() { // configured queue and outputs. func Load( beatInfo beat.Info, - reg *monitoring.Registry, + monitors Monitors, config Config, outcfg common.ConfigNamespace, ) (*Pipeline, error) { + log := monitors.Logger + if log == nil { + log = logp.L() + } + if publishDisabled { - logp.Info("Dry run mode. All output types except the file based one are disabled.") + log.Info("Dry run mode. All output types except the file based one are disabled.") } processors, err := processors.New(config.Processors) @@ -80,53 +93,52 @@ func Load( }, } - queueBuilder, err := createQueueBuilder(config.Queue) + queueBuilder, err := createQueueBuilder(config.Queue, monitors) if err != nil { return nil, err } - out, err := loadOutput(beatInfo, reg, outcfg) + out, err := loadOutput(beatInfo, monitors, outcfg) if err != nil { return nil, err } - p, err := New(beatInfo, reg, queueBuilder, out, settings) + p, err := New(beatInfo, monitors, monitors.Metrics, queueBuilder, out, settings) if err != nil { return nil, err } - logp.Info("Beat name: %s", name) + log.Info("Beat name: %s", name) return p, err } func loadOutput( beatInfo beat.Info, - reg *monitoring.Registry, + monitors Monitors, outcfg common.ConfigNamespace, ) (outputs.Group, error) { + log := monitors.Logger + if log == nil { + log = logp.L() + } + if publishDisabled { return outputs.Group{}, nil } if !outcfg.IsSet() { msg := "No outputs are defined. Please define one under the output section." - logp.Info(msg) + log.Info(msg) return outputs.Fail(errors.New(msg)) } var ( - outReg *monitoring.Registry + metrics *monitoring.Registry outStats outputs.Observer ) - - if reg != nil { - outReg = reg.GetRegistry("output") - if outReg != nil { - outReg.Clear() - } else { - outReg = reg.NewRegistry("output") - } - outStats = outputs.NewStats(outReg) + if monitors.Metrics != nil { + metrics = monitors.Metrics.NewRegistry("output") + outStats = outputs.NewStats(metrics) } out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config()) @@ -134,18 +146,21 @@ func loadOutput( return outputs.Fail(err) } - if outReg != nil { - monitoring.NewString(outReg, "type").Set(outcfg.Name()) + if metrics != nil { + monitoring.NewString(metrics, "type").Set(outcfg.Name()) + } + if monitors.Telemetry != nil { + telemetry := monitors.Telemetry.NewRegistry("output") + monitoring.NewString(telemetry, "name").Set(outcfg.Name()) } - - stateRegistry := monitoring.GetNamespace("state").GetRegistry() - outputRegistry := stateRegistry.NewRegistry("output") - monitoring.NewString(outputRegistry, "name").Set(outcfg.Name()) return out, nil } -func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) { +func createQueueBuilder( + config common.ConfigNamespace, + monitors Monitors, +) (func(queue.Eventer) (queue.Queue, error), error) { queueType := defaultQueueType if b := config.Name(); b != "" { queueType = b @@ -161,7 +176,12 @@ func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (que queueConfig = common.NewConfig() } + if monitors.Telemetry != nil { + queueReg := monitors.Telemetry.NewRegistry("queue") + monitoring.NewString(queueReg, "name").Set(queueType) + } + return func(eventer queue.Eventer) (queue.Queue, error) { - return queueFactory(eventer, queueConfig) + return queueFactory(eventer, monitors.Logger, queueConfig) }, nil } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 90b2ce38e5b..a07727c8c8e 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -26,12 +26,12 @@ import ( "time" "github.com/elastic/beats/libbeat/common/reload" + "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher" @@ -153,6 +153,7 @@ type queueFactory func(queue.Eventer) (queue.Queue, error) // queue and outputs will be closed. func New( beat beat.Info, + monitors Monitors, metrics *monitoring.Registry, queueFactory queueFactory, out outputs.Group, @@ -205,7 +206,7 @@ func New( } p.eventSema = newSema(maxEvents) - p.output = newOutputController(beat, metrics, log, p.observer, p.queue) + p.output = newOutputController(beat, monitors, log, p.observer, p.queue) p.output.Set(out) return p, nil diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 3a71f8894bb..e21d3f29d26 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -57,8 +57,13 @@ func RunTests( return fmt.Errorf("unpacking config failed: %v", err) } - // reg := monitoring.NewRegistry() - pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output) + pipeline, err := pipeline.Load(info, pipeline.Monitors{ + Metrics: nil, + Telemetry: nil, + Logger: logp.L(), + }, + config.Pipeline, + config.Output) if err != nil { return fmt.Errorf("loading pipeline failed: %+v", err) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b582155bc0a..34cefd6ef2a 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -87,13 +87,17 @@ func init() { queue.RegisterType("mem", create) } -func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { +func create(eventer queue.Eventer, logger *logp.Logger, cfg *common.Config) (queue.Queue, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, err } - return NewBroker(Settings{ + if logger == nil { + logger = logp.L() + } + + return NewBroker(logger, Settings{ Eventer: eventer, Events: config.Events, FlushMinEvents: config.FlushMinEvents, @@ -105,6 +109,7 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { // If waitOnClose is set to true, the broker will block on Close, until all internal // workers handling incoming messages and ACKs have been shut down. func NewBroker( + logger logger, settings Settings, ) *Broker { // define internal channel size for producer/client requests @@ -128,9 +133,13 @@ func NewBroker( minEvents = sz } + if logger == nil { + logger = logp.NewLogger("memqueue") + } + b := &Broker{ done: make(chan struct{}), - logger: logp.NewLogger("memqueue"), + logger: logger, // broker API channels events: make(chan pushRequest, chanSize), diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 528cc39996e..cac0c68d3a8 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -72,7 +72,7 @@ func TestProducerCancelRemovesEvents(t *testing.T) { func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory { return func(_ *testing.T) queue.Queue { - return NewBroker(Settings{ + return NewBroker(nil, Settings{ Events: sz, FlushMinEvents: minEvents, FlushTimeout: flushTimeout, diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 201af049b94..eca5c0c499d 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -22,11 +22,12 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" ) // Factory for creating a queue used by a pipeline instance. -type Factory func(Eventer, *common.Config) (Queue, error) +type Factory func(Eventer, *logp.Logger, *common.Config) (Queue, error) // Eventer listens to special events to be send by queue implementations. type Eventer interface { diff --git a/libbeat/publisher/queue/spool/module.go b/libbeat/publisher/queue/spool/module.go index b53d1be42d4..71a43c421ec 100644 --- a/libbeat/publisher/queue/spool/module.go +++ b/libbeat/publisher/queue/spool/module.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/feature" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/publisher/queue" "github.com/elastic/go-txfile" @@ -38,7 +39,7 @@ func init() { queue.RegisterType("spool", create) } -func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { +func create(eventer queue.Eventer, logp *logp.Logger, cfg *common.Config) (queue.Queue, error) { cfgwarn.Beta("Spooling to disk is beta") config := defaultConfig() @@ -56,7 +57,12 @@ func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { flushEvents = uint(count) } - return NewSpool(defaultLogger(), path, Settings{ + var log logger = logp + if logp == nil { + log = defaultLogger() + } + + return NewSpool(log, path, Settings{ Eventer: eventer, Mode: config.File.Permissions, WriteBuffer: uint(config.Write.BufferSize),