Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fresh-buttons-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#updated llo plugin data source and telemetry performance improvements
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20250930202440-88c08e65d960
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251001150007-98903c79c124
github.com/smartcontractkit/chainlink-data-streams v0.1.2
github.com/smartcontractkit/chainlink-data-streams v0.1.5
github.com/smartcontractkit/chainlink-deployments-framework v0.54.0
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250827130336-5922343458be
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1603,8 +1603,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0=
github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc=
github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc=
github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc=
github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o=
github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w=
github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0=
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ=
Expand Down
18 changes: 17 additions & 1 deletion core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
ocrcommontypes "github.com/smartcontractkit/libocr/commontypes"
Expand Down Expand Up @@ -119,7 +120,22 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
CaptureOutcomeTelemetry: cfg.CaptureOutcomeTelemetry,
CaptureReportTelemetry: cfg.CaptureReportTelemetry,
})
ds := observation.NewDataSource(logger.Named(lggr, "DataSource"), cfg.Registry, t)

cache := observation.NewCache(500*time.Millisecond, time.Minute)
ds := observation.NewDataSource(
logger.Named(lggr, "DataSource"),
cfg.Registry,
t,
cache,
)

notifier, ok := cfg.ContractTransmitter.(TransmitNotifier)
if ok {
notifier.OnTransmit(t.TrackSeqNr)
notifier.OnTransmit(func(digest ocr2types.ConfigDigest, seqNr uint64) {
cache.SetLastTransmissionSeqNr(seqNr)
})
}

return &delegate{services.StateMachine{}, cfg, reportCodecs, cfg.ShouldRetireCache, ds, t, []Closer{}}, nil
}
Expand Down
2 changes: 0 additions & 2 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/grpc"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/observation"
)

const (
Expand Down Expand Up @@ -269,7 +268,6 @@ func (mt *transmitter) Transmit(
err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err())
}
}
observation.GetCache(digest).SetLastTransmissionSeqNr(seqNr)
})

if !ok {
Expand Down
64 changes: 29 additions & 35 deletions core/services/llo/observation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,30 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"
)

var (
registryMu = sync.Mutex{}
registry = map[ocr2types.ConfigDigest]*Cache{}

promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_hit_count",
Help: "Number of local observation cache hits",
},
[]string{"configDigest", "streamID"},
[]string{"streamID"},
)
promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_miss_count",
Help: "Number of local observation cache misses",
},
[]string{"configDigest", "streamID", "reason"},
[]string{"streamID", "reason"},
)
)

func GetCache(configDigest ocr2types.ConfigDigest) *Cache {
registryMu.Lock()
defer registryMu.Unlock()

cache, ok := registry[configDigest]
if !ok {
cache = NewCache(configDigest, 500*time.Millisecond, time.Minute)
registry[configDigest] = cache
}

return cache
}

// Cache of stream values.
// It maintains a cache of stream values fetched from adapters until the last
// transmission sequence number is greater or equal the sequence number at which
Expand All @@ -60,7 +43,6 @@ func GetCache(configDigest ocr2types.ConfigDigest) *Cache {
type Cache struct {
mu sync.RWMutex

configDigestStr string
values map[llotypes.StreamID]item
maxAge time.Duration
cleanupInterval time.Duration
Expand All @@ -72,16 +54,15 @@ type Cache struct {
type item struct {
value llo.StreamValue
seqNr uint64
createdAt time.Time
expiresAt time.Time
}

// NewCache creates a new cache.
//
// maxAge is the maximum age of a stream value to keep in the cache.
// cleanupInterval is the interval to clean up the cache.
func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanupInterval time.Duration) *Cache {
func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache {
c := &Cache{
configDigestStr: configDigest.Hex(),
values: make(map[llotypes.StreamID]item),
maxAge: maxAge,
cleanupInterval: cleanupInterval,
Expand Down Expand Up @@ -112,48 +93,61 @@ func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanup

// SetLastTransmissionSeqNr sets the last transmission sequence number.
func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) {
if c == nil {
return
}

c.lastTransmissionSeqNr.Store(seqNr)
}

// Add adds a stream value to the cache.
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) {
if c == nil {
return
}

c.mu.Lock()
defer c.mu.Unlock()
c.values[id] = item{value: value, seqNr: seqNr, createdAt: time.Now()}
c.values[id] = item{value: value, seqNr: seqNr, expiresAt: time.Now().Add(c.maxAge)}
}

func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) {
func (c *Cache) Get(id llotypes.StreamID) llo.StreamValue {
if c == nil {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

label := strconv.FormatUint(uint64(id), 10)
item, ok := c.values[id]
if !ok {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "notFound").Inc()
return nil, false
promCacheMissCount.WithLabelValues(label, "notFound").Inc()
return nil
}

if item.seqNr <= c.lastTransmissionSeqNr.Load() {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "seqNr").Inc()
return nil, false
promCacheMissCount.WithLabelValues(label, "seqNr").Inc()
return nil
}

if time.Since(item.createdAt) >= c.maxAge {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "maxAge").Inc()
return nil, false
if time.Now().After(item.expiresAt) {
promCacheMissCount.WithLabelValues(label, "maxAge").Inc()
return nil
}

promCacheHitCount.WithLabelValues(c.configDigestStr, label).Inc()
return item.value, true
promCacheHitCount.WithLabelValues(label).Inc()
return item.value
}

func (c *Cache) cleanup() {
c.mu.Lock()
defer c.mu.Unlock()

lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load()
now := time.Now()
for id, item := range c.values {
if item.seqNr <= lastTransmissionSeqNr || time.Since(item.createdAt) >= c.maxAge {
if item.seqNr <= lastTransmissionSeqNr || now.After(item.expiresAt) {
delete(c.values, id)
}
}
Expand Down
29 changes: 9 additions & 20 deletions core/services/llo/observation/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"
)
Expand Down Expand Up @@ -72,7 +70,7 @@ func TestNewCache(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, tt.cleanupInterval)
cache := NewCache(tt.maxAge, tt.cleanupInterval)
require.NotNil(t, cache)
assert.Equal(t, tt.maxAge, cache.maxAge)
assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval)
Expand Down Expand Up @@ -139,7 +137,7 @@ func TestCache_Add_Get(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, 0)
cache := NewCache(tt.maxAge, 0)

if tt.wantFound {
cache.Add(tt.streamID, tt.value, tt.seqNr)
Expand All @@ -149,30 +147,23 @@ func TestCache_Add_Get(t *testing.T) {
tt.beforeGet(cache)
}

gotValue, gotFound := cache.Get(tt.streamID)
assert.Equal(t, tt.wantFound, gotFound)
if tt.wantFound {
assert.Equal(t, tt.wantValue, gotValue)
}
assert.Equal(t, tt.wantValue, cache.Get(tt.streamID))
})
}
}

func TestCache_Cleanup(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Nanosecond*100, time.Millisecond)
cache := NewCache(time.Nanosecond*100, time.Millisecond)
streamID := llotypes.StreamID(1)
value := &mockStreamValue{value: []byte{42}}

cache.Add(streamID, value, 10)
time.Sleep(time.Millisecond * 2)

gotValue, gotFound := cache.Get(streamID)
assert.False(t, gotFound)
assert.Nil(t, gotValue)
assert.Nil(t, cache.Get(streamID))
}

func TestCache_ConcurrentAccess(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -195,15 +186,13 @@ func TestCache_ConcurrentAccess(t *testing.T) {
for i := uint32(0); i < numGoroutines; i++ {
for j := uint32(0); j < numOperations; j++ {
streamID := i*numOperations + j
value, found := cache.Get(streamID)
assert.True(t, found)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, value)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, cache.Get(streamID))
}
}
}

func TestCache_ConcurrentReadWrite(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand Down Expand Up @@ -236,7 +225,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
}

func TestCache_ConcurrentAddGet(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand Down
Loading
Loading