Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filebeat/tests/integration/filebeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestFilebeatRunsAndLogsJSONToFile(t *testing.T) {
line := r.Bytes()
m := map[string]any{}
if err := json.Unmarshal(line, &m); err != nil {
t.Fatalf("line %d is not a valid JSON: %s", count, err)
t.Fatalf("line %d is not a valid JSON: %s: %s", count, err, string(line))
}
count++
}
Expand Down
4 changes: 0 additions & 4 deletions libbeat/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ type Server struct {

// New creates a new API Server with no routes attached.
func New(log *logp.Logger, config *config.C) (*Server, error) {
if log == nil {
log = logp.NewLogger("")
}

cfg := DefaultConfig
err := config.Unpack(&cfg)
if err != nil {
Expand Down
19 changes: 12 additions & 7 deletions libbeat/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp/logptest"
)

func TestConfiguration(t *testing.T) {
logger := logptest.NewTestingLogger(t, "")
if runtime.GOOS != "windows" {
t.Skip("Check for User and Security Descriptor")
return
Expand All @@ -44,7 +46,7 @@ func TestConfiguration(t *testing.T) {
"user": "admin",
})

_, err := New(nil, cfg)
_, err := New(logger, cfg)
require.Error(t, err)
})

Expand All @@ -54,12 +56,14 @@ func TestConfiguration(t *testing.T) {
"security_descriptor": "D:P(A;;GA;;;1234)",
})

_, err := New(nil, cfg)
_, err := New(logger, cfg)
require.Error(t, err)
})
}

func TestSocket(t *testing.T) {
logger := logptest.NewTestingLogger(t, "")

if runtime.GOOS == "windows" {
t.Skip("Unix Sockets don't work under windows")
return
Expand Down Expand Up @@ -87,7 +91,7 @@ func TestSocket(t *testing.T) {
"host": "unix://" + sockFile,
})

s, err := New(nil, cfg)
s, err := New(logger, cfg)
require.NoError(t, err)
attachEchoHelloHandler(t, s)
go s.Start()
Expand Down Expand Up @@ -130,7 +134,7 @@ func TestSocket(t *testing.T) {
"host": "unix://" + sockFile,
})

s, err := New(nil, cfg)
s, err := New(logger, cfg)
require.NoError(t, err)
attachEchoHelloHandler(t, s)
go s.Start()
Expand Down Expand Up @@ -166,8 +170,8 @@ func TestHTTP(t *testing.T) {
cfg := config.MustNewConfigFrom(map[string]interface{}{
"host": url,
})

s, err := New(nil, cfg)
logger := logptest.NewTestingLogger(t, "")
s, err := New(logger, cfg)
require.NoError(t, err)
attachEchoHelloHandler(t, s)
go s.Start()
Expand Down Expand Up @@ -198,7 +202,8 @@ func TestAttachHandler(t *testing.T) {
"host": "http://localhost:0",
})

s, err := New(nil, cfg)
logger := logptest.NewTestingLogger(t, "")
s, err := New(logger, cfg)
require.NoError(t, err)

req := httptest.NewRequest(http.MethodGet, "http://"+s.l.Addr().String()+"/test", nil)
Expand Down
2 changes: 2 additions & 0 deletions libbeat/beat/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gofrs/uuid/v5"
"go.opentelemetry.io/collector/consumer"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand All @@ -46,6 +47,7 @@ type Info struct {
Monitoring Monitoring
LogConsumer consumer.Logs // otel log consumer
UseDefaultProcessors bool // Whether to use the default processors
Logger *logp.Logger
}

type Monitoring struct {
Expand Down
90 changes: 47 additions & 43 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,6 @@ func defaultCertReloadConfig() certReloadConfig {
}
}

var debugf = logp.MakeDebug("beat")

// Run initializes and runs a Beater implementation. name is the name of the
// Beat (e.g. packetbeat or metricbeat). version is version number of the Beater
// implementation. bt is the `Creator` callback for creating a new beater
Expand Down Expand Up @@ -329,9 +327,9 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, err
}

log := logp.NewLogger("beat")
log := b.Info.Logger.Named("beat")
log.Infof("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)
b.logSystemInfo(log)
b.logSystemInfo()

err = b.registerESVersionCheckCallback()
if err != nil {
Expand All @@ -350,7 +348,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
reg = monitoring.Default.NewRegistry("libbeat")
}

err = metricreport.SetupMetrics(logp.NewLogger("metrics"), b.Info.Beat, version.GetDefaultVersion())
err = metricreport.SetupMetrics(b.Beat.Info.Logger.Named("metrics"), b.Info.Beat, version.GetDefaultVersion())
if err != nil {
return nil, err
}
Expand All @@ -359,14 +357,14 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
mgmt := b.Info.Monitoring.StateRegistry.NewRegistry("management")
monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled())

debugf("Initializing output plugins")
log.Debug("Initializing output plugins")
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.Manager.Enabled() {
logp.Info("Output is configured through Central Management")
b.Info.Logger.Info("Output is configured through Central Management")
} else {
msg := "no outputs are defined, please define one under the output section"
logp.Info("%s", msg)
b.Info.Logger.Info("%s", msg)
return nil, errors.New(msg)
}
}
Expand All @@ -375,7 +373,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: b.Info.Monitoring.StateRegistry,
Logger: logp.L().Named("publisher"),
Logger: b.Info.Logger.Named("publisher"),
Tracer: b.Instrumentation.Tracer(),
}
outputFactory := b.MakeOutputFactory(b.Config.Output)
Expand Down Expand Up @@ -404,14 +402,15 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}

func (b *Beat) launch(settings Settings, bt beat.Creator) error {
logger := b.Info.Logger
defer func() {
_ = logp.Sync()
_ = logger.Sync()
}()
defer logp.Info("%s stopped.", b.Info.Beat)
defer logger.Infof("%s stopped.", b.Info.Beat)

defer func() {
if err := b.processors.Close(); err != nil {
logp.Warn("Failed to close global processing: %v", err)
logger.Warnf("Failed to close global processing: %v", err)
}
}()

Expand All @@ -433,7 +432,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
_ = bl.Unlock()
}()
} else {
logp.Info("running under elastic-agent, per-beat lockfiles disabled")
logger.Info("running under elastic-agent, per-beat lockfiles disabled")
}

svc.BeforeRun()
Expand All @@ -446,7 +445,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
// that would be set at runtime.
if b.Config.HTTP.Enabled() {
var err error
b.API, err = api.NewWithDefaultRoutes(logp.NewLogger(""), b.Config.HTTP, api.NamespaceLookupFunc())
b.API, err = api.NewWithDefaultRoutes(logger, b.Config.HTTP, api.NamespaceLookupFunc())
if err != nil {
return fmt.Errorf("could not start the HTTP server for the API: %w", err)
}
Expand Down Expand Up @@ -530,7 +529,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
return err
}

logp.Info("%s start running.", b.Info.Beat)
logger.Infof("%s start running.", b.Info.Beat)

err = beater.Run(&b.Beat)
if b.shouldReexec {
Expand Down Expand Up @@ -776,12 +775,6 @@ func (b *Beat) configure(settings Settings) error {
config.OverwriteConfigOpts(configOptsWithKeystore(store))
}

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version)
if err != nil {
return err
}
b.Beat.Instrumentation = instrumentation

b.keystore = store
b.Beat.Keystore = store
err = cloudid.OverwriteSettings(cfg)
Expand All @@ -795,7 +788,7 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if err := PromoteOutputQueueSettings(&b.Config); err != nil {
if err := PromoteOutputQueueSettings(b); err != nil {
return fmt.Errorf("could not promote output queue settings: %w", err)
}

Expand All @@ -814,20 +807,30 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error setting timestamp precision: %w", err)
}

if err := configure.LoggingWithTypedOutputs(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType); err != nil {
b.Info.Logger, err = configure.LoggingWithTypedOutputsLocal(b.Info.Beat, b.Config.Logging, b.Config.EventLogging, logp.TypeKey, logp.EventType)
if err != nil {
return fmt.Errorf("error initializing logging: %w", err)
}

// extracting here for ease of use
logger := b.Info.Logger

instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, b.Info.Logger)
if err != nil {
return err
}
b.Beat.Instrumentation = instrumentation

// log paths values to help with troubleshooting
logp.Info("%s", paths.Paths.String())
logger.Infof("%s", paths.Paths.String())

metaPath := paths.Resolve(paths.Data, "meta.json")
err = b.LoadMeta(metaPath)
if err != nil {
return err
}

logp.Info("Beat ID: %v", b.Info.ID)
logger.Infof("Beat ID: %v", b.Info.ID)

// Try to get the host's FQDN and set it.
h, err := sysinfo.Host()
Expand All @@ -842,7 +845,7 @@ func (b *Beat) configure(settings Settings) error {
if err != nil {
// FQDN lookup is "best effort". We log the error, fallback to
// the OS-reported hostname, and move on.
logp.Warn("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname)
logger.Infof("unable to lookup FQDN: %s, using hostname = %s as FQDN", err.Error(), b.Info.Hostname)
b.Info.FQDN = b.Info.Hostname
} else {
b.Info.FQDN = fqdn
Expand Down Expand Up @@ -877,11 +880,11 @@ func (b *Beat) configure(settings Settings) error {
}

if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
logp.Info("Set max procs limit: %v", maxProcs)
logger.Infof("Set max procs limit: %v", maxProcs)
runtime.GOMAXPROCS(maxProcs)
}
if gcPercent := b.Config.GCPercent; gcPercent > 0 {
logp.Info("Set gc percentage to: %v", gcPercent)
logger.Infof("Set gc percentage to: %v", gcPercent)
debug.SetGCPercent(gcPercent)
}

Expand All @@ -894,9 +897,9 @@ func (b *Beat) configure(settings Settings) error {

imFactory := settings.IndexManagement
if imFactory == nil {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM, logger)
}
b.IdxSupporter, err = imFactory(nil, b.Beat.Info, b.RawConfig)
b.IdxSupporter, err = imFactory(logger, b.Beat.Info, b.RawConfig)
if err != nil {
return err
}
Expand All @@ -905,7 +908,7 @@ func (b *Beat) configure(settings Settings) error {
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processors, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)
b.processors, err = processingFactory(b.Info, logger.Named("processors"), b.RawConfig)

b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors",
"global_processors.txt", "text/plain", b.agentDiagnosticHook)
Expand All @@ -914,7 +917,7 @@ func (b *Beat) configure(settings Settings) error {
m := monitoring.CollectStructSnapshot(monitoring.Default, monitoring.Full, true)
data, err := json.MarshalIndent(m, "", " ")
if err != nil {
logp.L().Warnw("Failed to collect beat metric snapshot for Agent diagnostics.", "error", err)
logger.Warnw("Failed to collect beat metric snapshot for Agent diagnostics.", "error", err)
return []byte(err.Error())
}
return data
Expand Down Expand Up @@ -943,7 +946,7 @@ func (b *Beat) LoadMeta(metaPath string) error {
FirstStart time.Time `json:"first_start"`
}

logp.Debug("beat", "Beat metadata path: %v", metaPath)
b.Info.Logger.Debugf("beat", "Beat metadata path: %v", metaPath)

f, err := openRegular(metaPath)
if err != nil && !os.IsNotExist(err) {
Expand Down Expand Up @@ -1065,7 +1068,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
if err != nil {
return fmt.Errorf("error importing Kibana dashboards: %w", err)
}
logp.Info("Kibana dashboards successfully loaded.")
b.Info.Logger.Info("Kibana dashboards successfully loaded.")
}

return nil
Expand Down Expand Up @@ -1171,7 +1174,7 @@ func (b *Beat) MakeOutputFactory(
}

func (b *Beat) reloadOutputOnCertChange(cfg config.Namespace) error {
logger := logp.L().Named("ssl.cert.reloader")
logger := b.Info.Logger.Named("ssl.cert.reloader")
// Here the output is created and we have access to the Beat struct (with the manager)
// as a workaround we can unpack the new settings and trigger the reload-watcher from here

Expand Down Expand Up @@ -1360,8 +1363,9 @@ func handleError(err error) error {
// in debugging. This information includes data about the beat, build, go
// runtime, host, and process. If any of the data is not available it will be
// omitted.
func (b *Beat) logSystemInfo(log *logp.Logger) {
defer logp.Recover("An unexpected error occurred while collecting " +
func (b *Beat) logSystemInfo() {
log := b.Beat.Info.Logger
defer log.Recover("An unexpected error occurred while collecting " +
"information about the system.")
log = log.With(logp.Namespace("system_info"))

Expand Down Expand Up @@ -1527,21 +1531,21 @@ func sanitizeIPs(ips []string) []string {
return validIPs
}

// promoteOutputQueueSettings checks to see if the output
// PromoteOutputQueueSettings checks to see if the output
// configuration has queue settings defined and if so it promotes them
// to the top level queue settings. This is done to allow existing
// behavior of specifying queue settings at the top level or like
// elastic-agent that specifies queue settings under the output
func PromoteOutputQueueSettings(bc *beatConfig) error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
func PromoteOutputQueueSettings(b *Beat) error {
if b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
err := b.Config.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if pc.Queue.IsSet() {
logp.Info("global queue settings replaced with output queue settings")
bc.Pipeline.Queue = pc.Queue
b.Info.Logger.Info("global queue settings replaced with output queue settings")
b.Config.Pipeline.Queue = pc.Queue
}
}
return nil
Expand Down
Loading
Loading