Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tricky-actors-run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated Wire up CHIP ingress client in telemetry manager
2 changes: 2 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ SendInterval = '500ms' # Default
SendTimeout = '10s' # Default
# UseBatchSend toggles sending telemetry to the ingress server using the batch client.
UseBatchSend = true # Default
# ChipIngressEnabled enables sending telemetry to CHIP Ingress.
ChipIngressEnabled = false # Default

[[TelemetryIngress.Endpoints]] # Example
# Network aka EVM, Solana, Starknet
Expand Down
45 changes: 45 additions & 0 deletions core/config/mocks/telemetry_ingress.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/config/telemetry_ingress_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type TelemetryIngress interface {
SendTimeout() time.Duration
UseBatchSend() bool
Endpoints() []TelemetryIngressEndpoint
ChipIngressEnabled() bool
}

type TelemetryIngressEndpoint interface {
Expand Down
20 changes: 12 additions & 8 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,14 +730,15 @@ func (d *DatabaseBackup) setFrom(f *DatabaseBackup) {
}

type TelemetryIngress struct {
UniConn *bool
Logging *bool
BufferSize *uint16
MaxBatchSize *uint16
SendInterval *commonconfig.Duration
SendTimeout *commonconfig.Duration
UseBatchSend *bool
Endpoints []TelemetryIngressEndpoint `toml:",omitempty"`
UniConn *bool
Logging *bool
BufferSize *uint16
MaxBatchSize *uint16
SendInterval *commonconfig.Duration
SendTimeout *commonconfig.Duration
UseBatchSend *bool
Endpoints []TelemetryIngressEndpoint `toml:",omitempty"`
ChipIngressEnabled *bool
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flag to enable OTI telemetry ingress via ChIP Ingress

}

type TelemetryIngressEndpoint struct {
Expand Down Expand Up @@ -772,6 +773,9 @@ func (t *TelemetryIngress) setFrom(f *TelemetryIngress) {
if v := f.Endpoints; v != nil {
t.Endpoints = v
}
if v := f.ChipIngressEnabled; v != nil {
t.ChipIngressEnabled = v
}
}

type AuditLogger struct {
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_telemetry_ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func (t *telemetryIngressConfig) Endpoints() []config.TelemetryIngressEndpoint {
return endpoints
}

func (t *telemetryIngressConfig) ChipIngressEnabled() bool {
return *t.c.ChipIngressEnabled
}

func (t *telemetryIngressEndpointConfig) Network() string {
return *t.c.Network
}
Expand Down
24 changes: 24 additions & 0 deletions core/services/chainlink/config_telemetry_ingress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/config/toml"
)

func TestTelemetryIngressConfig(t *testing.T) {
Expand All @@ -32,3 +34,25 @@ func TestTelemetryIngressConfig(t *testing.T) {
assert.Equal(t, "prom.test", tec[0].URL().String())
assert.Equal(t, "test-pub-key", tec[0].ServerPubKey())
}

func TestTelemetryIngressConfig_ChipIngressEnabled(t *testing.T) {
t.Run("returns false when ChipIngressEnabled is explicitly false", func(t *testing.T) {
falseVal := false
config := &telemetryIngressConfig{
c: toml.TelemetryIngress{
ChipIngressEnabled: &falseVal,
},
}
assert.False(t, config.ChipIngressEnabled())
})

t.Run("returns true when ChipIngressEnabled is true", func(t *testing.T) {
trueVal := true
config := &telemetryIngressConfig{
c: toml.TelemetryIngress{
ChipIngressEnabled: &trueVal,
},
}
assert.True(t, config.ChipIngressEnabled())
})
}
16 changes: 9 additions & 7 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,14 @@ func TestConfig_Marshal(t *testing.T) {
},
}
full.TelemetryIngress = toml.TelemetryIngress{
UniConn: ptr(false),
Logging: ptr(true),
BufferSize: ptr[uint16](1234),
MaxBatchSize: ptr[uint16](4321),
SendInterval: commoncfg.MustNewDuration(time.Minute),
SendTimeout: commoncfg.MustNewDuration(5 * time.Second),
UseBatchSend: ptr(true),
UniConn: ptr(false),
Logging: ptr(true),
BufferSize: ptr[uint16](1234),
MaxBatchSize: ptr[uint16](4321),
SendInterval: commoncfg.MustNewDuration(time.Minute),
SendTimeout: commoncfg.MustNewDuration(5 * time.Second),
UseBatchSend: ptr(true),
ChipIngressEnabled: ptr(false),
Endpoints: []toml.TelemetryIngressEndpoint{{
Network: ptr("EVM"),
ChainID: ptr("1"),
Expand Down Expand Up @@ -992,6 +993,7 @@ MaxBatchSize = 4321
SendInterval = '1m0s'
SendTimeout = '5s'
UseBatchSend = true
ChipIngressEnabled = false

[[TelemetryIngress.Endpoints]]
Network = 'EVM'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaxBatchSize = 50
SendInterval = '500ms'
SendTimeout = '10s'
UseBatchSend = true
ChipIngressEnabled = false

[AuditLogger]
Enabled = false
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaxBatchSize = 4321
SendInterval = '1m0s'
SendTimeout = '5s'
UseBatchSend = true
ChipIngressEnabled = false

[[TelemetryIngress.Endpoints]]
Network = 'EVM'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaxBatchSize = 50
SendInterval = '500ms'
SendTimeout = '10s'
UseBatchSend = true
ChipIngressEnabled = false

[AuditLogger]
Enabled = true
Expand Down
34 changes: 22 additions & 12 deletions core/services/telemetry/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"github.com/pkg/errors"
"github.com/smartcontractkit/libocr/commontypes"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
common "github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
Expand All @@ -32,6 +33,8 @@ type Manager struct {
uniConn bool
useBatchSend bool
MonitoringEndpointGenerator MonitoringEndpointGenerator

chipIngressClient chipingress.Client
Copy link
Collaborator Author

@pkcll pkcll Oct 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add reference to chip ingress client to be used for OTI telemetry ingress

}

type telemetryEndpoint struct {
Expand All @@ -43,16 +46,23 @@ type telemetryEndpoint struct {
}

// NewManager create a new telemetry manager that is responsible for configuring telemetry agents and generating the defined telemetry endpoints and monitoring endpoints
func NewManager(cfg config.TelemetryIngress, csaKeyStore keystore.CSA, lggr logger.Logger) *Manager {
func NewManager(cfg config.TelemetryIngress, csaKeyStore keystore.CSA, lggr common.Logger) *Manager {
var chipIngressClient chipingress.Client
if cfg.ChipIngressEnabled() {
lggr.Info("ChIP Ingress is enabled for telemetry")
chipIngressClient = beholder.GetClient().Chip
}

m := &Manager{
bufferSize: cfg.BufferSize(),
ks: csaKeyStore,
logging: cfg.Logging(),
maxBatchSize: cfg.MaxBatchSize(),
sendInterval: cfg.SendInterval(),
sendTimeout: cfg.SendTimeout(),
uniConn: cfg.UniConn(),
useBatchSend: cfg.UseBatchSend(),
bufferSize: cfg.BufferSize(),
ks: csaKeyStore,
logging: cfg.Logging(),
maxBatchSize: cfg.MaxBatchSize(),
sendInterval: cfg.SendInterval(),
sendTimeout: cfg.SendTimeout(),
uniConn: cfg.UniConn(),
useBatchSend: cfg.UseBatchSend(),
chipIngressClient: chipIngressClient,
}
m.Service, m.eng = services.Config{
Name: "TelemetryManager",
Expand Down Expand Up @@ -102,7 +112,7 @@ func (m *Manager) GenMultitypeMonitoringEndpoint(network string, chainID string,
return NewMultiIngressAgent(e.client, network, chainID, contractID)
}

func (m *Manager) newEndpoint(e config.TelemetryIngressEndpoint, lggr logger.Logger, cfg config.TelemetryIngress) (services.Service, error) {
func (m *Manager) newEndpoint(e config.TelemetryIngressEndpoint, lggr common.Logger, cfg config.TelemetryIngress) (services.Service, error) {
if e.Network() == "" {
return nil, errors.New("cannot add telemetry endpoint, network cannot be empty")
}
Expand All @@ -123,7 +133,7 @@ func (m *Manager) newEndpoint(e config.TelemetryIngressEndpoint, lggr logger.Log
return nil, errors.Errorf("cannot add telemetry endpoint for network %q and chainID %q, endpoint already exists", e.Network(), e.ChainID())
}

lggr = logger.Sugared(lggr).Named(e.Network()).Named(e.ChainID())
lggr = common.Sugared(lggr).Named(e.Network()).Named(e.ChainID())
var tClient synchronization.TelemetryService
if m.useBatchSend {
tClient = synchronization.NewTelemetryIngressBatchClient(e.URL(), e.ServerPubKey(), m.ks, cfg.Logging(), lggr, cfg.BufferSize(), cfg.MaxBatchSize(), cfg.SendInterval(), cfg.SendTimeout(), cfg.UniConn())
Expand Down
34 changes: 29 additions & 5 deletions core/services/telemetry/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
mocks2 "github.com/smartcontractkit/chainlink/v2/core/services/synchronization/mocks"
)

func setupMockConfig(t *testing.T, useBatchSend bool) *mocks.TelemetryIngress {
func setupMockConfig(t *testing.T, useBatchSend bool, chipIngressEnabled bool) *mocks.TelemetryIngress {
tic := mocks.NewTelemetryIngress(t)
tic.On("BufferSize").Return(uint(123))
tic.On("Logging").Return(true)
Expand All @@ -32,12 +32,13 @@ func setupMockConfig(t *testing.T, useBatchSend bool) *mocks.TelemetryIngress {
tic.On("SendTimeout").Return(time.Second * 7)
tic.On("UniConn").Return(true)
tic.On("UseBatchSend").Return(useBatchSend)
tic.On("ChipIngressEnabled").Return(chipIngressEnabled)

return tic
}

func TestManagerAgents(t *testing.T) {
tic := setupMockConfig(t, true)
tic := setupMockConfig(t, true, false)
te := mocks.NewTelemetryIngressEndpoint(t)
te.On("Network").Return("network-1")
te.On("ChainID").Return("network-1-chainID-1")
Expand All @@ -55,7 +56,7 @@ func TestManagerAgents(t *testing.T) {
me := tm.GenMonitoringEndpoint("network-1", "network-1-chainID-1", "", "")
assert.Equal(t, "*telemetry.TypedIngressAgentBatch", reflect.TypeOf(me).String())

tic = setupMockConfig(t, false)
tic = setupMockConfig(t, false, false)
tic.On("Endpoints").Return([]config.TelemetryIngressEndpoint{te})
tm = NewManager(tic, ks, lggr)
require.Equal(t, "*synchronization.telemetryIngressClient", reflect.TypeOf(tm.endpoints[0].client).String())
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestNewManager(t *testing.T) {
mockEndpoints = append(mockEndpoints, te)
}

tic := setupMockConfig(t, true)
tic := setupMockConfig(t, true, false)
tic.On("Endpoints").Return(mockEndpoints)

lggr, logObs := logger.TestLoggerObserved(t, zapcore.InfoLevel)
Expand Down Expand Up @@ -196,7 +197,7 @@ func TestNewManager(t *testing.T) {
}

func TestCorrectEndpointRouting(t *testing.T) {
tic := setupMockConfig(t, true)
tic := setupMockConfig(t, true, false)
tic.On("Endpoints").Return(nil)

lggr, obsLogs := logger.TestLoggerObserved(t, zapcore.InfoLevel)
Expand Down Expand Up @@ -281,3 +282,26 @@ func TestCorrectEndpointRouting(t *testing.T) {
require.Equal(t, []byte(e.chainID), clientSent[i].Telemetry)
}
}

// add test for current changes in manager
func TestManager_ChipIngressClient(t *testing.T) {
t.Run("disabled chip ingress", func(t *testing.T) {
tic := setupMockConfig(t, true, false)
tic.On("Endpoints").Return(nil)

lggr, _ := logger.TestLoggerObserved(t, zapcore.InfoLevel)
ks := keymocks.NewCSA(t)
tm := NewManager(tic, ks, lggr)
assert.Nil(t, tm.chipIngressClient)
})

t.Run("enabled chip ingress", func(t *testing.T) {
tic := setupMockConfig(t, true, true)
tic.On("Endpoints").Return(nil)

lggr, _ := logger.TestLoggerObserved(t, zapcore.InfoLevel)
ks := keymocks.NewCSA(t)
tm := NewManager(tic, ks, lggr)
assert.NotNil(t, tm.chipIngressClient)
})
}
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-empty-effective.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaxBatchSize = 50
SendInterval = '500ms'
SendTimeout = '10s'
UseBatchSend = true
ChipIngressEnabled = false

[AuditLogger]
Enabled = false
Expand Down
1 change: 1 addition & 0 deletions core/web/resolver/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaxBatchSize = 4321
SendInterval = '1m0s'
SendTimeout = '5s'
UseBatchSend = true
ChipIngressEnabled = false

[[TelemetryIngress.Endpoints]]
Network = 'EVM'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ MaxBatchSize = 50
SendInterval = '500ms'
SendTimeout = '10s'
UseBatchSend = true
ChipIngressEnabled = false

[AuditLogger]
Enabled = true
Expand Down
Loading
Loading