Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap/zapcore"
)

type Config struct {
Expand Down Expand Up @@ -47,7 +48,8 @@ type Config struct {
LogBatchProcessor bool // Enabled by default. Disable only for testing.
// Retry config for shared log exporter, used by Emitter and Logger
LogRetryConfig *RetryConfig
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
LogLevel zapcore.Level // Log level for telemetry streaming

// Auth
AuthPublicKeyHex string
Expand Down Expand Up @@ -117,6 +119,7 @@ func DefaultConfig() Config {
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
LogStreamingEnabled: true, // Enable logs streaming by default
LogLevel: zapcore.InfoLevel,
// Auth
AuthHeadersTTL: 10 * time.Minute,
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

otelattr "go.opentelemetry.io/otel/attribute"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
)
Expand Down Expand Up @@ -47,7 +48,8 @@ func ExampleConfig() {
LogExportInterval: 1 * time.Second,
LogMaxQueueSize: 2048,
LogBatchProcessor: true,
LogStreamingEnabled: false, // Disable streaming logs by default
LogStreamingEnabled: false, // Disable streaming logs by default
LogLevel: zapcore.InfoLevel, // Default log level
// Auth
AuthPublicKeyHex: "",
AuthHeaders: map[string]string{},
Expand All @@ -62,6 +64,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[] AuthKeySigner:<nil> AuthHeadersTTL:0s}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthPublicKeyHex: AuthHeaders:map[] AuthKeySigner:<nil> AuthHeadersTTL:0s}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
13 changes: 13 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/hashicorp/go-plugin"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/config"
)
Expand Down Expand Up @@ -62,6 +63,7 @@ const (
envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE"
envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE"
envTelemetryLogStreamingEnabled = "CL_TELEMETRY_LOG_STREAMING_ENABLED"
envTelemetryLogLevel = "CL_TELEMETRY_LOG_LEVEL"

envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
Expand Down Expand Up @@ -118,6 +120,7 @@ type EnvConfig struct {
TelemetryEmitterExportMaxBatchSize int
TelemetryEmitterMaxQueueSize int
TelemetryLogStreamingEnabled bool
TelemetryLogLevel zapcore.Level

ChipIngressEndpoint string
ChipIngressInsecureConnection bool
Expand Down Expand Up @@ -187,6 +190,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize))
add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize))
add(envTelemetryLogStreamingEnabled, strconv.FormatBool(e.TelemetryLogStreamingEnabled))
add(envTelemetryLogLevel, e.TelemetryLogLevel.String())

add(envChipIngressEndpoint, e.ChipIngressEndpoint)
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
Expand Down Expand Up @@ -351,6 +355,15 @@ func (e *EnvConfig) parse() error {
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryLogStreamingEnabled, err)
}
logLevelStr := os.Getenv(envTelemetryLogLevel)
if logLevelStr == "" {
logLevelStr = "info" // Default log level
}
var logLevel zapcore.Level
if err := logLevel.Set(logLevelStr); err != nil {
logLevel = zapcore.InfoLevel // Fallback to info level on invalid input
}
e.TelemetryLogLevel = logLevel
// Optional
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)
Expand Down
20 changes: 7 additions & 13 deletions pkg/loop/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@ import (
"time"

"github.com/hashicorp/go-hclog"
otellog "go.opentelemetry.io/otel/log"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"

otellog "go.opentelemetry.io/otel/log"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap"
)
Expand Down Expand Up @@ -181,22 +180,17 @@ func configureHCLogEncoder(cfg *zap.Config) {
}

// NewOtelLogger returns a logger with two cores:
// 1. The primary JSON core configured via cfgFn (encoder keys changed to @level, @message, @timestamp).
// 2. The otel core (otelzap.NewCore) which receives the raw zap.Entry and fields.
// 1. Primary JSON core with hclog-compatible encoder keys (@level, @message, @timestamp)
// 2. OTEL core (otelzap.NewCore) that exports logs to OpenTelemetry at the specified level
//
// Important:
// The cfgFn only mutates the encoder config used to build the first core.
// otelzap.NewCore implements zapcore.Core and does NOT use that encoder; it derives attributes from the zap.Entry
// (Message, Level, Time, etc.) and zap.Fields directly. Therefore changing encoder keys here does NOT affect how
// the otel core extracts data, and only the first core's JSON output format is altered.
// This preserves backward compatibility for OTEL export while allowing hclog-compatible key names in the primary output.
func NewOtelLogger(otelLogger otellog.Logger) (logger.Logger, error) {
// The encoder config only affects the primary core's JSON output.
// The OTEL core extracts data directly from zap.Entry and fields, independent of encoder settings.
func NewOtelLogger(otelLogger otellog.Logger, level zapcore.Level) (logger.Logger, error) {
primaryCore, err := logger.NewCore(configureHCLogEncoder)
if err != nil {
return nil, err
}
// set debug level from primaryCore to match otelzap.NewCore
return logger.NewWithCores(primaryCore, otelzap.NewCore(otelLogger, otelzap.WithLevel(zapcore.DebugLevel))), nil
return logger.NewWithCores(primaryCore, otelzap.NewCore(otelLogger, otelzap.WithLevel(level))), nil
}

// onceValue returns a function that invokes f only once and returns the value
Expand Down
69 changes: 52 additions & 17 deletions pkg/loop/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/assert"
sdklog "go.opentelemetry.io/otel/sdk/log"
"go.uber.org/zap/zapcore"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap"
Expand All @@ -21,21 +22,53 @@ func Test_removeArg(t *testing.T) {
wantArgs []any
wantVal string
}{
{"empty", nil, "logger",
nil, ""},
{"simple", []any{"logger", "foo"}, "logger",
[]any{}, "foo"},
{"multi", []any{"logger", "foo", "bar", "baz"}, "logger",
[]any{"bar", "baz"}, "foo"},
{"reorder", []any{"bar", "baz", "logger", "foo"}, "logger",
[]any{"bar", "baz"}, "foo"},
{
"empty", nil, "logger",
nil, "",
},
{
"simple",
[]any{"logger", "foo"},
"logger",
[]any{},
"foo",
},
{
"multi",
[]any{"logger", "foo", "bar", "baz"},
"logger",
[]any{"bar", "baz"},
"foo",
},
{
"reorder",
[]any{"bar", "baz", "logger", "foo"},
"logger",
[]any{"bar", "baz"},
"foo",
},

{"invalid", []any{"logger"}, "logger",
[]any{"logger"}, ""},
{"invalid-multi", []any{"foo", "bar", "logger"}, "logger",
[]any{"foo", "bar", "logger"}, ""},
{"value", []any{"foo", "logger", "bar", "baz"}, "logger",
[]any{"foo", "logger", "bar", "baz"}, ""},
{
"invalid",
[]any{"logger"},
"logger",
[]any{"logger"},
"",
},
{
"invalid-multi",
[]any{"foo", "bar", "logger"},
"logger",
[]any{"foo", "bar", "logger"},
"",
},
{
"value",
[]any{"foo", "logger", "bar", "baz"},
"logger",
[]any{"foo", "logger", "bar", "baz"},
"",
},
} {
t.Run(tt.name, func(t *testing.T) {
args, val := removeArg(tt.args, tt.key)
Expand Down Expand Up @@ -74,7 +107,7 @@ func TestNewOtelLogger(t *testing.T) {
)
otelLggr := lp.Logger("test-" + tt.name)

lggr, err := NewOtelLogger(otelLggr)
lggr, err := NewOtelLogger(otelLggr, zapcore.DebugLevel)
if err != nil {
t.Fatalf("NewOtelLogger error: %v", err)
}
Expand Down Expand Up @@ -113,5 +146,7 @@ func (r *recordingExporter) Shutdown(context.Context) error { return nil }

// Compile-time assertion that otelzap.NewCore still satisfies zapcore.Core usage pattern.
// (Guards against accidental API break causing this test file to silently compile with stubs.)
var _ = otelzap.NewCore
var _ logger.Logger // silence unused import of logger in case future refactors remove usage
var (
_ = otelzap.NewCore
_ logger.Logger // silence unused import of logger in case future refactors remove usage
)
3 changes: 2 additions & 1 deletion pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (s *Server) start() error {
EmitterExportMaxBatchSize: s.EnvConfig.TelemetryEmitterExportMaxBatchSize,
EmitterMaxQueueSize: s.EnvConfig.TelemetryEmitterMaxQueueSize,
LogStreamingEnabled: s.EnvConfig.TelemetryLogStreamingEnabled,
LogLevel: s.EnvConfig.TelemetryLogLevel,
ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "",
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
Expand Down Expand Up @@ -169,7 +170,7 @@ func (s *Server) start() error {
beholder.SetGlobalOtelProviders()

if beholderCfg.LogStreamingEnabled {
otelLogger, err := NewOtelLogger(beholderClient.Logger)
otelLogger, err := NewOtelLogger(beholderClient.Logger, beholderCfg.LogLevel)
if err != nil {
return fmt.Errorf("failed to enable log streaming: %w", err)
}
Expand Down
Loading