Skip to content

Commit

Permalink
refactor libbeat/common/reload to not use global variable (#40622)
Browse files Browse the repository at this point in the history
* refactor libbeat/common/reload to not use global variable
  • Loading branch information
leehinman committed Aug 29, 2024
1 parent 2491f08 commit 3b49a93
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 17 deletions.
3 changes: 1 addition & 2 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -405,7 +404,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

// Register reloadable list of inputs and modules
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline)
reload.RegisterV2.MustRegisterInput(inputs)
b.Registry.MustRegisterInput(inputs)

modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline)

Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
})

inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher)
reload.RegisterV2.MustRegisterInput(inputs)
b.Registry.MustRegisterInput(inputs)
}

// RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present.
Expand Down
3 changes: 2 additions & 1 deletion libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ type Beat struct {

Instrumentation instrumentation.Instrumentation // instrumentation holds an APM agent for capturing and reporting traces

API *api.Server // API server. This is nil unless the http endpoint is enabled.
API *api.Server // API server. This is nil unless the http endpoint is enabled.
Registry *reload.Registry // input, & output registry for configuration manager, should be instantiated in NewBeat
}

// GenerateUserAgent populates the UserAgent field on the beat.Info struct
Expand Down
7 changes: 4 additions & 3 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func
StartTime: time.Now(),
EphemeralID: eid,
},
Fields: fields,
Fields: fields,
Registry: reload.NewRegistry(),
}

return &Beat{Beat: b}, nil
Expand Down Expand Up @@ -405,7 +406,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
b.Registry.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))

b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
Expand Down Expand Up @@ -861,7 +862,7 @@ func (b *Beat) configure(settings Settings) error {
}

// initialize config manager
m, err := management.NewManager(b.Config.Management, reload.RegisterV2)
m, err := management.NewManager(b.Config.Management, b.Registry)
if err != nil {
return err
}
Expand Down
3 changes: 0 additions & 3 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

// RegisterV2 is the special registry used for the V2 controller
var RegisterV2 = NewRegistry()

// InputRegName is the registation name for V2 inputs
const InputRegName = "input"

Expand Down
3 changes: 1 addition & 2 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand Down Expand Up @@ -248,7 +247,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
// Centrally managed modules
factory := module.NewFactory(b.Info, bt.registry, bt.moduleOptions...)
modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher)
reload.RegisterV2.MustRegisterInput(modules)
b.Registry.MustRegisterInput(modules)
wg.Add(1)
go func() {
defer wg.Done()
Expand Down
3 changes: 1 addition & 2 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -209,7 +208,7 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error {
// the runner by starting the beat's manager. It returns on the first fatal error.
func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error {
runner := newReloader(management.DebugK, factory, b.Publisher)
reload.RegisterV2.MustRegisterInput(runner)
b.Registry.MustRegisterInput(runner)
logp.Debug("main", "Waiting for the runner to finish")

// Start the manager after all the hooks are registered and terminates when
Expand Down
2 changes: 1 addition & 1 deletion x-pack/osquerybeat/beater/osquerybeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error {
defer bt.close()

// Watch input configuration updates
inputConfigCh := config.WatchInputs(ctx, bt.log)
inputConfigCh := config.WatchInputs(ctx, bt.log, b.Registry)

// Install osqueryd if needed
err = installOsquery(ctx)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/osquerybeat/internal/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ func (r *reloader) Reload(configs []*reload.ConfigWithMeta) error {
return nil
}

func WatchInputs(ctx context.Context, log *logp.Logger) <-chan []InputConfig {
func WatchInputs(ctx context.Context, log *logp.Logger, registry *reload.Registry) <-chan []InputConfig {
ch := make(chan []InputConfig)
r := &reloader{
ctx: ctx,
log: log,
ch: ch,
}
reload.RegisterV2.MustRegisterInput(r)
registry.MustRegisterInput(r)

return ch
}

0 comments on commit 3b49a93

Please sign in to comment.