Skip to content

Commit 76285b8

Browse files
Ensure beats receivers log the same metadata as beats processes (#8606) (#8711)
* Use the base logger for otel collector * Add E2E test comparing logs between beats processes and beats receivers * Clean up tests * Fix incorrect comment * Fix conflicts after rebase * Fix log parsing in integration test * Use different contexts for controlling agents * Fix incorrect test ordering * Fix incorrect test context handling * Fix bug in testcontext.WithDeadline (cherry picked from commit aa44f8b) Co-authored-by: Mikołaj Świątek <mail@mikolajswiatek.com>
1 parent b3edb4e commit 76285b8

File tree

6 files changed

+244
-63
lines changed

6 files changed

+244
-63
lines changed

internal/pkg/agent/application/application.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func New(
224224
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
225225
}
226226

227-
otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"))
227+
otelManager := otelmanager.NewOTelManager(log.Named("otel_manager"), baseLogger)
228228
coord := coordinator.New(log, cfg, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, varsManager, caps, monitor, isManaged, otelManager, actionAcker, compModifiers...)
229229
if managed != nil {
230230
// the coordinator requires the config manager as well as in managed-mode the config manager requires the

internal/pkg/agent/application/coordinator/coordinator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,7 @@ func createCoordinator(t testing.TB, ctx context.Context, opts ...CoordinatorOpt
10781078
cfg.Port = 0
10791079
rm, err := runtime.NewManager(l, l, ai, apmtest.DiscardTracer, monitoringMgr, cfg)
10801080
require.NoError(t, err)
1081-
otelMgr := otelmanager.NewOTelManager(l)
1081+
otelMgr := otelmanager.NewOTelManager(l, l)
10821082

10831083
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
10841084
require.NoError(t, err)

internal/pkg/otel/manager/manager.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import (
2222

2323
// OTelManager is a manager that manages the lifecycle of the OTel collector inside of the Elastic Agent.
2424
type OTelManager struct {
25-
logger *logger.Logger
26-
errCh chan error
25+
// baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields.
26+
baseLogger *logger.Logger
27+
logger *logger.Logger
28+
errCh chan error
2729

2830
// The current configuration that the OTel collector is using. In the case that
2931
// the cfg is nil then the collector is not running.
@@ -42,13 +44,14 @@ type OTelManager struct {
4244
}
4345

4446
// NewOTelManager returns a OTelManager.
45-
func NewOTelManager(logger *logger.Logger) *OTelManager {
47+
func NewOTelManager(logger, baseLogger *logger.Logger) *OTelManager {
4648
return &OTelManager{
47-
logger: logger,
48-
errCh: make(chan error, 1), // holds at most one error
49-
cfgCh: make(chan *confmap.Conf),
50-
statusCh: make(chan *status.AggregateStatus),
51-
doneChan: make(chan struct{}),
49+
logger: logger,
50+
baseLogger: baseLogger,
51+
errCh: make(chan error, 1), // holds at most one error
52+
cfgCh: make(chan *confmap.Conf),
53+
statusCh: make(chan *status.AggregateStatus),
54+
doneChan: make(chan struct{}),
5255
}
5356
}
5457

@@ -196,7 +199,7 @@ func (m *OTelManager) startCollector(cfg *confmap.Conf, errCh chan error) (conte
196199
otel.WithExtensionFactory(NewAgentStatusFactory(m)))
197200
settings.DisableGracefulShutdown = true // managed by this manager
198201
settings.LoggingOptions = []zap.Option{zap.WrapCore(func(zapcore.Core) zapcore.Core {
199-
return m.logger.Core() // use same zap as agent
202+
return m.baseLogger.Core() // use the base logger also used for logs from the command runtime
200203
})}
201204
svc, err := otelcol.NewCollector(*settings)
202205
if err != nil {

internal/pkg/otel/manager/manager_test.go

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ var (
6767
func TestOTelManager_Run(t *testing.T) {
6868
ctx, cancel := context.WithCancel(context.Background())
6969
defer cancel()
70-
l, _ := loggertest.New("otel")
71-
m := NewOTelManager(l)
70+
base, _ := loggertest.New("otel")
71+
l, _ := loggertest.New("otel-manager")
72+
m := NewOTelManager(l, base)
7273

7374
var errMx sync.Mutex
7475
var err error
@@ -188,8 +189,9 @@ func TestOTelManager_Run(t *testing.T) {
188189
func TestOTelManager_ConfigError(t *testing.T) {
189190
ctx, cancel := context.WithCancel(context.Background())
190191
defer cancel()
191-
l, _ := loggertest.New("otel")
192-
m := NewOTelManager(l)
192+
base, _ := loggertest.New("otel")
193+
l, _ := loggertest.New("otel-manager")
194+
m := NewOTelManager(l, base)
193195

194196
go func() {
195197
err := m.Run(ctx)
@@ -240,6 +242,41 @@ outer:
240242
assert.Error(t, err, "otel manager should have returned an error")
241243
}
242244

245+
func TestOTelManager_Logging(t *testing.T) {
246+
ctx, cancel := context.WithCancel(context.Background())
247+
defer cancel()
248+
base, obs := loggertest.New("otel")
249+
l, _ := loggertest.New("otel-manager")
250+
m := NewOTelManager(l, base)
251+
252+
go func() {
253+
err := m.Run(ctx)
254+
assert.ErrorIs(t, err, context.Canceled, "otel manager should be cancelled")
255+
}()
256+
257+
// watch is synchronous, so we need to read from it to avoid blocking the manager
258+
go func() {
259+
for {
260+
select {
261+
case <-m.Watch():
262+
case <-ctx.Done():
263+
return
264+
}
265+
}
266+
}()
267+
268+
cfg := confmap.NewFromStringMap(testConfig)
269+
m.Update(cfg)
270+
271+
// the collector should log to the base logger
272+
assert.EventuallyWithT(t, func(collect *assert.CollectT) {
273+
logs := obs.All()
274+
require.NotEmpty(collect, logs, "Logs should not be empty")
275+
firstMessage := logs[0].Message
276+
assert.Equal(collect, firstMessage, "Setting up own telemetry...")
277+
}, time.Second*10, time.Second)
278+
}
279+
243280
func statusToYaml(s *status.AggregateStatus) string {
244281
printable := toSerializableStatus(s)
245282
yamlBytes, _ := yaml.Marshal(printable)

pkg/testing/tools/testcontext/testcontext.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ func WithDeadline(
1717
parent context.Context,
1818
deadline time.Time) (context.Context, context.CancelFunc) {
1919
if d, ok := t.Deadline(); ok {
20-
deadline = d
20+
if d.Before(deadline) {
21+
deadline = d
22+
}
2123
}
2224
ctx, cancel := context.WithDeadline(parent, deadline)
2325
return ctx, cancel

0 commit comments

Comments
 (0)