Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/afraid-doors-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#changed LLO's observations now run in a loop, so their cache is always warm.
3 changes: 3 additions & 0 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ func (d *delegate) Close() error {
for _, oracle := range d.oracles {
merr = errors.Join(merr, oracle.Close())
}
if closer, ok := d.ds.(Closer); ok {
merr = errors.Join(merr, closer.Close())
}
merr = errors.Join(merr, d.telem.Close())
return merr
})
Expand Down
2 changes: 0 additions & 2 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/config"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/grpc"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/observation"
)

const (
Expand Down Expand Up @@ -269,7 +268,6 @@ func (mt *transmitter) Transmit(
err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err())
}
}
observation.GetCache(digest).SetLastTransmissionSeqNr(seqNr)
})

if !ok {
Expand Down
56 changes: 11 additions & 45 deletions core/services/llo/observation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,34 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"
)

var (
registryMu = sync.Mutex{}
registry = map[ocr2types.ConfigDigest]*Cache{}

promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_hit_count",
Help: "Number of local observation cache hits",
},
[]string{"configDigest", "streamID"},
[]string{"streamID"},
)
promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "llo",
Subsystem: "datasource",
Name: "cache_miss_count",
Help: "Number of local observation cache misses",
},
[]string{"configDigest", "streamID", "reason"},
[]string{"streamID", "reason"},
)
)

func GetCache(configDigest ocr2types.ConfigDigest) *Cache {
registryMu.Lock()
defer registryMu.Unlock()

cache, ok := registry[configDigest]
if !ok {
cache = NewCache(configDigest, 500*time.Millisecond, time.Minute)
registry[configDigest] = cache
}

return cache
}

// Cache of stream values.
// It maintains a cache of stream values fetched from adapters until the last
// transmission sequence number is greater or equal the sequence number at which
Expand All @@ -58,30 +40,25 @@ func GetCache(configDigest ocr2types.ConfigDigest) *Cache {
// The cache is cleaned up periodically to remove decommissioned stream values
// if the provided cleanupInterval is greater than 0.
type Cache struct {
mu sync.RWMutex

configDigestStr string
mu sync.RWMutex
values map[llotypes.StreamID]item
maxAge time.Duration
cleanupInterval time.Duration

lastTransmissionSeqNr atomic.Uint64
closeChan chan struct{}
closeChan chan struct{}
}

type item struct {
value llo.StreamValue
seqNr uint64
createdAt time.Time
}

// NewCache creates a new cache.
//
// maxAge is the maximum age of a stream value to keep in the cache.
// cleanupInterval is the interval to clean up the cache.
func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanupInterval time.Duration) *Cache {
func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache {
c := &Cache{
configDigestStr: configDigest.Hex(),
values: make(map[llotypes.StreamID]item),
maxAge: maxAge,
cleanupInterval: cleanupInterval,
Expand Down Expand Up @@ -110,16 +87,11 @@ func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanup
return c
}

// SetLastTransmissionSeqNr sets the last transmission sequence number.
func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) {
c.lastTransmissionSeqNr.Store(seqNr)
}

// Add adds a stream value to the cache.
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) {
func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue) {
c.mu.Lock()
defer c.mu.Unlock()
c.values[id] = item{value: value, seqNr: seqNr, createdAt: time.Now()}
c.values[id] = item{value: value, createdAt: time.Now()}
}

func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) {
Expand All @@ -129,31 +101,25 @@ func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) {
label := strconv.FormatUint(uint64(id), 10)
item, ok := c.values[id]
if !ok {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "notFound").Inc()
return nil, false
}

if item.seqNr <= c.lastTransmissionSeqNr.Load() {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "seqNr").Inc()
promCacheMissCount.WithLabelValues(label, "notFound").Inc()
return nil, false
}

if time.Since(item.createdAt) >= c.maxAge {
promCacheMissCount.WithLabelValues(c.configDigestStr, label, "maxAge").Inc()
promCacheMissCount.WithLabelValues(label, "maxAge").Inc()
return nil, false
}

promCacheHitCount.WithLabelValues(c.configDigestStr, label).Inc()
promCacheHitCount.WithLabelValues(label).Inc()
return item.value, true
}

func (c *Cache) cleanup() {
c.mu.Lock()
defer c.mu.Unlock()

lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load()
for id, item := range c.values {
if item.seqNr <= lastTransmissionSeqNr || time.Since(item.createdAt) >= c.maxAge {
if time.Since(item.createdAt) >= c.maxAge {
delete(c.values, id)
}
}
Expand Down
40 changes: 11 additions & 29 deletions core/services/llo/observation/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
"github.com/smartcontractkit/chainlink-data-streams/llo"
)
Expand Down Expand Up @@ -72,7 +70,7 @@ func TestNewCache(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, tt.cleanupInterval)
cache := NewCache(tt.maxAge, tt.cleanupInterval)
require.NotNil(t, cache)
assert.Equal(t, tt.maxAge, cache.maxAge)
assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval)
Expand All @@ -87,7 +85,6 @@ func TestCache_Add_Get(t *testing.T) {
name string
streamID llotypes.StreamID
value llo.StreamValue
seqNr uint64
maxAge time.Duration
wantValue llo.StreamValue
wantFound bool
Expand All @@ -97,7 +94,6 @@ func TestCache_Add_Get(t *testing.T) {
name: "get existing value",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Second,
wantValue: &mockStreamValue{value: []byte{42}},
wantFound: true,
Expand All @@ -106,28 +102,14 @@ func TestCache_Add_Get(t *testing.T) {
name: "get non-existent value",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Second,
wantValue: nil,
wantFound: false,
},
{
name: "get expired by sequence number",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 5,
maxAge: time.Second,
wantValue: nil,
wantFound: false,
beforeGet: func(cache *Cache) {
cache.SetLastTransmissionSeqNr(10)
},
},
{
name: "get expired by age",
streamID: 1,
value: &mockStreamValue{value: []byte{42}},
seqNr: 10,
maxAge: time.Nanosecond * 100,
wantValue: nil,
wantFound: false,
Expand All @@ -139,10 +121,10 @@ func TestCache_Add_Get(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, 0)
cache := NewCache(tt.maxAge, 0)

if tt.wantFound {
cache.Add(tt.streamID, tt.value, tt.seqNr)
cache.Add(tt.streamID, tt.value)
}

if tt.beforeGet != nil {
Expand All @@ -159,11 +141,11 @@ func TestCache_Add_Get(t *testing.T) {
}

func TestCache_Cleanup(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Nanosecond*100, time.Millisecond)
cache := NewCache(time.Nanosecond*100, time.Millisecond)
streamID := llotypes.StreamID(1)
value := &mockStreamValue{value: []byte{42}}

cache.Add(streamID, value, 10)
cache.Add(streamID, value)
time.Sleep(time.Millisecond * 2)

gotValue, gotFound := cache.Get(streamID)
Expand All @@ -172,7 +154,7 @@ func TestCache_Cleanup(t *testing.T) {
}

func TestCache_ConcurrentAccess(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -185,7 +167,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
defer wg.Done()
for j := uint32(0); j < numOperations; j++ {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}})
}
}(i)
}
Expand All @@ -203,7 +185,7 @@ func TestCache_ConcurrentAccess(t *testing.T) {
}

func TestCache_ConcurrentReadWrite(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -216,7 +198,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
defer wg.Done()
for j := uint32(0); j < numOperations; j++ {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, uint64(j))
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}})
}
}(i)
}
Expand All @@ -236,7 +218,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) {
}

func TestCache_ConcurrentAddGet(t *testing.T) {
cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0)
cache := NewCache(time.Second, 0)
const numGoroutines = 10
const numOperations = uint32(1000)

Expand All @@ -249,7 +231,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) {
defer wg.Done()
for j := uint32(0); j < numOperations; j++ {
streamID := id*numOperations + j
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1)
cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}})
}
}(i)
}
Expand Down
Loading
Loading