diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 0a7c82a46a..c0ee09aa4e 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -133,6 +133,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } else { loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter) } + loggerAttributes := []attribute.KeyValue{ attribute.String("beholder_data_type", "zap_log_message"), } @@ -147,6 +148,12 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro sdklog.WithResource(loggerResource), sdklog.WithProcessor(loggerProcessor), ) + + // If log streaming is disabled, use a noop logger provider + if !cfg.LogStreamingEnabled { + loggerProvider = BeholderNoopLoggerProvider() + } + logger := loggerProvider.Logger(defaultPackageName) // Tracer diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index b473fcf9ee..b27aac9eaa 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp" + "go.opentelemetry.io/otel/log" otellog "go.opentelemetry.io/otel/log" sdklog "go.opentelemetry.io/otel/sdk/log" @@ -290,6 +291,40 @@ func TestNewClientWithChipIngressConfig(t *testing.T) { assert.IsType(t, &beholder.DualSourceEmitter{}, client.Emitter) }) + + t.Run("LogStreamingEnabled true creates logger", func(t *testing.T) { + cfg := beholder.Config{ + OtelExporterGRPCEndpoint: "grpc-endpoint", + LogStreamingEnabled: true, + } + client, err := beholder.NewClient(cfg) + require.NoError(t, err) + assert.NotNil(t, client) + assert.NotNil(t, client.LoggerProvider) + assert.NotNil(t, client.Logger) + }) + + t.Run("LogStreamingEnabled false disables logger", func(t *testing.T) { + cfg := beholder.Config{ + OtelExporterGRPCEndpoint: "grpc-endpoint", + LogStreamingEnabled: false, + } + client, err := beholder.NewClient(cfg) + require.NoError(t, err) + // LoggerProvider and Logger should NOT be nil, but should be no-op implementations + assert.NotNil(t, client.LoggerProvider) + assert.NotNil(t, client.Logger) + + // Optionally, check that using the logger does not panic + defer func() { + if r := recover(); r != nil { + t.Errorf("Logger panicked when LogStreamingEnabled is false: %v", r) + } + }() + client.Logger.Emit(context.Background(), log.Record{}) + }) + + t.Run("creates client with ChipIngress insecure endpoint", func(t *testing.T) { client, err := beholder.NewClient(beholder.Config{ OtelExporterGRPCEndpoint: "grpc-endpoint", diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index 82ae8b44d1..d08226961d 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -46,7 +46,8 @@ type Config struct { LogMaxQueueSize int LogBatchProcessor bool // Enabled by default. Disable only for testing. // Retry config for shared log exporter, used by Emitter and Logger - LogRetryConfig *RetryConfig + LogRetryConfig *RetryConfig + LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter // Auth AuthPublicKeyHex string @@ -113,6 +114,7 @@ func DefaultConfig() Config { LogExportInterval: 1 * time.Second, LogMaxQueueSize: 2048, LogBatchProcessor: true, + LogStreamingEnabled: true, // Enable logs streaming by default } } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index 59d3a17ea6..2739580993 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -47,6 +47,7 @@ func ExampleConfig() { LogExportInterval: 1 * time.Second, LogMaxQueueSize: 2048, LogBatchProcessor: true, + LogStreamingEnabled: false, // Disable streaming logs by default } fmt.Printf("%+v\n", config) config.LogRetryConfig = &beholder.RetryConfig{ @@ -56,6 +57,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: AuthPublicKeyHex: AuthHeaders:map[]} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[]} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/beholder/httpclient.go b/pkg/beholder/httpclient.go index c36f4df606..3c80ac0615 100644 --- a/pkg/beholder/httpclient.go +++ b/pkg/beholder/httpclient.go @@ -76,6 +76,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro } // Logger + var loggerProcessor sdklog.Processor if cfg.LogBatchProcessor { batchProcessorOpts := []sdklog.BatchProcessorOption{} @@ -112,6 +113,12 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro sdklog.WithResource(loggerResource), sdklog.WithProcessor(loggerProcessor), ) + + // If log streaming is disabled, use a noop logger provider + if !cfg.LogStreamingEnabled { + loggerProvider = BeholderNoopLoggerProvider() + } + logger := loggerProvider.Logger(defaultPackageName) // Tracer diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 3012c03048..14226001f0 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -128,3 +128,16 @@ func (cfg *writerClientConfig) WithWriter(w io.Writer) { cfg.TraceOptions = append(cfg.TraceOptions, stdouttrace.WithWriter(w)) cfg.MetricOptions = append(cfg.MetricOptions, stdoutmetric.WithWriter(w)) } + +type beholderNoopLogExporter struct{} + +func (beholderNoopLogExporter) Export(ctx context.Context, records []sdklog.Record) error { return nil } +func (beholderNoopLogExporter) Shutdown(ctx context.Context) error { return nil } +func (beholderNoopLogExporter) ForceFlush(ctx context.Context) error { return nil } + +// BeholderNoopLoggerProvider returns a *sdklog.LoggerProvider (the same type as sdklog.NewLoggerProvider) that drops all logs. +func BeholderNoopLoggerProvider() *sdklog.LoggerProvider { + return sdklog.NewLoggerProvider( + sdklog.WithProcessor(sdklog.NewSimpleProcessor(beholderNoopLogExporter{})), + ) +}