Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/brown-ghosts-buy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added OCR2 config option SampleTelemetry which enables telemetry sampling.
5 changes: 5 additions & 0 deletions .changeset/polite-cobras-make.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal #added Sampling of LLO telemetry.
5 changes: 5 additions & 0 deletions .changeset/tiny-moons-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated bumped chainlink-data-streams version
3 changes: 1 addition & 2 deletions config_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ var (
func TestConfigDocs(t *testing.T) {
config, err := docs.GenerateConfig()
assert.NoError(t, err, "invalid config docs")
assert.Equal(t, configMD, config, "docs/CONFIG.md is out of date. Run '"+
"' to regenerate.")
assert.Equal(t, configMD, config, "docs/CONFIG.md is out of date. Run 'make config-docs' to regenerate.")

secrets, err := docs.GenerateSecrets()
assert.NoError(t, err, "invalid secrets docs")
Expand Down
2 changes: 2 additions & 0 deletions core/config/docs/core.toml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ DefaultTransactionQueueDepth = 1 # Default
SimulateTransactions = false # Default
# TraceLogging enables trace level logging.
TraceLogging = false # Default
# SampleTelemetry enables telemetry sampling.
SampleTelemetry = false # Default
# KeyValueStoreRootDir is the root directory for the key-value store used by OCR3.1.
# This directory must be writable by the Chainlink node process and should support long-term persistence.
KeyValueStoreRootDir = '~/.chainlink-data' # Default
Expand Down
1 change: 1 addition & 0 deletions core/config/ocr2_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type OCR2 interface {
KeyBundleID() (string, error)
// OCR2 config, cannot override in jobs
TraceLogging() bool
SampleTelemetry() bool
CaptureEATelemetry() bool
DefaultTransactionQueueDepth() uint32
SimulateTransactions() bool
Expand Down
4 changes: 4 additions & 0 deletions core/config/toml/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,7 @@ type OCR2 struct {
DefaultTransactionQueueDepth *uint32
SimulateTransactions *bool
TraceLogging *bool
SampleTelemetry *bool
KeyValueStoreRootDir *string
}

Expand Down Expand Up @@ -1395,6 +1396,9 @@ func (o *OCR2) setFrom(f *OCR2) {
if v := f.TraceLogging; v != nil {
o.TraceLogging = v
}
if v := f.SampleTelemetry; v != nil {
o.SampleTelemetry = v
}
if v := f.KeyValueStoreRootDir; v != nil {
o.KeyValueStoreRootDir = v
}
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20251128020529-88d93b01d749
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251125103916-0b41e73b80c4
github.com/smartcontractkit/chainlink-data-streams v0.1.6
github.com/smartcontractkit/chainlink-data-streams v0.1.7-0.20251123170926-313d8827bd6f
github.com/smartcontractkit/chainlink-deployments-framework v0.70.0
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251201175512-af04e919ebfb
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251022075638-49d961001d1b
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1640,8 +1640,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0=
github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo=
github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o=
github.com/smartcontractkit/chainlink-data-streams v0.1.7-0.20251123170926-313d8827bd6f h1:SB1/gArhpTaItvde2ub6J5njc84ucw7/GLhAYCUpb3s=
github.com/smartcontractkit/chainlink-data-streams v0.1.7-0.20251123170926-313d8827bd6f/go.mod h1:GPsn6PKJvPe1UfRYyVxsDzOWq6NILzBstiiLq/w+kG0=
github.com/smartcontractkit/chainlink-deployments-framework v0.70.0 h1:wo2KL2viGZK/LhHLM8F88sRyhZF9wwWh+YDzW8hS00g=
github.com/smartcontractkit/chainlink-deployments-framework v0.70.0/go.mod h1:Cp7PuO7HUDugp7bWGP/TcDAvvvkFLdKOVrSm0zXlnhg=
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251201175512-af04e919ebfb h1:/oWqkc0mU63UNydxUiRZTv+ZYRzGF+9WHrpbuxDTBtY=
Expand Down
4 changes: 4 additions & 0 deletions core/services/chainlink/config_ocr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (o *ocr2Config) TraceLogging() bool {
return *o.c.TraceLogging
}

func (o *ocr2Config) SampleTelemetry() bool {
return *o.c.SampleTelemetry
}

func (o *ocr2Config) CaptureEATelemetry() bool {
return *o.c.CaptureEATelemetry
}
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/config_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestOCR2Config(t *testing.T) {
require.Equal(t, expectedContractSubscribeInterval, ocr2Cfg.ContractSubscribeInterval())
require.False(t, ocr2Cfg.SimulateTransactions())
require.False(t, ocr2Cfg.TraceLogging())
require.False(t, ocr2Cfg.SampleTelemetry())
require.Equal(t, uint32(1), ocr2Cfg.DefaultTransactionQueueDepth())
require.False(t, ocr2Cfg.CaptureEATelemetry())
require.True(t, ocr2Cfg.CaptureAutomationCustomTelemetry())
Expand Down
2 changes: 2 additions & 0 deletions core/services/chainlink/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func TestConfig_Marshal(t *testing.T) {
DefaultTransactionQueueDepth: ptr[uint32](1),
SimulateTransactions: ptr(false),
TraceLogging: ptr(false),
SampleTelemetry: ptr(false),
KeyValueStoreRootDir: ptr("~/.chainlink-data"),
}
full.OCR = toml.OCR{
Expand Down Expand Up @@ -1129,6 +1130,7 @@ AllowNoBootstrappers = true
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
SampleTelemetry = false
KeyValueStoreRootDir = '~/.chainlink-data'
`},
{"JobDistributor", Config{Core: toml.Core{JobDistributor: full.JobDistributor}}, `[JobDistributor]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ AllowNoBootstrappers = false
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
SampleTelemetry = false
KeyValueStoreRootDir = '~/.chainlink-data'

[OCR]
Expand Down
1 change: 1 addition & 0 deletions core/services/chainlink/testdata/config-full.toml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ AllowNoBootstrappers = true
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
SampleTelemetry = false
KeyValueStoreRootDir = '~/.chainlink-data'

[OCR]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ AllowNoBootstrappers = false
DefaultTransactionQueueDepth = 1
SimulateTransactions = false
TraceLogging = false
SampleTelemetry = false
KeyValueStoreRootDir = '~/.chainlink-data'

[OCR]
Expand Down
2 changes: 2 additions & 0 deletions core/services/feeds/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"

coreconfig "github.com/smartcontractkit/chainlink/v2/core/config"
)

Expand Down Expand Up @@ -42,4 +43,5 @@ type OCR2Config interface {
DefaultTransactionQueueDepth() uint32
SimulateTransactions() bool
TraceLogging() bool
SampleTelemetry() bool
}
2 changes: 2 additions & 0 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type DelegateConfig struct {

// OCR3
TraceLogging bool
SampleTelemetry bool
BinaryNetworkEndpointFactory ocr2types.BinaryNetworkEndpointFactory
V2Bootstrappers []ocrcommontypes.BootstrapperLocator
// One Oracle will be started for each ContractConfigTracker
Expand Down Expand Up @@ -118,6 +119,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
CaptureObservationTelemetry: cfg.CaptureObservationTelemetry,
CaptureOutcomeTelemetry: cfg.CaptureOutcomeTelemetry,
CaptureReportTelemetry: cfg.CaptureReportTelemetry,
SampleTelemetry: cfg.SampleTelemetry,
})

ds := observation.NewDataSource(
Expand Down
179 changes: 179 additions & 0 deletions core/services/llo/telem/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package telem

import (
"context"
"encoding/hex"
"errors"
"strconv"
"strings"
"sync"
"time"

"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-data-streams/llo"

"github.com/smartcontractkit/chainlink/v2/core/services/synchronization"
)

const (
defaultPrunePeriod = 10 * time.Second

samplerDelimiter = "-"
)

var errUnsupportedTelemetryType = errors.New("unsupported telemetry type")

// sampler keeps track of what kind of telemetry has already been sent to the collection point and decides whether the
// next telemetry package will be sent or dropped.
type sampler struct {
// samples keeps track of the telemetry samples we've already sent (or approved for sending).
// The format is `map[fingerprint][observation timestamp in seconds]any`. We intentionally use int32 because it's
// enough for seconds but not enough for nanos, so we can't mix them up.
samples map[string]map[int32]any
samplesMu sync.Mutex

enabled bool
prunePeriod time.Duration // exists, so we can test pruning
lggr logger.Logger
}

func newSampler(lgger logger.SugaredLogger, samplingEnabled bool) *sampler {
return &sampler{
samples: make(map[string]map[int32]any),
enabled: samplingEnabled,
prunePeriod: defaultPrunePeriod,
lggr: lgger,
}
}

// Sample is the method which decides whether we're going to send the data downstream or not.
func (s *sampler) Sample(typ synchronization.TelemetryType, msg proto.Message) bool {
// If sampling is not enabled we want to send each and every telemetry message, so always return true.
if !s.enabled {
return true
}

fp, ots, err := fingerprint(typ, msg)
if err != nil {
if !errors.Is(err, errUnsupportedTelemetryType) {
s.lggr.Warnw("Couldn't determine fingerprint", "type", typ, "err", err)
}
return true
}

s.samplesMu.Lock()
defer s.samplesMu.Unlock()
// Do we have any records for this fingerprint?
if _, ok := s.samples[fp]; !ok {
s.samples[fp] = make(map[int32]any)
}
// Do we already have a record for this fingerprint and this second?
if _, ok := s.samples[fp][ots]; !ok {
s.samples[fp][ots] = struct{}{}
return true
}
// We already have a record, and we don't need to send another one.
return false
}

// StartPruningLoop starts a regular check routine which removes old entries from the sampling records.
//
// This method is non-blocking. It starts a goroutine and returns.
func (s *sampler) StartPruningLoop(ctx context.Context, wg *sync.WaitGroup) {
// We don't need pruning if sampling is not enabled.
if !s.enabled {
return
}

wg.Add(1)
go func() {
defer wg.Done()
t := time.NewTicker(s.prunePeriod)
defer t.Stop()

for {
select {
case <-t.C:
s.pruneStorage()
case <-ctx.Done():
return
}
}
}()
}

// pruneStorage removes all records which are older than a predefined period (s.prunePeriod).
func (s *sampler) pruneStorage() {
s.samplesMu.Lock()
defer s.samplesMu.Unlock()

cutoff := int32(time.Now().Add(-s.prunePeriod).Unix()) //nolint:gosec // G115
for _, ots := range s.samples {
for ts := range ots {
if ts < cutoff {
delete(ots, ts)
}
}
}
}

// fingerprint combines unique characteristics of each supported telemetry report type and constructs a string
// fingerprint of it. It returns the fingerprint, together with an observation timestamp in seconds.
// TODO improve encoding efficiency by switching from string to hex([]byte) or string([]byte).
func fingerprint(typ synchronization.TelemetryType, msg proto.Message) (string, int32, error) {
switch typ {
case synchronization.LLOObservation:
m, ok := msg.(*LLOObservationTelemetry)
if !ok || m == nil {
return "", 0, errors.New("invalid telemetry type, expected LLOObservation")
}
traits := []string{
strconv.FormatUint(uint64(m.DonId), 10),
strconv.FormatUint(uint64(m.GetStreamId()), 10),
hex.EncodeToString(m.ConfigDigest),
}
return strings.Join(traits, samplerDelimiter), nanosToSec(m.ObservationTimestamp), nil
case synchronization.LLOOutcome:
m, ok := msg.(*llo.LLOOutcomeTelemetry)
if !ok || m == nil {
return "", 0, errors.New("invalid telemetry type, expected LLOOutcomeTelemetry")
}
traits := []string{
strconv.FormatUint(uint64(m.DonId), 10),
hex.EncodeToString(m.ConfigDigest),
}
return strings.Join(traits, samplerDelimiter), nanosToSec(int64(m.ObservationTimestampNanoseconds)), nil //nolint:gosec // G115
case synchronization.LLOReport:
m, ok := msg.(*llo.LLOReportTelemetry)
if !ok || m == nil {
return "", 0, errors.New("invalid telemetry type, expected LLOReportTelemetry")
}
traits := []string{
strconv.FormatUint(uint64(m.DonId), 10),
strconv.FormatUint(uint64(m.ChannelId), 10),
hex.EncodeToString(m.ConfigDigest),
}
return strings.Join(traits, samplerDelimiter), nanosToSec(int64(m.ObservationTimestampNanoseconds)), nil //nolint:gosec // G115
case synchronization.PipelineBridge:
m, ok := msg.(*LLOBridgeTelemetry)
if !ok || m == nil {
return "", 0, errors.New("invalid telemetry type, expected LLOBridgeTelemetry")
}
traits := []string{
strconv.FormatUint(uint64(m.DonId), 10),
strconv.FormatUint(uint64(m.GetStreamId()), 10),
strconv.FormatUint(uint64(m.SpecId), 10), //nolint:gosec // G115
m.BridgeAdapterName,
hex.EncodeToString(m.ConfigDigest),
}
return strings.Join(traits, samplerDelimiter), nanosToSec(m.ObservationTimestamp), nil
default:
return "", 0, errUnsupportedTelemetryType
}
}

func nanosToSec(n int64) int32 {
return int32(n / int64(time.Second)) //nolint:gosec // G115
}
Loading
Loading