Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
} else {
loggerProcessor = sdklog.NewSimpleProcessor(sharedLogExporter)
}

loggerAttributes := []attribute.KeyValue{
attribute.String("beholder_data_type", "zap_log_message"),
}
Expand All @@ -147,6 +148,12 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
sdklog.WithResource(loggerResource),
sdklog.WithProcessor(loggerProcessor),
)

// If log streaming is disabled, use a noop logger provider
if !cfg.LogStreamingEnabled {
loggerProvider = BeholderNoopLoggerProvider()
}

logger := loggerProvider.Logger(defaultPackageName)

// Tracer
Expand Down
35 changes: 35 additions & 0 deletions pkg/beholder/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
"go.opentelemetry.io/otel/log"
otellog "go.opentelemetry.io/otel/log"
sdklog "go.opentelemetry.io/otel/sdk/log"

Expand Down Expand Up @@ -290,6 +291,40 @@ func TestNewClientWithChipIngressConfig(t *testing.T) {
assert.IsType(t, &beholder.DualSourceEmitter{}, client.Emitter)
})


t.Run("LogStreamingEnabled true creates logger", func(t *testing.T) {
cfg := beholder.Config{
OtelExporterGRPCEndpoint: "grpc-endpoint",
LogStreamingEnabled: true,
}
client, err := beholder.NewClient(cfg)
require.NoError(t, err)
assert.NotNil(t, client)
assert.NotNil(t, client.LoggerProvider)
assert.NotNil(t, client.Logger)
})

t.Run("LogStreamingEnabled false disables logger", func(t *testing.T) {
cfg := beholder.Config{
OtelExporterGRPCEndpoint: "grpc-endpoint",
LogStreamingEnabled: false,
}
client, err := beholder.NewClient(cfg)
require.NoError(t, err)
// LoggerProvider and Logger should NOT be nil, but should be no-op implementations
assert.NotNil(t, client.LoggerProvider)
assert.NotNil(t, client.Logger)

// Optionally, check that using the logger does not panic
defer func() {
if r := recover(); r != nil {
t.Errorf("Logger panicked when LogStreamingEnabled is false: %v", r)
}
}()
client.Logger.Emit(context.Background(), log.Record{})
})


t.Run("creates client with ChipIngress insecure endpoint", func(t *testing.T) {
client, err := beholder.NewClient(beholder.Config{
OtelExporterGRPCEndpoint: "grpc-endpoint",
Expand Down
4 changes: 3 additions & 1 deletion pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ type Config struct {
LogMaxQueueSize int
LogBatchProcessor bool // Enabled by default. Disable only for testing.
// Retry config for shared log exporter, used by Emitter and Logger
LogRetryConfig *RetryConfig
LogRetryConfig *RetryConfig
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter

// Auth
AuthPublicKeyHex string
Expand Down Expand Up @@ -113,6 +114,7 @@ func DefaultConfig() Config {
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
LogStreamingEnabled: true, // Enable logs streaming by default
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func ExampleConfig() {
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
LogStreamingEnabled: false, // Disable streaming logs by default
}
fmt.Printf("%+v\n", config)
config.LogRetryConfig = &beholder.RetryConfig{
Expand All @@ -56,6 +57,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> AuthPublicKeyHex: AuthHeaders:map[]}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[]}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
7 changes: 7 additions & 0 deletions pkg/beholder/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
}

// Logger

var loggerProcessor sdklog.Processor
if cfg.LogBatchProcessor {
batchProcessorOpts := []sdklog.BatchProcessorOption{}
Expand Down Expand Up @@ -112,6 +113,12 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
sdklog.WithResource(loggerResource),
sdklog.WithProcessor(loggerProcessor),
)

// If log streaming is disabled, use a noop logger provider
if !cfg.LogStreamingEnabled {
loggerProvider = BeholderNoopLoggerProvider()
}

logger := loggerProvider.Logger(defaultPackageName)

// Tracer
Expand Down
13 changes: 13 additions & 0 deletions pkg/beholder/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,16 @@ func (cfg *writerClientConfig) WithWriter(w io.Writer) {
cfg.TraceOptions = append(cfg.TraceOptions, stdouttrace.WithWriter(w))
cfg.MetricOptions = append(cfg.MetricOptions, stdoutmetric.WithWriter(w))
}

type beholderNoopLogExporter struct{}

func (beholderNoopLogExporter) Export(ctx context.Context, records []sdklog.Record) error { return nil }
func (beholderNoopLogExporter) Shutdown(ctx context.Context) error { return nil }
func (beholderNoopLogExporter) ForceFlush(ctx context.Context) error { return nil }

// BeholderNoopLoggerProvider returns a *sdklog.LoggerProvider (the same type as sdklog.NewLoggerProvider) that drops all logs.
func BeholderNoopLoggerProvider() *sdklog.LoggerProvider {
return sdklog.NewLoggerProvider(
sdklog.WithProcessor(sdklog.NewSimpleProcessor(beholderNoopLogExporter{})),
)
}
Loading