Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quick-kiwis-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal LLO Observation loop
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20251009203201-900123a5c46a
github.com/smartcontractkit/chainlink-common v0.9.6-0.20251016213956-9a6afcd1532a
github.com/smartcontractkit/chainlink-data-streams v0.1.5
github.com/smartcontractkit/chainlink-data-streams v0.1.6
github.com/smartcontractkit/chainlink-deployments-framework v0.56.0
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251015115541-729ba0b2b1c1
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1605,8 +1605,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0=
github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc=
github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o=
github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo=
github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o=
github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE=
github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM=
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag=
Expand Down
9 changes: 3 additions & 6 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
ocrcommontypes "github.com/smartcontractkit/libocr/commontypes"
Expand Down Expand Up @@ -121,20 +120,15 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
CaptureReportTelemetry: cfg.CaptureReportTelemetry,
})

cache := observation.NewCache(500*time.Millisecond, time.Minute)
ds := observation.NewDataSource(
logger.Named(lggr, "DataSource"),
cfg.Registry,
t,
cache,
)

notifier, ok := cfg.ContractTransmitter.(TransmitNotifier)
if ok {
notifier.OnTransmit(t.TrackSeqNr)
notifier.OnTransmit(func(digest ocr2types.ConfigDigest, seqNr uint64) {
cache.SetLastTransmissionSeqNr(seqNr)
})
}

return &delegate{services.StateMachine{}, cfg, reportCodecs, cfg.ShouldRetireCache, ds, t, []Closer{}}, nil
Expand Down Expand Up @@ -223,6 +217,9 @@ func (d *delegate) Close() error {
for _, oracle := range d.oracles {
merr = errors.Join(merr, oracle.Close())
}
if closer, ok := d.ds.(Closer); ok {
merr = errors.Join(merr, closer.Close())
}
merr = errors.Join(merr, d.telem.Close())
return merr
})
Expand Down
55 changes: 17 additions & 38 deletions core/services/llo/observation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -41,30 +40,25 @@ var (
// The cache is cleaned up periodically to remove decommissioned stream values
// if the provided cleanupInterval is greater than 0.
type Cache struct {
mu sync.RWMutex

mu sync.RWMutex
values map[llotypes.StreamID]item
maxAge time.Duration
cleanupInterval time.Duration

lastTransmissionSeqNr atomic.Uint64
closeChan chan struct{}
closeChan chan struct{}
}

type item struct {
value llo.StreamValue
seqNr uint64
expiresAt time.Time
}

// NewCache creates a new cache.
//
// maxAge is the maximum age of a stream value to keep in the cache.
// cleanupInterval is the interval to clean up the cache.
func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache {
func NewCache(cleanupInterval time.Duration) *Cache {
c := &Cache{
values: make(map[llotypes.StreamID]item),
maxAge: maxAge,
cleanupInterval: cleanupInterval,
closeChan: make(chan struct{}),
}
Expand All @@ -91,63 +85,48 @@ func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache {
return c
}

// SetLastTransmissionSeqNr sets the last transmission sequence number.
func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) {
if c == nil {
return
}

c.lastTransmissionSeqNr.Store(seqNr)
}

// Add adds a stream value to the cache.
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) {
if c == nil {
return
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, ttl time.Duration) {
var expiresAt time.Time
if ttl > 0 {
expiresAt = time.Now().Add(ttl)
}

c.mu.Lock()
defer c.mu.Unlock()
c.values[id] = item{value: value, seqNr: seqNr, expiresAt: time.Now().Add(c.maxAge)}
c.values[id] = item{value: value, expiresAt: expiresAt}
}

func (c *Cache) Get(id llotypes.StreamID) llo.StreamValue {
if c == nil {
return nil
}

func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, time.Time) {
c.mu.RLock()
defer c.mu.RUnlock()

label := strconv.FormatUint(uint64(id), 10)
item, ok := c.values[id]
if !ok {
promCacheMissCount.WithLabelValues(label, "notFound").Inc()
return nil
}

if item.seqNr <= c.lastTransmissionSeqNr.Load() {
promCacheMissCount.WithLabelValues(label, "seqNr").Inc()
return nil
return nil, time.Time{}
}

if time.Now().After(item.expiresAt) {
promCacheMissCount.WithLabelValues(label, "maxAge").Inc()
return nil
return nil, time.Time{}
}

promCacheHitCount.WithLabelValues(label).Inc()
return item.value
return item.value, item.expiresAt
}

func (c *Cache) cleanup() {
c.mu.Lock()
defer c.mu.Unlock()

lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load()
now := time.Now()
for id, item := range c.values {
if item.seqNr <= lastTransmissionSeqNr || now.After(item.expiresAt) {
if item.expiresAt.IsZero() {
continue
}

if time.Now().After(item.expiresAt) {
delete(c.values, id)
}
}
Expand Down
71 changes: 25 additions & 46 deletions core/services/llo/observation/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,25 @@ func (m *mockStreamValue) Type() llo.LLOStreamValue_Type {
func TestNewCache(t *testing.T) {
tests := []struct {
name string
maxAge time.Duration
cleanupInterval time.Duration
wantErr bool
}{
{
name: "valid cache with cleanup",
maxAge: time.Second,
cleanupInterval: time.Millisecond * 100,
wantErr: false,
},
{
name: "valid cache without cleanup",
maxAge: time.Second,
cleanupInterval: 0,
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(tt.maxAge, tt.cleanupInterval)
cache := NewCache(tt.cleanupInterval)
require.NotNil(t, cache)
assert.Equal(t, tt.maxAge, cache.maxAge)
assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval)
assert.NotNil(t, cache.values)
assert.NotNil(t, cache.closeChan)
Expand All @@ -85,50 +81,29 @@ func TestCache_Add_Get(t *testing.T) {
name string
streamID llotypes.StreamID
value llo.StreamValue
seqNr uint64
maxAge time.Duration
ttl time.Duration
wantValue llo.StreamValue
wantFound bool
beforeGet func(cache *Cache)
}{
{
name: "get existing value",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Second,
ttl: time.Second,
wantValue: &mockStreamValue{value: []byte{42}},
wantFound: true,
},
{
name: "get non-existent value",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Second,
wantValue: nil,
wantFound: false,
},
{
name: "get expired by sequence number",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 5,
maxAge: time.Second,
ttl: time.Second,
wantValue: nil,
wantFound: false,
beforeGet: func(cache *Cache) {
cache.SetLastTransmissionSeqNr(10)
},
},
{
name: "get expired by age",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Nanosecond * 100,
ttl: time.Nanosecond * 100,
wantValue: nil,
wantFound: false,
beforeGet: func(_ *Cache) {
time.Sleep(time.Millisecond)
},
Expand All @@ -137,33 +112,36 @@ func TestCache_Add_Get(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(tt.maxAge, 0)
cache := NewCache(0)

if tt.wantFound {
cache.Add(tt.streamID, tt.value, tt.seqNr)
if tt.value != nil {
cache.Add(tt.streamID, tt.value, tt.ttl)
}

if tt.beforeGet != nil {
tt.beforeGet(cache)
}

assert.Equal(t, tt.wantValue, cache.Get(tt.streamID))
val, _ := cache.Get(tt.streamID)
assert.Equal(t, tt.wantValue, val)
})
}
}

func TestCache_Cleanup(t *testing.T) {
cache := NewCache(time.Nanosecond*100, time.Millisecond)
cache := NewCache(time.Millisecond)
streamID := llotypes.StreamID(1)
value := &mockStreamValue{value: []byte{42}}

cache.Add(streamID, value, 10)
cache.Add(streamID, value, time.Nanosecond*100)
time.Sleep(time.Millisecond * 2)
assert.Nil(t, cache.Get(streamID))

gotValue, _ := cache.Get(streamID)
assert.Nil(t, gotValue)
}

func TestCache_ConcurrentAccess(t *testing.T) {
cache := NewCache(time.Second, 0)
cache := NewCache(0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -176,7 +154,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second)
}
}(i)
}
Expand All @@ -186,13 +164,14 @@ func TestCache_ConcurrentAccess(t *testing.T) {
for i := range uint32(numGoroutines) {
for j := range numOperations {
streamID := i*numOperations + j
assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, cache.Get(streamID))
val, _ := cache.Get(streamID)
assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, val)
}
}
}

func TestCache_ConcurrentReadWrite(t *testing.T) {
cache := NewCache(time.Second, 0)
cache := NewCache(0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -205,7 +184,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, uint64(j))
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second)
}
}(i)
}
Expand All @@ -216,7 +195,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Get(streamID)
_, _ = cache.Get(streamID)
}
}(i)
}
Expand All @@ -225,7 +204,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
}

func TestCache_ConcurrentAddGet(t *testing.T) {
cache := NewCache(time.Second, 0)
cache := NewCache(0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -238,7 +217,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second)
}
}(i)
}
Expand All @@ -249,7 +228,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) {
defer wg.Done()
for j := range numOperations {
streamID := id*numOperations + j
cache.Get(streamID)
_, _ = cache.Get(streamID)
}
}(i)
}
Expand Down
Loading
Loading