Skip to content

Commit

Permalink
Publisher pipeline: pass logger and metrics registry (elastic#8091) (e…
Browse files Browse the repository at this point in the history
…lastic#8147)

We should strive to not have dependencies on globals in beats. The
publisher pipeline rewrite made sure we don't work with globals
internally. Yet some globals have been introduced since, and even though
the library didn't use globals internally, initialization still did use
globals at some points.
This change removes globals for logging/metrics/telemetry, by requiring
the beat instance to pass down required instances.

(cherry picked from commit 8c15e6e)
  • Loading branch information
Steffen Siering authored Oct 22, 2018
1 parent 9b10c2a commit cf09452
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff]
- Report number of open file handles on Windows. {pull}8329[8329]
- Added the `add_process_metadata` processor to enrich events with process information. {pull}6789[6789]
- Add Beats Central Management {pull}8559[8559]
- Report configured queue type. {pull}8091[8091]

*Auditbeat*

Expand Down
9 changes: 8 additions & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}

debugf("Initializing output plugins")
pipeline, err := pipeline.Load(b.Info, reg, b.Config.Pipeline, b.Config.Output)
pipeline, err := pipeline.Load(b.Info,
pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %+v", err)
}
Expand Down
40 changes: 26 additions & 14 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
)

type reporter struct {
done *stopper
done *stopper
logger *logp.Logger

checkRetry time.Duration

Expand All @@ -58,7 +59,9 @@ type reporter struct {
out []outputs.NetworkClient
}

var debugf = logp.MakeDebug("monitoring")
const selector = "monitoring"

var debugf = logp.MakeDebug(selector)

var errNoMonitoring = errors.New("xpack monitoring not available")

Expand Down Expand Up @@ -104,7 +107,9 @@ func defaultConfig(settings report.Settings) config {
}

func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config) (report.Reporter, error) {
log := logp.L().Named(selector)
config := defaultConfig(settings)

if err := cfg.Unpack(&config); err != nil {
return nil, err
}
Expand All @@ -121,7 +126,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
return nil, err
}
if proxyURL != nil {
logp.Info("Using proxy URL: %s", proxyURL)
log.Infof("Using proxy URL: %s", proxyURL)
}
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
Expand Down Expand Up @@ -154,10 +159,11 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
return memqueue.NewBroker(log,
memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
}

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")
Expand All @@ -167,6 +173,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)

pipeline, err := pipeline.New(
beat,
pipeline.Monitors{},
monitoring,
queueFactory,
outputs.Group{
Expand All @@ -189,6 +196,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}

r := &reporter{
logger: log,
done: newStopper(),
beatMeta: makeMeta(beat),
tags: config.Tags,
Expand All @@ -211,18 +219,20 @@ func (r *reporter) initLoop(c config) {
debugf("Start monitoring endpoint init loop.")
defer debugf("Finish monitoring endpoint init loop.")

log := r.logger

logged := false

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out[rand.Intn(len(r.out))]
err := client.Connect()
if err == nil {
closing(client)
closing(log, client)
break
} else {
if !logged {
logp.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying.")
logged = true
}
debugf("Monitoring could not connect to elasticsearch, failed with %v", err)
Expand All @@ -235,7 +245,7 @@ func (r *reporter) initLoop(c config) {
}
}

logp.Info("Successfully connected to X-Pack Monitoring endpoint.")
log.Info("Successfully connected to X-Pack Monitoring endpoint.")

// Start collector and send loop if monitoring endpoint has been found.
go r.snapshotLoop("state", "state", c.StatePeriod)
Expand All @@ -247,8 +257,10 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration)
ticker := time.NewTicker(period)
defer ticker.Stop()

logp.Info("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer logp.Info("Stop monitoring %s metrics snapshot loop.", namespace)
log := r.logger

log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period)
defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace)

for {
var ts time.Time
Expand Down Expand Up @@ -317,9 +329,9 @@ func makeClient(
return newPublishClient(esClient, params), nil
}

func closing(c io.Closer) {
func closing(log *logp.Logger, c io.Closer) {
if err := c.Close(); err != nil {
logp.Warn("Closed failed with: %v", err)
log.Warnf("Closed failed with: %v", err)
}
}

Expand Down
10 changes: 5 additions & 5 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/publisher/queue"
)
Expand All @@ -33,9 +32,10 @@ import (
// - reload
type outputController struct {
beat beat.Info
monitors Monitors

logger *logp.Logger
observer outputObserver
reg *monitoring.Registry

queue queue.Queue

Expand Down Expand Up @@ -63,14 +63,14 @@ type outputWorker interface {

func newOutputController(
beat beat.Info,
reg *monitoring.Registry,
monitors Monitors,
log *logp.Logger,
observer outputObserver,
b queue.Queue,
) *outputController {
c := &outputController{
beat: beat,
reg: reg,
monitors: monitors,
logger: log,
observer: observer,
queue: b,
Expand Down Expand Up @@ -158,7 +158,7 @@ func (c *outputController) Reload(cfg *reload.ConfigWithMeta) error {
return err
}

output, err := loadOutput(c.beat, c.reg, outputCfg)
output, err := loadOutput(c.beat, c.monitors, outputCfg)
if err != nil {
return err
}
Expand Down
72 changes: 46 additions & 26 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ var publishDisabled = false

const defaultQueueType = "mem"

// Monitors configures visibility for observing state and progress of the
// pipeline.
type Monitors struct {
Metrics *monitoring.Registry
Telemetry *monitoring.Registry
Logger *logp.Logger
}

func init() {
flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing")
}
Expand All @@ -46,12 +54,17 @@ func init() {
// configured queue and outputs.
func Load(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
config Config,
outcfg common.ConfigNamespace,
) (*Pipeline, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
Expand Down Expand Up @@ -80,72 +93,74 @@ func Load(
},
}

queueBuilder, err := createQueueBuilder(config.Queue)
queueBuilder, err := createQueueBuilder(config.Queue, monitors)
if err != nil {
return nil, err
}

out, err := loadOutput(beatInfo, reg, outcfg)
out, err := loadOutput(beatInfo, monitors, outcfg)
if err != nil {
return nil, err
}

p, err := New(beatInfo, reg, queueBuilder, out, settings)
p, err := New(beatInfo, monitors, monitors.Metrics, queueBuilder, out, settings)
if err != nil {
return nil, err
}

logp.Info("Beat name: %s", name)
log.Info("Beat name: %s", name)
return p, err
}

func loadOutput(
beatInfo beat.Info,
reg *monitoring.Registry,
monitors Monitors,
outcfg common.ConfigNamespace,
) (outputs.Group, error) {
log := monitors.Logger
if log == nil {
log = logp.L()
}

if publishDisabled {
return outputs.Group{}, nil
}

if !outcfg.IsSet() {
msg := "No outputs are defined. Please define one under the output section."
logp.Info(msg)
log.Info(msg)
return outputs.Fail(errors.New(msg))
}

var (
outReg *monitoring.Registry
metrics *monitoring.Registry
outStats outputs.Observer
)

if reg != nil {
outReg = reg.GetRegistry("output")
if outReg != nil {
outReg.Clear()
} else {
outReg = reg.NewRegistry("output")
}
outStats = outputs.NewStats(outReg)
if monitors.Metrics != nil {
metrics = monitors.Metrics.NewRegistry("output")
outStats = outputs.NewStats(metrics)
}

out, err := outputs.Load(beatInfo, outStats, outcfg.Name(), outcfg.Config())
if err != nil {
return outputs.Fail(err)
}

if outReg != nil {
monitoring.NewString(outReg, "type").Set(outcfg.Name())
if metrics != nil {
monitoring.NewString(metrics, "type").Set(outcfg.Name())
}
if monitors.Telemetry != nil {
telemetry := monitors.Telemetry.NewRegistry("output")
monitoring.NewString(telemetry, "name").Set(outcfg.Name())
}

stateRegistry := monitoring.GetNamespace("state").GetRegistry()
outputRegistry := stateRegistry.NewRegistry("output")
monitoring.NewString(outputRegistry, "name").Set(outcfg.Name())

return out, nil
}

func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) {
func createQueueBuilder(
config common.ConfigNamespace,
monitors Monitors,
) (func(queue.Eventer) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
queueType = b
Expand All @@ -161,7 +176,12 @@ func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (que
queueConfig = common.NewConfig()
}

if monitors.Telemetry != nil {
queueReg := monitors.Telemetry.NewRegistry("queue")
monitoring.NewString(queueReg, "name").Set(queueType)
}

return func(eventer queue.Eventer) (queue.Queue, error) {
return queueFactory(eventer, queueConfig)
return queueFactory(eventer, monitors.Logger, queueConfig)
}, nil
}
5 changes: 3 additions & 2 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"time"

"github.com/elastic/beats/libbeat/common/reload"
"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher"
Expand Down Expand Up @@ -153,6 +153,7 @@ type queueFactory func(queue.Eventer) (queue.Queue, error)
// queue and outputs will be closed.
func New(
beat beat.Info,
monitors Monitors,
metrics *monitoring.Registry,
queueFactory queueFactory,
out outputs.Group,
Expand Down Expand Up @@ -205,7 +206,7 @@ func New(
}
p.eventSema = newSema(maxEvents)

p.output = newOutputController(beat, metrics, log, p.observer, p.queue)
p.output = newOutputController(beat, monitors, log, p.observer, p.queue)
p.output.Set(out)

return p, nil
Expand Down
9 changes: 7 additions & 2 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ func RunTests(
return fmt.Errorf("unpacking config failed: %v", err)
}

// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
pipeline, err := pipeline.Load(info, pipeline.Monitors{
Metrics: nil,
Telemetry: nil,
Logger: logp.L(),
},
config.Pipeline,
config.Output)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
}
Expand Down
Loading

0 comments on commit cf09452

Please sign in to comment.