Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
be81532
Remove seqNr eviction policy from the observation cache.
ro-tex Aug 6, 2025
701a084
Perform observations in a loop.
ro-tex Aug 6, 2025
22ee766
Adjust tests to endlessly looping observations.
ro-tex Aug 6, 2025
7a53341
Minor adjustments.
ro-tex Aug 6, 2025
3e729da
Lint
ro-tex Aug 7, 2025
b5eef0e
Fix data race
ro-tex Aug 7, 2025
c659549
Rollback changes and try a simpler approach - Observe() only uses the…
ro-tex Aug 7, 2025
0d361fd
Remove the concurrency from Observe().
ro-tex Aug 7, 2025
8d4253d
Fix the tests.
ro-tex Aug 8, 2025
c4753f9
Lint
ro-tex Aug 9, 2025
2260d6e
We don't need that defer, that was the whole point of moving it.
ro-tex Aug 9, 2025
f78ba8e
wip
brunotm Aug 11, 2025
5a92ae9
Clean up logging. Mock the OutputCodec.
ro-tex Aug 11, 2025
3080587
Address lint issues.
ro-tex Aug 12, 2025
7ee6aa6
Fix context lifetime, add closer implementation.
ro-tex Aug 12, 2025
3e4657a
Pointer receiver.
ro-tex Aug 12, 2025
7dddab0
concurrency fixes
brunotm Aug 12, 2025
5e2acad
add comment for values copy
brunotm Aug 12, 2025
e9cca97
bump
ro-tex Aug 12, 2025
7c9980a
ensure maxObservationDuration is respected, add metrics
brunotm Aug 14, 2025
d9af569
lint fix
brunotm Aug 14, 2025
d269a19
w
brunotm Aug 15, 2025
df9adbe
setObservableStreams
brunotm Aug 15, 2025
4e3590e
Merge branch 'develop' into ivo/observation_loop
ro-tex Aug 18, 2025
096208e
Test race
ro-tex Aug 18, 2025
a99356e
Remove the temporary version bump.
ro-tex Aug 18, 2025
0a4ebed
Clean up the close.
ro-tex Aug 19, 2025
ce5d9c2
Use a context based on the stop channel.
ro-tex Aug 21, 2025
6cbe7d9
lint
ro-tex Aug 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/afraid-doors-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#changed LLO's observations now run in a loop, so their cache is always warm.
3 changes: 3 additions & 0 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ func (d *delegate) Close() error {
for _, oracle := range d.oracles {
merr = errors.Join(merr, oracle.Close())
}
if closer, ok := d.ds.(Closer); ok {
merr = errors.Join(merr, closer.Close())
}
merr = errors.Join(merr, d.telem.Close())
return merr
})
Expand Down
2 changes: 0 additions & 2 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/grpc"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/observation"
)

const (
Expand Down Expand Up @@ -269,7 +268,6 @@ func (mt *transmitter) Transmit(
err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err())
}
}
observation.GetCache(digest).SetLastTransmissionSeqNr(seqNr)
})

if !ok {
Expand Down
56 changes: 11 additions & 45 deletions core/services/llo/observation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,34 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"
)

var (
registryMu = sync.Mutex{}
registry = map[ocr2types.ConfigDigest]*Cache{}

promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_hit_count",
Help: "Number of local observation cache hits",
},
[]string{"configDigest", "streamID"},
[]string{"streamID"},
)
promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_miss_count",
Help: "Number of local observation cache misses",
},
[]string{"configDigest", "streamID", "reason"},
[]string{"streamID", "reason"},
)
)

// GetCache returns the process-wide Cache associated with the given OCR2
// config digest, lazily creating one on first use with a 500ms maximum
// value age and a one-minute cleanup interval.
//
// It is safe for concurrent use: all access to the package-level registry
// map is serialized by registryMu.
func GetCache(configDigest ocr2types.ConfigDigest) *Cache {
	registryMu.Lock()
	defer registryMu.Unlock()

	cache, ok := registry[configDigest]
	if !ok {
		// First request for this digest: create the cache and memoize it so
		// subsequent callers for the same digest share a single instance.
		cache = NewCache(configDigest, 500*time.Millisecond, time.Minute)
		registry[configDigest] = cache
	}

	return cache
}

// Cache of stream values.
// It maintains a cache of stream values fetched from adapters until the last
// transmission sequence number is greater or equal the sequence number at which
Expand All @@ -58,30 +40,25 @@ func GetCache(configDigest ocr2types.ConfigDigest) *Cache {
// The cache is cleaned up periodically to remove decommissioned stream values
// if the provided cleanupInterval is greater than 0.
type Cache struct {
mu sync.RWMutex

configDigestStr string
mu sync.RWMutex
values map[llotypes.StreamID]item
maxAge time.Duration
cleanupInterval time.Duration

lastTransmissionSeqNr atomic.Uint64
closeChan chan struct{}
closeChan chan struct{}
}

type item struct {
value llo.StreamValue
seqNr uint64
createdAt time.Time
}

// NewCache creates a new cache.
//
// maxAge is the maximum age of a stream value to keep in the cache.
// cleanupInterval is the interval to clean up the cache.
func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanupInterval time.Duration) *Cache {
func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache {
c := &Cache{
configDigestStr: configDigest.Hex(),
values: make(map[llotypes.StreamID]item),
maxAge: maxAge,
cleanupInterval: cleanupInterval,
Expand Down Expand Up @@ -110,16 +87,11 @@ func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanup
return c
}

// SetLastTransmissionSeqNr sets the last transmission sequence number.
// The value is written with an atomic store, so callers may invoke this
// concurrently with readers (e.g. Get/cleanup) without holding c.mu.
func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) {
	c.lastTransmissionSeqNr.Store(seqNr)
}

// Add adds a stream value to the cache.
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) {
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue) {
c.mu.Lock()
defer c.mu.Unlock()
c.values[id] = item{value: value, seqNr: seqNr, createdAt: time.Now()}
c.values[id] = item{value: value, createdAt: time.Now()}
}

func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) {
Expand All @@ -129,31 +101,25 @@ func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) {
label := strconv.FormatUint(uint64(id), 10)
item, ok := c.values[id]
if !ok {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "notFound").Inc()
return nil, false
}

if item.seqNr <= c.lastTransmissionSeqNr.Load() {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "seqNr").Inc()
promCacheMissCount.WithLabelValues(label, "notFound").Inc()
return nil, false
}

if time.Since(item.createdAt) >= c.maxAge {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "maxAge").Inc()
promCacheMissCount.WithLabelValues(label, "maxAge").Inc()
return nil, false
}

promCacheHitCount.WithLabelValues(c.configDigestStr, label).Inc()
promCacheHitCount.WithLabelValues(label).Inc()
return item.value, true
}

func (c *Cache) cleanup() {
c.mu.Lock()
defer c.mu.Unlock()

lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load()
for id, item := range c.values {
if item.seqNr <= lastTransmissionSeqNr || time.Since(item.createdAt) >= c.maxAge {
if time.Since(item.createdAt) >= c.maxAge {
delete(c.values, id)
}
}
Expand Down
40 changes: 11 additions & 29 deletions core/services/llo/observation/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"
)
Expand Down Expand Up @@ -72,7 +70,7 @@ func TestNewCache(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, tt.cleanupInterval)
cache := NewCache(tt.maxAge, tt.cleanupInterval)
require.NotNil(t, cache)
assert.Equal(t, tt.maxAge, cache.maxAge)
assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval)
Expand All @@ -87,7 +85,6 @@ func TestCache_Add_Get(t *testing.T) {
name string
streamID llotypes.StreamID
value llo.StreamValue
seqNr uint64
maxAge time.Duration
wantValue llo.StreamValue
wantFound bool
Expand All @@ -97,7 +94,6 @@ func TestCache_Add_Get(t *testing.T) {
name: "get existing value",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Second,
wantValue: &mockStreamValue{value: []byte{42}},
wantFound: true,
Expand All @@ -106,28 +102,14 @@ func TestCache_Add_Get(t *testing.T) {
name: "get non-existent value",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Second,
wantValue: nil,
wantFound: false,
},
{
name: "get expired by sequence number",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 5,
maxAge: time.Second,
wantValue: nil,
wantFound: false,
beforeGet: func(cache *Cache) {
cache.SetLastTransmissionSeqNr(10)
},
},
{
name: "get expired by age",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Nanosecond * 100,
wantValue: nil,
wantFound: false,
Expand All @@ -139,10 +121,10 @@ func TestCache_Add_Get(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, 0)
cache := NewCache(tt.maxAge, 0)

if tt.wantFound {
cache.Add(tt.streamID, tt.value, tt.seqNr)
cache.Add(tt.streamID, tt.value)
}

if tt.beforeGet != nil {
Expand All @@ -159,11 +141,11 @@ func TestCache_Add_Get(t *testing.T) {
}

func TestCache_Cleanup(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Nanosecond*100, time.Millisecond)
cache := NewCache(time.Nanosecond*100, time.Millisecond)
streamID := llotypes.StreamID(1)
value := &mockStreamValue{value: []byte{42}}

cache.Add(streamID, value, 10)
cache.Add(streamID, value)
time.Sleep(time.Millisecond * 2)

gotValue, gotFound := cache.Get(streamID)
Expand All @@ -172,7 +154,7 @@ func TestCache_Cleanup(t *testing.T) {
}

func TestCache_ConcurrentAccess(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -185,7 +167,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
defer wg.Done()
for j := uint32(0); j < numOperations; j++ {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}})
}
}(i)
}
Expand All @@ -203,7 +185,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
}

func TestCache_ConcurrentReadWrite(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -216,7 +198,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
defer wg.Done()
for j := uint32(0); j < numOperations; j++ {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, uint64(j))
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}})
}
}(i)
}
Expand All @@ -236,7 +218,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
}

func TestCache_ConcurrentAddGet(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -249,7 +231,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) {
defer wg.Done()
for j := uint32(0); j < numOperations; j++ {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}})
}
}(i)
}
Expand Down
Loading
Loading