diff --git a/.changeset/quick-kiwis-tease.md b/.changeset/quick-kiwis-tease.md new file mode 100644 index 00000000000..02c24999f74 --- /dev/null +++ b/.changeset/quick-kiwis-tease.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal LLO Observation loop diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 8933c416d3c..cb4f6c54afc 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -48,7 +48,7 @@ require ( github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20251009203201-900123a5c46a github.com/smartcontractkit/chainlink-common v0.9.6-0.20251016213956-9a6afcd1532a - github.com/smartcontractkit/chainlink-data-streams v0.1.5 + github.com/smartcontractkit/chainlink-data-streams v0.1.6 github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251015115541-729ba0b2b1c1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 44849b8e59b..65d9c029c8f 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1605,8 +1605,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag= diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index f2b586fd9a5..4765c6fc361 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "strconv" - "time" "github.com/prometheus/client_golang/prometheus" ocrcommontypes "github.com/smartcontractkit/libocr/commontypes" @@ -121,20 +120,15 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { CaptureReportTelemetry: cfg.CaptureReportTelemetry, }) - cache := observation.NewCache(500*time.Millisecond, time.Minute) ds := observation.NewDataSource( logger.Named(lggr, "DataSource"), cfg.Registry, t, - cache, ) notifier, ok := cfg.ContractTransmitter.(TransmitNotifier) if ok { notifier.OnTransmit(t.TrackSeqNr) - notifier.OnTransmit(func(digest ocr2types.ConfigDigest, seqNr uint64) { - cache.SetLastTransmissionSeqNr(seqNr) - }) } return &delegate{services.StateMachine{}, cfg, reportCodecs, cfg.ShouldRetireCache, ds, t, []Closer{}}, nil @@ -223,6 +217,9 @@ func (d *delegate) Close() error { for _, oracle := range d.oracles { merr = errors.Join(merr, oracle.Close()) } + if closer, ok := d.ds.(Closer); ok { + merr = errors.Join(merr, closer.Close()) + } merr = errors.Join(merr, d.telem.Close()) return merr }) diff --git a/core/services/llo/observation/cache.go b/core/services/llo/observation/cache.go index f4a45bdcb76..f7c7d1484fd 100644 --- a/core/services/llo/observation/cache.go +++ b/core/services/llo/observation/cache.go @@ -4,7 +4,6 @@ import ( "runtime" "strconv" "sync" - "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -41,19 +40,15 @@ var ( // The cache is cleaned up periodically to remove decommissioned stream values // if the provided cleanupInterval is greater than 0. type Cache struct { - mu sync.RWMutex - + mu sync.RWMutex values map[llotypes.StreamID]item - maxAge time.Duration cleanupInterval time.Duration - lastTransmissionSeqNr atomic.Uint64 - closeChan chan struct{} + closeChan chan struct{} } type item struct { value llo.StreamValue - seqNr uint64 expiresAt time.Time } @@ -61,10 +56,9 @@ type item struct { // // maxAge is the maximum age of a stream value to keep in the cache. // cleanupInterval is the interval to clean up the cache. -func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache { +func NewCache(cleanupInterval time.Duration) *Cache { c := &Cache{ values: make(map[llotypes.StreamID]item), - maxAge: maxAge, cleanupInterval: cleanupInterval, closeChan: make(chan struct{}), } @@ -91,31 +85,19 @@ func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache { return c } -// SetLastTransmissionSeqNr sets the last transmission sequence number. -func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) { - if c == nil { - return - } - - c.lastTransmissionSeqNr.Store(seqNr) -} - // Add adds a stream value to the cache. -func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) { - if c == nil { - return +func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, ttl time.Duration) { + var expiresAt time.Time + if ttl > 0 { + expiresAt = time.Now().Add(ttl) } c.mu.Lock() defer c.mu.Unlock() - c.values[id] = item{value: value, seqNr: seqNr, expiresAt: time.Now().Add(c.maxAge)} + c.values[id] = item{value: value, expiresAt: expiresAt} } -func (c *Cache) Get(id llotypes.StreamID) llo.StreamValue { - if c == nil { - return nil - } - +func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, time.Time) { c.mu.RLock() defer c.mu.RUnlock() @@ -123,31 +105,28 @@ func (c *Cache) Get(id llotypes.StreamID) llo.StreamValue { item, ok := c.values[id] if !ok { promCacheMissCount.WithLabelValues(label, "notFound").Inc() - return nil - } - - if item.seqNr <= c.lastTransmissionSeqNr.Load() { - promCacheMissCount.WithLabelValues(label, "seqNr").Inc() - return nil + return nil, time.Time{} } if time.Now().After(item.expiresAt) { promCacheMissCount.WithLabelValues(label, "maxAge").Inc() - return nil + return nil, time.Time{} } promCacheHitCount.WithLabelValues(label).Inc() - return item.value + return item.value, item.expiresAt } func (c *Cache) cleanup() { c.mu.Lock() defer c.mu.Unlock() - lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load() - now := time.Now() for id, item := range c.values { - if item.seqNr <= lastTransmissionSeqNr || now.After(item.expiresAt) { + if item.expiresAt.IsZero() { + continue + } + + if time.Now().After(item.expiresAt) { delete(c.values, id) } } diff --git a/core/services/llo/observation/cache_test.go b/core/services/llo/observation/cache_test.go index 080074f7d1a..f4ec3da6dc9 100644 --- a/core/services/llo/observation/cache_test.go +++ b/core/services/llo/observation/cache_test.go @@ -50,19 +50,16 @@ func (m *mockStreamValue) Type() llo.LLOStreamValue_Type { func TestNewCache(t *testing.T) { tests := []struct { name string - maxAge time.Duration cleanupInterval time.Duration wantErr bool }{ { name: "valid cache with cleanup", - maxAge: time.Second, cleanupInterval: time.Millisecond * 100, wantErr: false, }, { name: "valid cache without cleanup", - maxAge: time.Second, cleanupInterval: 0, wantErr: false, }, @@ -70,9 +67,8 @@ func TestNewCache(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cache := NewCache(tt.maxAge, tt.cleanupInterval) + cache := NewCache(tt.cleanupInterval) require.NotNil(t, cache) - assert.Equal(t, tt.maxAge, cache.maxAge) assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval) assert.NotNil(t, cache.values) assert.NotNil(t, cache.closeChan) @@ -85,50 +81,29 @@ func TestCache_Add_Get(t *testing.T) { name string streamID llotypes.StreamID value llo.StreamValue - seqNr uint64 - maxAge time.Duration + ttl time.Duration wantValue llo.StreamValue - wantFound bool beforeGet func(cache *Cache) }{ { name: "get existing value", streamID: 1, value: &mockStreamValue{value: []byte{42}}, - seqNr: 10, - maxAge: time.Second, + ttl: time.Second, wantValue: &mockStreamValue{value: []byte{42}}, - wantFound: true, }, { name: "get non-existent value", streamID: 1, - value: &mockStreamValue{value: []byte{42}}, - seqNr: 10, - maxAge: time.Second, - wantValue: nil, - wantFound: false, - }, - { - name: "get expired by sequence number", - streamID: 1, - value: &mockStreamValue{value: []byte{42}}, - seqNr: 5, - maxAge: time.Second, + ttl: time.Second, wantValue: nil, - wantFound: false, - beforeGet: func(cache *Cache) { - cache.SetLastTransmissionSeqNr(10) - }, }, { name: "get expired by age", streamID: 1, value: &mockStreamValue{value: []byte{42}}, - seqNr: 10, - maxAge: time.Nanosecond * 100, + ttl: time.Nanosecond * 100, wantValue: nil, - wantFound: false, beforeGet: func(_ *Cache) { time.Sleep(time.Millisecond) }, @@ -137,33 +112,36 @@ func TestCache_Add_Get(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cache := NewCache(tt.maxAge, 0) + cache := NewCache(0) - if tt.wantFound { - cache.Add(tt.streamID, tt.value, tt.seqNr) + if tt.value != nil { + cache.Add(tt.streamID, tt.value, tt.ttl) } if tt.beforeGet != nil { tt.beforeGet(cache) } - assert.Equal(t, tt.wantValue, cache.Get(tt.streamID)) + val, _ := cache.Get(tt.streamID) + assert.Equal(t, tt.wantValue, val) }) } } func TestCache_Cleanup(t *testing.T) { - cache := NewCache(time.Nanosecond*100, time.Millisecond) + cache := NewCache(time.Millisecond) streamID := llotypes.StreamID(1) value := &mockStreamValue{value: []byte{42}} - cache.Add(streamID, value, 10) + cache.Add(streamID, value, time.Nanosecond*100) time.Sleep(time.Millisecond * 2) - assert.Nil(t, cache.Get(streamID)) + + gotValue, _ := cache.Get(streamID) + assert.Nil(t, gotValue) } func TestCache_ConcurrentAccess(t *testing.T) { - cache := NewCache(time.Second, 0) + cache := NewCache(0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -176,7 +154,7 @@ func TestCache_ConcurrentAccess(t *testing.T) { defer wg.Done() for j := range numOperations { streamID := id*numOperations + j - cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1) + cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second) } }(i) } @@ -186,13 +164,14 @@ func TestCache_ConcurrentAccess(t *testing.T) { for i := range uint32(numGoroutines) { for j := range numOperations { streamID := i*numOperations + j - assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, cache.Get(streamID)) + val, _ := cache.Get(streamID) + assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, val) } } } func TestCache_ConcurrentReadWrite(t *testing.T) { - cache := NewCache(time.Second, 0) + cache := NewCache(0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -205,7 +184,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) { defer wg.Done() for j := range numOperations { streamID := id*numOperations + j - cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, uint64(j)) + cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second) } }(i) } @@ -216,7 +195,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) { defer wg.Done() for j := range numOperations { streamID := id*numOperations + j - cache.Get(streamID) + _, _ = cache.Get(streamID) } }(i) } @@ -225,7 +204,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) { } func TestCache_ConcurrentAddGet(t *testing.T) { - cache := NewCache(time.Second, 0) + cache := NewCache(0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -238,7 +217,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) { defer wg.Done() for j := range numOperations { streamID := id*numOperations + j - cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1) + cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, time.Second) } }(i) } @@ -249,7 +228,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) { defer wg.Done() for j := range numOperations { streamID := id*numOperations + j - cache.Get(streamID) + _, _ = cache.Get(streamID) } }(i) } diff --git a/core/services/llo/observation/data_source.go b/core/services/llo/observation/data_source.go index 41821c7a4ab..901df646bd1 100644 --- a/core/services/llo/observation/data_source.go +++ b/core/services/llo/observation/data_source.go @@ -8,11 +8,14 @@ import ( "sort" "strconv" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" @@ -38,6 +41,17 @@ var ( }, []string{"streamID"}, ) + promObservationLoopDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "llo", + Subsystem: "datasource", + Name: "observation_loop_duration_ms", + Help: "Duration of the observation loop", + Buckets: []float64{ + 10, 25, 50, 100, 250, 500, 750, 1000, + }, + }, + []string{"configDigest"}, + ) ) type ErrObservationFailed struct { @@ -70,99 +84,154 @@ func (e *ErrObservationFailed) Unwrap() error { var _ llo.DataSource = &dataSource{} type dataSource struct { - lggr logger.Logger - registry Registry - t Telemeter - - activeSeqNrMu sync.Mutex - activeSeqNr uint64 - cache *Cache + lggr logger.Logger + registry Registry + t Telemeter + cache *Cache + observationLoopStarted atomic.Bool + observationLoopCloseCh services.StopChan + observationLoopDoneCh chan struct{} // will be closed when we exit the observation loop + + observableStreamsMu sync.Mutex + observableStreams *observableStreamValues } -func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter, c *Cache) llo.DataSource { - return newDataSource(lggr, registry, t, c) +func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource { + return newDataSource(lggr, registry, t) } -func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, c *Cache) *dataSource { +func newDataSource(lggr logger.Logger, registry Registry, t Telemeter) *dataSource { return &dataSource{ - lggr: logger.Named(lggr, "DataSource"), - registry: registry, - t: t, - cache: c, + lggr: logger.Named(lggr, "DataSource"), + registry: registry, + t: t, + cache: NewCache(time.Minute), + observationLoopCloseCh: make(chan struct{}), + observationLoopDoneCh: make(chan struct{}), } } // Observe looks up all streams in the registry and populates a map of stream ID => value func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error { - now := time.Now() - lggr := logger.With(d.lggr, "observationTimestamp", opts.ObservationTimestamp(), "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + // Observation loop logic + { + // Update the list of streams to observe for this config digest and set the timeout + // StreamValues needs a copy to avoid concurrent access + d.setObservableStreams(ctx, streamValues, opts) + + if !d.observationLoopStarted.Load() { + loopStartedCh := make(chan struct{}) + go d.startObservationLoop(loopStartedCh) + <-loopStartedCh + } + } - // stream ids to observe - streamIDs := make([]streams.StreamID, 0, len(streamValues)) + // Fetch the cached observations for all streams. for streamID := range streamValues { - streamIDs = append(streamIDs, streamID) + streamValues[streamID], _ = d.cache.Get(streamID) } - if opts.VerboseLogging() { - slices.Sort(streamIDs) - lggr = logger.With(lggr, "streamIDs", streamIDs) - lggr.Debugw("Observing streams") + return nil +} + +// startObservationLoop continuously makes observations for the streams in this data source +// caching them in memory making the Observe call duration and performance independent +// of the underlying resources providing the observations. +// Based on the expected maxObservationDuration determine the pace of the observation loop +// and for how long to cache the observations. +func (d *dataSource) startObservationLoop(loopStartedCh chan struct{}) { + if !d.observationLoopStarted.CompareAndSwap(false, true) { + close(loopStartedCh) + return } - // update the active seq nr - // we track the transmitting sequence number to ensure observations - // are cached at the sequence number of the active plugin ocr instance (blue/green) - // but can also be used by the passive instance. - // In case of cache misses for the passive instance we still run the pipeline - // but cache at the last sequence number of the active instance. - // this ensures that they are still invalidated at the next transmission. - activeSeqNr := d.updateActiveSeqNr(opts) + loopStarting := true + var elapsed time.Duration + stopChanCtx, stopChanCancel := d.observationLoopCloseCh.NewCtx() + defer stopChanCancel() - var wg sync.WaitGroup - wg.Add(len(streamValues)) + for { + if stopChanCtx.Err() != nil { + close(d.observationLoopDoneCh) + return + } - var mu sync.Mutex - successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues)) - var errs []ErrObservationFailed + osv := d.getObservableStreams() + if osv == nil || len(osv.streamValues) == 0 { + // There is nothing to observe, exit and let the next Observe() call reinitialize the loop. + d.lggr.Warnw("observation loop: no streams to observe") - // oc only lives for the duration of this Observe call - oc := NewObservationContext(lggr, d.registry, d.t) + // still at the loop initialization, notify the caller and return + if loopStarting { + close(loopStartedCh) + } + return + } - // Telemetry - { - // Size needs to accommodate the max number of telemetry events that could be generated - // Standard case might be about 3 bridge requests per spec and one stream<=>spec - // Overallocate for safety (to avoid dropping packets) - telemCh := d.t.MakeObservationScopedTelemetryCh(opts, 10*len(streamValues)) - if telemCh != nil { - if d.t.CaptureEATelemetry() { - ctx = pipeline.WithTelemetryCh(ctx, telemCh) + if d.observationLoopStarted.Load() { + time.Sleep(osv.observationTimeout) + } + + startTS := time.Now() + ctx, cancel := context.WithTimeout(stopChanCtx, osv.observationTimeout) + lggr := logger.With(d.lggr, "observationTimestamp", osv.opts.ObservationTimestamp(), "configDigest", osv.opts.ConfigDigest(), "seqNr", osv.opts.OutCtx().SeqNr) + + if osv.opts.VerboseLogging() { + streamIDs := make([]streams.StreamID, 0, len(osv.streamValues)) + for streamID := range osv.streamValues { + streamIDs = append(streamIDs, streamID) } - if d.t.CaptureObservationTelemetry() { - ctx = WithObservationTelemetryCh(ctx, telemCh) + sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] }) + lggr = logger.With(lggr, "streamIDs", streamIDs) + lggr.Debugw("Observing streams") + } + + // Telemetry + var telemCh chan<- interface{} + { + // Size needs to accommodate the max number of telemetry events that could be generated + // Standard case might be about 3 bridge requests per spec and one stream<=>spec + // Overallocate for safety (to avoid dropping packets) + telemCh = d.t.MakeObservationScopedTelemetryCh(osv.opts, 10*len(osv.streamValues)) + if telemCh != nil { + if d.t.CaptureEATelemetry() { + ctx = pipeline.WithTelemetryCh(ctx, telemCh) + } + if d.t.CaptureObservationTelemetry() { + ctx = WithObservationTelemetryCh(ctx, telemCh) + } } - // After all Observations have returned, nothing else will be sent to the - // telemetry channel, so it can safely be closed - defer close(telemCh) } - } - // Observe all streams concurrently - for _, streamID := range streamIDs { - go func(streamID llotypes.StreamID) { - defer wg.Done() - var val llo.StreamValue - var err error - - // check for valid cached value before observing - if val = d.cache.Get(streamID); val == nil { - // no valid cached value, observe the stream - if val, err = oc.Observe(ctx, streamID, opts); err != nil { - strmIDStr := strconv.FormatUint(uint64(streamID), 10) + var mu sync.Mutex + successfulStreamIDs := make([]streams.StreamID, 0, len(osv.streamValues)) + var errs []ErrObservationFailed + + var wg sync.WaitGroup + oc := NewObservationContext(lggr, d.registry, d.t) + + for streamID := range osv.streamValues { + if val, expiresAt := d.cache.Get(streamID); val != nil { + if time.Until(expiresAt) > 2*osv.observationTimeout { + d.lggr.Debugw("cached stream observation still valid, skipping", "streamID", + streamID, "expiresAt", expiresAt.Format(time.RFC3339)) + continue + } + } + + wg.Add(1) + go func(streamID llotypes.StreamID) { + defer wg.Done() + var val llo.StreamValue + var err error + + // Observe the stream + if val, err = oc.Observe(ctx, streamID, osv.opts); err != nil { + streamIDStr := strconv.FormatUint(uint64(streamID), 10) if errors.As(err, &MissingStreamError{}) { - promMissingStreamCount.WithLabelValues(strmIDStr).Inc() + promMissingStreamCount.WithLabelValues(streamIDStr).Inc() } - promObservationErrorCount.WithLabelValues(strmIDStr).Inc() + promObservationErrorCount.WithLabelValues(streamIDStr).Inc() mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) mu.Unlock() @@ -170,69 +239,133 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, } // cache the observed value - d.cache.Add(streamID, val, activeSeqNr) - } + d.cache.Add(streamID, val, 4*osv.observationTimeout) - mu.Lock() - defer mu.Unlock() + mu.Lock() + successfulStreamIDs = append(successfulStreamIDs, streamID) + mu.Unlock() + }(streamID) + } - successfulStreamIDs = append(successfulStreamIDs, streamID) - if val != nil { - streamValues[streamID] = val - } - }(streamID) - } + wg.Wait() + elapsed = time.Since(startTS) - // Wait for all Observations to complete - wg.Wait() + // notify the caller that we've completed our first round of observations. + if loopStarting { + loopStarting = false + close(loopStartedCh) + } - // Only log on errors or if VerboseLogging is turned on - if len(errs) > 0 || opts.VerboseLogging() { - elapsed := time.Since(now) + // After all Observations have returned, nothing else will be sent to the + // telemetry channel, so it can safely be closed + if telemCh != nil { + close(telemCh) + } - slices.Sort(successfulStreamIDs) - sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) + // Only log on errors or if VerboseLogging is turned on + if len(errs) > 0 || osv.opts.VerboseLogging() { + slices.Sort(successfulStreamIDs) + sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) - failedStreamIDs := make([]streams.StreamID, len(errs)) - errStrs := make([]string, len(errs)) - for i, e := range errs { - errStrs[i] = e.String() - failedStreamIDs[i] = e.streamID - } + failedStreamIDs := make([]streams.StreamID, len(errs)) + errStrs := make([]string, len(errs)) + for i, e := range errs { + errStrs[i] = e.String() + failedStreamIDs[i] = e.streamID + } - lggr = logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams", - len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "errs", errStrs) + lggr = logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams", + len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "errs", errStrs) - if opts.VerboseLogging() { - lggr = logger.With(lggr, "streamValues", streamValues) + if osv.opts.VerboseLogging() { + lggr = logger.With(lggr, "streamValues", osv.streamValues) + } } - if len(errs) == 0 && opts.VerboseLogging() { - lggr.Infow("Observation succeeded for all streams") - } else if len(errs) > 0 { - lggr.Warnw("Observation failed for streams") - } + promObservationLoopDuration.WithLabelValues( + osv.opts.ConfigDigest().String()).Observe(float64(elapsed.Milliseconds())) + + lggr.Debugw("Observation loop", "elapsed_ms", elapsed.Milliseconds()) + + // context cancellation + cancel() } +} + +func (d *dataSource) Close() error { + close(d.observationLoopCloseCh) + d.observationLoopStarted.Store(false) + <-d.observationLoopDoneCh return nil } -func (d *dataSource) updateActiveSeqNr(opts llo.DSOpts) uint64 { - if opts.OutcomeCodec() == nil { - return opts.OutCtx().SeqNr +type observableStreamValues struct { + opts llo.DSOpts + streamValues llo.StreamValues + observationTimeout time.Duration +} + +// setObservableStreams sets the observable streams for the given config digest. +func (d *dataSource) setObservableStreams(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) { + if opts == nil || len(streamValues) == 0 { + d.lggr.Warnw("setObservableStreams: no observable streams to set", + "opts", opts, "observable_streams", len(streamValues)) + return } - outcome, err := opts.OutcomeCodec().Decode(opts.OutCtx().PreviousOutcome) + outCtx := opts.OutCtx() + outcome, err := opts.OutcomeCodec().Decode(outCtx.PreviousOutcome) if err != nil { - d.lggr.Warnf("failed to decode previous outcome, err: %v", err) - return opts.OutCtx().SeqNr + d.lggr.Errorw("setObservableStreams: failed to decode outcome", "error", err) + return + } + + if outcome.LifeCycleStage != llo.LifeCycleStageProduction { + d.lggr.Debugw( + "setObservableStreams: LLO OCR instance is not in production lifecycle stage", + "configDigest", opts.ConfigDigest().String(), "stage", outcome.LifeCycleStage) + return } - d.activeSeqNrMu.Lock() - defer d.activeSeqNrMu.Unlock() - if outcome.LifeCycleStage == llo.LifeCycleStageProduction { - d.activeSeqNr = opts.OutCtx().SeqNr + osv := &observableStreamValues{ + opts: opts, + streamValues: make(llo.StreamValues, len(streamValues)), + observationTimeout: 250 * time.Millisecond, } - return d.activeSeqNr + for streamID := range streamValues { + osv.streamValues[streamID] = nil + } + + if deadline, ok := ctx.Deadline(); ok { + osv.observationTimeout = time.Until(deadline) + } + + d.lggr.Debugw("setObservableStreams", + "timeout_millis", osv.observationTimeout.Milliseconds(), + "observable_streams", len(osv.streamValues)) + + d.observableStreamsMu.Lock() + defer d.observableStreamsMu.Unlock() + + if d.observableStreams == nil || + len(d.observableStreams.streamValues) != len(osv.streamValues) || + d.observableStreams.observationTimeout != osv.observationTimeout { + d.lggr.Infow("setObservableStreams: observable streams changed", + "timeout_millis", osv.observationTimeout.Milliseconds(), + "observable_streams", len(osv.streamValues), + ) + } + + d.observableStreams = osv +} + +// getObservableStreams returns the active plugin data source options, the streams to observe and the observation interval +// the observation interval is the maximum time we can spend observing streams. We ensure that we don't exceed this time and +// we wait for the remaining time in the observation loop. +func (d *dataSource) getObservableStreams() *observableStreamValues { + d.observableStreamsMu.Lock() + defer d.observableStreamsMu.Unlock() + return d.observableStreams } diff --git a/core/services/llo/observation/data_source_test.go b/core/services/llo/observation/data_source_test.go index 344b8ef1703..881032d8c04 100644 --- a/core/services/llo/observation/data_source_test.go +++ b/core/services/llo/observation/data_source_test.go @@ -2,7 +2,6 @@ package observation import ( "context" - "crypto/rand" "encoding/hex" "errors" "fmt" @@ -13,7 +12,6 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,6 +22,7 @@ import ( llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" @@ -55,10 +54,13 @@ func (m *mockPipeline) StreamIDs() []streams.StreamID { } type mockRegistry struct { + mu sync.Mutex pipelines map[streams.StreamID]*mockPipeline } func (m *mockRegistry) Get(streamID streams.StreamID) (p streams.Pipeline, exists bool) { + m.mu.Lock() + defer m.mu.Unlock() p, exists = m.pipelines[streamID] return } @@ -66,17 +68,24 @@ func (m *mockRegistry) Get(streamID streams.StreamID) (p streams.Pipeline, exist func makePipelineWithSingleResult[T any](runID int64, res T, err error) *mockPipeline { return &mockPipeline{ run: &pipeline.Run{ID: runID}, - trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}}, + trrs: []pipeline.TaskRunResult{{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}}, err: err, } } -func makeStreamValues() llo.StreamValues { - return llo.StreamValues{ - 1: nil, - 2: nil, - 3: nil, +func makeStreamValues(streamIDs ...llotypes.StreamID) llo.StreamValues { + if len(streamIDs) == 0 { + return llo.StreamValues{ + 1: nil, + 2: nil, + 3: nil, + } } + vals := llo.StreamValues{} + for _, streamID := range streamIDs { + vals[streamID] = nil + } + return vals } type mockOpts struct { @@ -96,7 +105,7 @@ func (m *mockOpts) SeqNr() uint64 { } func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { if m.outCtx.SeqNr == 0 { - return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))} + return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: []byte("foo")} } return m.outCtx } @@ -112,9 +121,19 @@ func (m *mockOpts) ObservationTimestamp() time.Time { } return m.observationTimestamp } - func (m *mockOpts) OutcomeCodec() llo.OutcomeCodec { - return nil + return mockOutputCodec{} +} + +type mockOutputCodec struct{} + +func (oc mockOutputCodec) Encode(outcome llo.Outcome) (ocr3types.Outcome, error) { + return ocr3types.Outcome{}, nil +} +func (oc mockOutputCodec) Decode(encoded ocr3types.Outcome) (outcome llo.Outcome, err error) { + return llo.Outcome{ + LifeCycleStage: llo.LifeCycleStageProduction, + }, nil } type mockTelemeter struct { @@ -140,7 +159,10 @@ func (m *mockTelemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline. m.v3PremiumLegacyPackets = append(m.v3PremiumLegacyPackets, v3PremiumLegacyPacket{run, trrs, streamID, opts, val, err}) } func (m *mockTelemeter) MakeObservationScopedTelemetryCh(opts llo.DSOpts, size int) (ch chan<- any) { + m.mu.Lock() + defer m.mu.Unlock() m.ch = make(chan any, size) + return m.ch } func (m *mockTelemeter) GetOutcomeTelemetryCh() chan<- *llo.LLOOutcomeTelemetry { @@ -150,94 +172,139 @@ func (m *mockTelemeter) GetReportTelemetryCh() chan<- *llo.LLOReportTelemetry { func (m *mockTelemeter) CaptureEATelemetry() bool { return true } func (m *mockTelemeter) CaptureObservationTelemetry() bool { return true } +var observationTimeout = 100 * time.Millisecond + func Test_DataSource(t *testing.T) { - lggr := logger.TestLogger(t) - reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter, nil) - ctx := testutils.Context(t) + lggr := logger.NullLogger + mainCtx := testutils.Context(t) opts := &mockOpts{} t.Run("Observe", func(t *testing.T) { t.Run("doesn't set any values if no streams are defined", func(t *testing.T) { + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) + vals := makeStreamValues() + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, makeStreamValues(), vals) + ds.Close() }) + t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) { + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) + + reg.mu.Lock() reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, big.NewInt(15), nil) + reg.mu.Unlock() vals := makeStreamValues() + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), 1: llo.ToDecimal(decimal.NewFromInt(2181)), + 2: llo.ToDecimal(decimal.NewFromInt(40602)), 3: llo.ToDecimal(decimal.NewFromInt(15)), - }, vals) + }, vals, fmt.Sprintf("vals: %v", vals)) + ds.Close() }) + t.Run("observes each stream and returns success/errors", func(t *testing.T) { - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded")) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) - reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2")) + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) + + reg.mu.Lock() + reg.pipelines[11] = makePipelineWithSingleResult[*big.Int](11, big.NewInt(21810), errors.New("something exploded")) + reg.pipelines[12] = makePipelineWithSingleResult[*big.Int](12, big.NewInt(40602), nil) + reg.pipelines[13] = makePipelineWithSingleResult[*big.Int](13, nil, errors.New("something exploded 2")) + reg.mu.Unlock() + + vals := makeStreamValues(11, 12, 13) + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() - vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 1: nil, - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 3: nil, - }, vals) + 11: nil, + 12: llo.ToDecimal(decimal.NewFromInt(40602)), + 13: nil, + }, vals, fmt.Sprintf("vals: %v", vals)) + ds.Close() }) t.Run("records telemetry", func(t *testing.T) { tm := &mockTelemeter{} - ds.t = tm + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, tm) - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) - reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil) + reg.mu.Lock() + reg.pipelines[21] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil) + reg.pipelines[22] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.pipelines[23] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil) + reg.mu.Unlock() + + vals := makeStreamValues(21, 22, 23) + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() - vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) + tm.mu.Lock() + ch := tm.ch + tm.mu.Unlock() + + ds.Close() require.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 1: llo.ToDecimal(decimal.NewFromInt(2181)), - 3: llo.ToDecimal(decimal.NewFromInt(15)), - }, vals) - - require.Len(t, tm.v3PremiumLegacyPackets, 3) + 21: llo.ToDecimal(decimal.NewFromInt(2181)), + 22: llo.ToDecimal(decimal.NewFromInt(40602)), + 23: llo.ToDecimal(decimal.NewFromInt(15)), + }, vals, fmt.Sprintf("vals: %v", vals)) + + // Get only the last 3 packets, as those would be the result of the first round of observations. + tm.mu.Lock() + packets := tm.v3PremiumLegacyPackets[:3] + tm.mu.Unlock() m := make(map[int]v3PremiumLegacyPacket) - for _, pkt := range tm.v3PremiumLegacyPackets { + for _, pkt := range packets { m[int(pkt.run.ID)] = pkt } + pkt := m[100] assert.Equal(t, 100, int(pkt.run.ID)) assert.Len(t, pkt.trrs, 1) - assert.Equal(t, 1, int(pkt.streamID)) + assert.Equal(t, 21, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String()) require.NoError(t, pkt.err) telems := []any{} - for p := range tm.ch { + + for p := range ch { telems = append(telems, p) + if len(telems) >= 3 { + break + } } - require.Len(t, telems, 3) + + require.Len(t, telems[:3], 3) sort.Slice(telems, func(i, j int) bool { return telems[i].(*telem.LLOObservationTelemetry).StreamId < telems[j].(*telem.LLOObservationTelemetry).StreamId }) require.IsType(t, &telem.LLOObservationTelemetry{}, telems[0]) obsTelem := telems[0].(*telem.LLOObservationTelemetry) - assert.Equal(t, uint32(1), obsTelem.StreamId) + assert.Equal(t, uint32(21), obsTelem.StreamId) assert.Equal(t, int32(llo.LLOStreamValue_Decimal), obsTelem.StreamValueType) assert.Equal(t, "00000000020885", hex.EncodeToString(obsTelem.StreamValueBinary)) assert.Equal(t, "2181", obsTelem.StreamValueText) @@ -251,55 +318,62 @@ func Test_DataSource(t *testing.T) { t.Run("records telemetry for errors", func(t *testing.T) { tm := &mockTelemeter{} - ds.t = tm - - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded")) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) - reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2")) - - vals := makeStreamValues() + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, tm) + + reg.mu.Lock() + reg.pipelines[31] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded")) + reg.pipelines[32] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.pipelines[33] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2")) + reg.mu.Unlock() + + vals := makeStreamValues(31, 32, 33) + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() err := ds.Observe(ctx, vals, opts) require.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 1: nil, - 3: nil, - }, vals) + 31: nil, + 32: llo.ToDecimal(decimal.NewFromInt(40602)), + 33: nil, + }, vals, fmt.Sprintf("vals: %v", vals)) - require.Len(t, tm.v3PremiumLegacyPackets, 3) m := make(map[int]v3PremiumLegacyPacket) + tm.mu.Lock() for _, pkt := range tm.v3PremiumLegacyPackets { m[int(pkt.run.ID)] = pkt } + tm.mu.Unlock() pkt := m[100] assert.Equal(t, 100, int(pkt.run.ID)) assert.Len(t, pkt.trrs, 1) - assert.Equal(t, 1, int(pkt.streamID)) + assert.Equal(t, 31, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Nil(t, pkt.val) assert.Error(t, pkt.err) + ds.Close() }) t.Run("uses cached values when available", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, - NewCache(time.Millisecond*500, time.Minute)) + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) // First observation to populate cache + reg.mu.Lock() reg.pipelines[10001] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) reg.pipelines[20001] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + reg.mu.Unlock() vals := llo.StreamValues{ 10001: nil, 20001: nil, 30001: nil, } - key := make([]byte, 32) - _, err := rand.Read(key) - require.NoError(t, err) - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest(key)} - err = ds.Observe(ctx, vals, opts2) + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() + err := ds.Observe(ctx, vals, opts) require.NoError(t, err) // Verify initial values @@ -310,8 +384,10 @@ func Test_DataSource(t *testing.T) { }, vals) // Change pipeline results + reg.mu.Lock() reg.pipelines[10001] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil) reg.pipelines[20001] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil) + reg.mu.Unlock() // Second observation should use cached values vals = llo.StreamValues{ @@ -319,7 +395,9 @@ func Test_DataSource(t *testing.T) { 20001: nil, 30001: nil, } - err = ds.Observe(ctx, vals, opts2) + ctx2, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() + err = ds.Observe(ctx2, vals, opts) require.NoError(t, err) // Should still have original values from cache @@ -328,57 +406,57 @@ func Test_DataSource(t *testing.T) { 20001: llo.ToDecimal(decimal.NewFromInt(40602)), 30001: nil, }, vals) - - // Verify cache metrics - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheHitCount.WithLabelValues("10001")), 0.0001) - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheHitCount.WithLabelValues("20001")), 0.0001) - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheMissCount.WithLabelValues("10001", "notFound")), 0.0001) - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheMissCount.WithLabelValues("20001", "notFound")), 0.0001) }) t.Run("refreshes cache after expiration", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, - NewCache(time.Millisecond*500, time.Minute)) + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) // First observation - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) - vals := llo.StreamValues{1: nil} + reg.mu.Lock() + reg.pipelines[50002] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) + reg.mu.Unlock() + vals := llo.StreamValues{50002: nil} - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest{6, 5, 9}} - err := ds.Observe(ctx, vals, opts2) + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() + err := ds.Observe(ctx, vals, opts) require.NoError(t, err) - // Wait for cache to expire - time.Sleep(501 * time.Millisecond) - // Change pipeline result - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(200), nil) + reg.mu.Lock() + reg.pipelines[50002] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(200), nil) + reg.mu.Unlock() + + // Wait for cache to expire + time.Sleep(observationTimeout * 3) // Second observation should use new value - vals = llo.StreamValues{1: nil} - err = ds.Observe(ctx, vals, opts) + vals = llo.StreamValues{50002: nil} + ctx2, cancel := context.WithTimeout(mainCtx, observationTimeout*5) + defer cancel() + err = ds.Observe(ctx2, vals, opts) require.NoError(t, err) - assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(200))}, vals) + assert.Equal(t, llo.StreamValues{50002: llo.ToDecimal(decimal.NewFromInt(200))}, vals) }) t.Run("handles concurrent cache access", func(t *testing.T) { // Create a new data source - ds := newDataSource(lggr, reg, telem.NullTelemeter, - NewCache(time.Millisecond*500, time.Minute)) + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) // Set up pipeline to return different values + reg.mu.Lock() reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) + reg.mu.Unlock() // First observation to cache vals := llo.StreamValues{1: nil} - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest{6, 5, 6}} - err := ds.Observe(ctx, vals, opts2) + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() + err := ds.Observe(ctx, vals, opts) require.NoError(t, err) // Run multiple observations concurrently @@ -388,7 +466,7 @@ func Test_DataSource(t *testing.T) { go func() { defer wg.Done() vals := llo.StreamValues{1: nil} - err := ds.Observe(ctx, vals, opts2) + err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals) }() @@ -400,27 +478,38 @@ func Test_DataSource(t *testing.T) { }) t.Run("handles cache errors gracefully", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, - NewCache(time.Millisecond*500, time.Minute)) + reg := &mockRegistry{pipelines: make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter) // First observation with error + reg.mu.Lock() reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, nil, errors.New("pipeline error")) + reg.mu.Unlock() vals := makeStreamValues() - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest{6, 5, 2}} - err := ds.Observe(ctx, vals, opts2) - require.NoError(t, err) // Observe returns nil error even if some streams fail + ctx, cancel := context.WithTimeout(mainCtx, observationTimeout) + defer cancel() - time.Sleep(501 * time.Millisecond) + err := ds.Observe(ctx, vals, opts) + require.NoError(t, err) // Observe returns nil error even if some streams fail // Second observation should try again (not use cache for error case) + reg.mu.Lock() reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) + reg.mu.Unlock() + time.Sleep(observationTimeout * 3) + vals = llo.StreamValues{1: nil} - err = ds.Observe(ctx, vals, opts2) + ctx2, cancel := context.WithTimeout(mainCtx, observationTimeout*5) + defer cancel() + err = ds.Observe(ctx2, vals, opts) require.NoError(t, err) assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals) }) }) + + promCacheHitCount.Reset() + promCacheMissCount.Reset() } func BenchmarkObserve(b *testing.B) { @@ -486,7 +575,7 @@ result3 -> result3_parse -> multiply3; require.NoError(b, err) } - ds := newDataSource(lggr, r, telem.NullTelemeter, nil) + ds := newDataSource(lggr, r, telem.NullTelemeter) vals := make(map[llotypes.StreamID]llo.StreamValue) for i := uint32(0); i < 4*n; i++ { vals[i] = nil @@ -495,4 +584,5 @@ result3 -> result3_parse -> multiply3; b.ResetTimer() err := ds.Observe(ctx, vals, opts) require.NoError(b, err) + ds.Close() } diff --git a/deployment/go.mod b/deployment/go.mod index 646f6a2a9c5..586482d9771 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -405,7 +405,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.6 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/deployment/go.sum b/deployment/go.sum index fba6a81a885..151a631da3c 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1342,8 +1342,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag= diff --git a/go.mod b/go.mod index ec63e12e73e..7ed6c8ae85d 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250912190424-fd2e35d7deb5 github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250912190424-fd2e35d7deb5 github.com/smartcontractkit/chainlink-common v0.9.6-0.20251016213956-9a6afcd1532a - github.com/smartcontractkit/chainlink-data-streams v0.1.5 + github.com/smartcontractkit/chainlink-data-streams v0.1.6 github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251015115541-729ba0b2b1c1 github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 diff --git a/go.sum b/go.sum index 72516291b38..74c995eca15 100644 --- a/go.sum +++ b/go.sum @@ -1119,8 +1119,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491/go.mod h1:4mn3j3dRgz2NXyVkvKgn6DOvrVvdN5tJ3K0Ka8QYLPM= github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251015115541-729ba0b2b1c1 h1:U0/wYRzESvhGnzbQy2Q/18gTP2UGp1DtZ19TL43NP5k= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index d5415f42ec0..16b6df530c9 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -486,7 +486,7 @@ require ( github.com/smartcontractkit/ccip-contract-examples/chains/evm v0.0.0-20250826190403-aed7f5f33cde // indirect github.com/smartcontractkit/ccip-owner-contracts v0.1.0 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.6 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index b3de9d47b4d..9776eb8ab06 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1586,8 +1586,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 69f2cf644a8..c80b1224695 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -472,7 +472,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.1.0 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.6 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 0d05b6c3da9..43ef00b30bd 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1565,8 +1565,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag= diff --git a/plugins/plugins.public.yaml b/plugins/plugins.public.yaml index 4957a71185e..8b352d5d097 100644 --- a/plugins/plugins.public.yaml +++ b/plugins/plugins.public.yaml @@ -45,7 +45,7 @@ plugins: streams: - moduleURI: "github.com/smartcontractkit/chainlink-data-streams" - gitRef: "v0.1.5" + gitRef: "v0.1.6" installPath: "./mercury/cmd/chainlink-mercury" ton: diff --git a/system-tests/lib/go.mod b/system-tests/lib/go.mod index 5d5743e181a..e0febe75aba 100644 --- a/system-tests/lib/go.mod +++ b/system-tests/lib/go.mod @@ -450,7 +450,7 @@ require ( github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20251009203201-900123a5c46a // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250912190424-fd2e35d7deb5 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.6 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/system-tests/lib/go.sum b/system-tests/lib/go.sum index 0cb261ad784..feea464e6ee 100644 --- a/system-tests/lib/go.sum +++ b/system-tests/lib/go.sum @@ -1583,8 +1583,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag= diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index 40df425461e..1edafc225f1 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -45,7 +45,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.73 github.com/smartcontractkit/chainlink-common v0.9.6-0.20251016213956-9a6afcd1532a - github.com/smartcontractkit/chainlink-data-streams v0.1.5 + github.com/smartcontractkit/chainlink-data-streams v0.1.6 github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251015115541-729ba0b2b1c1 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251008161434-22d9bd439bba diff --git a/system-tests/tests/go.sum b/system-tests/tests/go.sum index 0d082b31c04..dfbc2949a87 100644 --- a/system-tests/tests/go.sum +++ b/system-tests/tests/go.sum @@ -1786,8 +1786,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= -github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= +github.com/smartcontractkit/chainlink-data-streams v0.1.6 h1:B3cwmJrVYoJVAjPOyQWTNaGD+V30HI1vFHhC2dQpWDo= +github.com/smartcontractkit/chainlink-data-streams v0.1.6/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0 h1:VkzslEC/a7Ze5qqLdX1ZNL7ug0TwVd5w3hL5jA/DvWE= github.com/smartcontractkit/chainlink-deployments-framework v0.56.0/go.mod h1:ObH5HJ4yXzTmQLc6Af+ufrTVcQ+ocasHJ0YBZjw5ZCM= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251017190323-e749d4a05491 h1:X+c+LdCI9Dc4EfpitByVnGODtwoV+rAdFKCG0y4gAag=