diff --git a/.changeset/afraid-doors-fold.md b/.changeset/afraid-doors-fold.md new file mode 100644 index 00000000000..cde121eb3e1 --- /dev/null +++ b/.changeset/afraid-doors-fold.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#changed LLO's observations now run in a loop, so their cache is always warm. diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index cd1b7dcdc9b..37308d86d37 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -207,6 +207,9 @@ func (d *delegate) Close() error { for _, oracle := range d.oracles { merr = errors.Join(merr, oracle.Close()) } + if closer, ok := d.ds.(Closer); ok { + merr = errors.Join(merr, closer.Close()) + } merr = errors.Join(merr, d.telem.Close()) return merr }) diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 71de79681b1..077ac2f3e72 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -25,7 +25,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/services/llo/grpc" - "github.com/smartcontractkit/chainlink/v2/core/services/llo/observation" ) const ( @@ -269,7 +268,6 @@ func (mt *transmitter) Transmit( err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err()) } } - observation.GetCache(digest).SetLastTransmissionSeqNr(seqNr) }) if !ok { diff --git a/core/services/llo/observation/cache.go b/core/services/llo/observation/cache.go index 772258a33f3..80dc4147484 100644 --- a/core/services/llo/observation/cache.go +++ b/core/services/llo/observation/cache.go @@ -4,28 +4,23 @@ import ( "runtime" "strconv" "sync" - "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) var ( - registryMu = sync.Mutex{} - registry = map[ocr2types.ConfigDigest]*Cache{} - promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "llo", Subsystem: "datasource", Name: "cache_hit_count", Help: "Number of local observation cache hits", }, - []string{"configDigest", "streamID"}, + []string{"streamID"}, ) promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "llo", @@ -33,23 +28,10 @@ var ( Name: "cache_miss_count", Help: "Number of local observation cache misses", }, - []string{"configDigest", "streamID", "reason"}, + []string{"streamID", "reason"}, ) ) -func GetCache(configDigest ocr2types.ConfigDigest) *Cache { - registryMu.Lock() - defer registryMu.Unlock() - - cache, ok := registry[configDigest] - if !ok { - cache = NewCache(configDigest, 500*time.Millisecond, time.Minute) - registry[configDigest] = cache - } - - return cache -} - // Cache of stream values. // It maintains a cache of stream values fetched from adapters until the last // transmission sequence number is greater or equal the sequence number at which @@ -58,20 +40,16 @@ func GetCache(configDigest ocr2types.ConfigDigest) *Cache { // The cache is cleaned up periodically to remove decommissioned stream values // if the provided cleanupInterval is greater than 0. 
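// Editor's note: an illustrative sketch of the simplified Cache API above, not part of
// the diff. NewCache no longer takes a config digest and Add no longer takes a sequence
// number; entries expire purely by maxAge, and the cleanup goroutine only runs when
// cleanupInterval > 0. llo.ToDecimal and decimal.NewFromInt are assumed from packages
// the surrounding tests already import.
//
//	cache := NewCache(500*time.Millisecond, time.Minute)
//	cache.Add(llotypes.StreamID(1), llo.ToDecimal(decimal.NewFromInt(2181)))
//	if val, ok := cache.Get(1); ok {
//		// Hit: the value is younger than maxAge; promCacheHitCount is incremented.
//		_ = val
//	}
//	if _, ok := cache.Get(2); !ok {
//		// Miss: unknown or expired IDs increment promCacheMissCount with reason
//		// "notFound" or "maxAge".
//	}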
type Cache struct { - mu sync.RWMutex - - configDigestStr string + mu sync.RWMutex values map[llotypes.StreamID]item maxAge time.Duration cleanupInterval time.Duration - lastTransmissionSeqNr atomic.Uint64 - closeChan chan struct{} + closeChan chan struct{} } type item struct { value llo.StreamValue - seqNr uint64 createdAt time.Time } @@ -79,9 +57,8 @@ type item struct { // // maxAge is the maximum age of a stream value to keep in the cache. // cleanupInterval is the interval to clean up the cache. -func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanupInterval time.Duration) *Cache { +func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache { c := &Cache{ - configDigestStr: configDigest.Hex(), values: make(map[llotypes.StreamID]item), maxAge: maxAge, cleanupInterval: cleanupInterval, @@ -110,16 +87,11 @@ func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanup return c } -// SetLastTransmissionSeqNr sets the last transmission sequence number. -func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) { - c.lastTransmissionSeqNr.Store(seqNr) -} - // Add adds a stream value to the cache. -func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) { +func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue) { c.mu.Lock() defer c.mu.Unlock() - c.values[id] = item{value: value, seqNr: seqNr, createdAt: time.Now()} + c.values[id] = item{value: value, createdAt: time.Now()} } func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) { @@ -129,21 +101,16 @@ func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) { label := strconv.FormatUint(uint64(id), 10) item, ok := c.values[id] if !ok { - promCacheMissCount.WithLabelValues(c.configDigestStr, label, "notFound").Inc() - return nil, false - } - - if item.seqNr <= c.lastTransmissionSeqNr.Load() { - promCacheMissCount.WithLabelValues(c.configDigestStr, label, "seqNr").Inc() + promCacheMissCount.WithLabelValues(label, "notFound").Inc() return nil, false } if time.Since(item.createdAt) >= c.maxAge { - promCacheMissCount.WithLabelValues(c.configDigestStr, label, "maxAge").Inc() + promCacheMissCount.WithLabelValues(label, "maxAge").Inc() return nil, false } - promCacheHitCount.WithLabelValues(c.configDigestStr, label).Inc() + promCacheHitCount.WithLabelValues(label).Inc() return item.value, true } @@ -151,9 +118,8 @@ func (c *Cache) cleanup() { c.mu.Lock() defer c.mu.Unlock() - lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load() for id, item := range c.values { - if item.seqNr <= lastTransmissionSeqNr || time.Since(item.createdAt) >= c.maxAge { + if time.Since(item.createdAt) >= c.maxAge { delete(c.values, id) } } diff --git a/core/services/llo/observation/cache_test.go b/core/services/llo/observation/cache_test.go index 454c959a962..4f8242cf8e2 100644 --- a/core/services/llo/observation/cache_test.go +++ b/core/services/llo/observation/cache_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" - llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) @@ -72,7 +70,7 @@ func TestNewCache(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, tt.cleanupInterval) + cache := NewCache(tt.maxAge, tt.cleanupInterval) require.NotNil(t, cache) assert.Equal(t, 
tt.maxAge, cache.maxAge) assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval) @@ -87,7 +85,6 @@ func TestCache_Add_Get(t *testing.T) { name string streamID llotypes.StreamID value llo.StreamValue - seqNr uint64 maxAge time.Duration wantValue llo.StreamValue wantFound bool @@ -97,7 +94,6 @@ func TestCache_Add_Get(t *testing.T) { name: "get existing value", streamID: 1, value: &mockStreamValue{value: []byte{42}}, - seqNr: 10, maxAge: time.Second, wantValue: &mockStreamValue{value: []byte{42}}, wantFound: true, @@ -106,28 +102,14 @@ func TestCache_Add_Get(t *testing.T) { name: "get non-existent value", streamID: 1, value: &mockStreamValue{value: []byte{42}}, - seqNr: 10, maxAge: time.Second, wantValue: nil, wantFound: false, }, - { - name: "get expired by sequence number", - streamID: 1, - value: &mockStreamValue{value: []byte{42}}, - seqNr: 5, - maxAge: time.Second, - wantValue: nil, - wantFound: false, - beforeGet: func(cache *Cache) { - cache.SetLastTransmissionSeqNr(10) - }, - }, { name: "get expired by age", streamID: 1, value: &mockStreamValue{value: []byte{42}}, - seqNr: 10, maxAge: time.Nanosecond * 100, wantValue: nil, wantFound: false, @@ -139,10 +121,10 @@ func TestCache_Add_Get(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, 0) + cache := NewCache(tt.maxAge, 0) if tt.wantFound { - cache.Add(tt.streamID, tt.value, tt.seqNr) + cache.Add(tt.streamID, tt.value) } if tt.beforeGet != nil { @@ -159,11 +141,11 @@ func TestCache_Add_Get(t *testing.T) { } func TestCache_Cleanup(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Nanosecond*100, time.Millisecond) + cache := NewCache(time.Nanosecond*100, time.Millisecond) streamID := llotypes.StreamID(1) value := &mockStreamValue{value: []byte{42}} - cache.Add(streamID, value, 10) + cache.Add(streamID, value) time.Sleep(time.Millisecond * 2) gotValue, gotFound := cache.Get(streamID) @@ -172,7 +154,7 @@ func TestCache_Cleanup(t *testing.T) { } func TestCache_ConcurrentAccess(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0) + cache := NewCache(time.Second, 0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -185,7 +167,7 @@ func TestCache_ConcurrentAccess(t *testing.T) { defer wg.Done() for j := uint32(0); j < numOperations; j++ { streamID := id*numOperations + j - cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1) + cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}) } }(i) } @@ -203,7 +185,7 @@ func TestCache_ConcurrentAccess(t *testing.T) { } func TestCache_ConcurrentReadWrite(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0) + cache := NewCache(time.Second, 0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -216,7 +198,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) { defer wg.Done() for j := uint32(0); j < numOperations; j++ { streamID := id*numOperations + j - cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, uint64(j)) + cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}) } }(i) } @@ -236,7 +218,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) { } func TestCache_ConcurrentAddGet(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0) + cache := NewCache(time.Second, 0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -249,7 +231,7 @@ func TestCache_ConcurrentAddGet(t *testing.T) { defer wg.Done() for j := uint32(0); j < numOperations; j++ { streamID 
:= id*numOperations + j - cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}, 1) + cache.Add(streamID, &mockStreamValue{value: []byte{byte(id)}}) } }(i) } diff --git a/core/services/llo/observation/data_source.go b/core/services/llo/observation/data_source.go index b77effd2a4d..ff1c4a1e899 100644 --- a/core/services/llo/observation/data_source.go +++ b/core/services/llo/observation/data_source.go @@ -8,13 +8,14 @@ import ( "sort" "strconv" "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "golang.org/x/exp/maps" + "github.com/smartcontractkit/libocr/offchainreporting2/types" - ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" @@ -41,6 +42,17 @@ var ( }, []string{"streamID"}, ) + promObservationLoopDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "llo", + Subsystem: "datasource", + Name: "observation_loop_duration_ms", + Help: "Duration of the observation loop", + Buckets: []float64{ + 10, 25, 50, 100, 250, 500, 750, 1000, + }, + }, + []string{"configDigest"}, + ) ) type ErrObservationFailed struct { @@ -73,10 +85,15 @@ func (e *ErrObservationFailed) Unwrap() error { var _ llo.DataSource = &dataSource{} type dataSource struct { - lggr logger.Logger - registry Registry - t Telemeter - shouldCache bool + lggr logger.Logger + registry Registry + t Telemeter + cache *Cache + observationLoopStarted atomic.Bool + observationLoopCloseCh services.StopChan + + configDigestToStreamMu sync.Mutex + configDigestToStream map[types.ConfigDigest]observableStreamValues } func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource { @@ -85,73 +102,135 @@ func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataS func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, shouldCache bool) *dataSource { return &dataSource{ - lggr: logger.Named(lggr, "DataSource"), - registry: registry, - t: t, - shouldCache: shouldCache, + lggr: logger.Named(lggr, "DataSource"), + registry: registry, + t: t, + cache: NewCache(500*time.Millisecond, time.Minute), + configDigestToStream: make(map[types.ConfigDigest]observableStreamValues), + observationLoopCloseCh: make(chan struct{}), } } // Observe looks up all streams in the registry and populates a map of stream ID => value func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) error { - now := time.Now() - lggr := logger.With(d.lggr, "observationTimestamp", opts.ObservationTimestamp(), "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + // Observation loop logic + { + // Update the list of streams to observe for this config digest and set the timeout + d.configDigestToStreamMu.Lock() + // StreamValues needs a copy to avoid concurrent access + d.setObservableStreams(ctx, streamValues, opts) + d.configDigestToStreamMu.Unlock() + + if !d.observationLoopStarted.Load() { + loopStartedCh := make(chan struct{}) + go d.startObservationLoop(loopStartedCh) + <-loopStartedCh + } + } - if opts.VerboseLogging() { - streamIDs := make([]streams.StreamID, 0, len(streamValues)) - for streamID := range streamValues { - streamIDs = append(streamIDs, streamID) + // Fetch the cached observations for all streams. 
+ for streamID := range streamValues { + val := d.fromCache(streamID) + if val != nil { + streamValues[streamID] = val } - sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] }) - lggr = logger.With(lggr, "streamIDs", streamIDs) - lggr.Debugw("Observing streams") } - var wg sync.WaitGroup - wg.Add(len(streamValues)) + return nil +} - var mu sync.Mutex - successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues)) - var errs []ErrObservationFailed +func (d *dataSource) setObservableStreams(ctx context.Context, streamValues llo.StreamValues, opts llo.DSOpts) { + values := make(llo.StreamValues, len(streamValues)) + for streamID := range streamValues { + values[streamID] = nil + } - // oc only lives for the duration of this Observe call - oc := NewObservationContext(lggr, d.registry, d.t) + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(100 * time.Millisecond) + } - // Telemetry - { - // Size needs to accommodate the max number of telemetry events that could be generated - // Standard case might be about 3 bridge requests per spec and one stream<=>spec - // Overallocate for safety (to avoid dropping packets) - telemCh := d.t.MakeObservationScopedTelemetryCh(opts, 10*len(streamValues)) - if telemCh != nil { - if d.t.CaptureEATelemetry() { - ctx = pipeline.WithTelemetryCh(ctx, telemCh) + streams := make(llo.StreamValues) + for streamID := range values { + streams[streamID] = values[streamID] + } + + d.configDigestToStream[opts.ConfigDigest()] = observableStreamValues{ + opts: opts, + streamValues: streams, + observationInterval: time.Until(deadline), + } +} + +// startObservationLoop continuously makes observations for the streams in d.configDigestToStream and stores those in +// the cache. It does not check for cached versions, it always calculates fresh values. +// +// NOTE: This method needs to be run in a goroutine. 
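// Editor's note: an illustrative summary of the rhythm each iteration below follows,
// not part of the diff — observe every registered stream, cache the results, then sleep
// whatever remains of the round's observation interval so the cache stays warm without
// busy-looping:
//
//	for {
//		loopStart := time.Now()
//		opts, values, interval := d.getObservableStreams()
//		// fan out one goroutine per stream in values, storing results via d.toCache;
//		// opts supplies the config digest / seqNr used for logging and telemetry
//		if elapsed := time.Since(loopStart); elapsed < interval {
//			time.Sleep(interval - elapsed)
//		}
//	}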
+func (d *dataSource) startObservationLoop(loopStartedCh chan struct{}) { + var elapsed time.Duration + + stopChanCtx, stopChanCancel := d.observationLoopCloseCh.NewCtx() + defer stopChanCancel() + for { + if stopChanCtx.Err() != nil { + return + } + + loopStart := time.Now() + opts, streamValues, observationInterval := d.getObservableStreams() + + ctx, cancel := context.WithTimeout(stopChanCtx, observationInterval) + lggr := logger.With(d.lggr, "observationTimestamp", opts.ObservationTimestamp(), "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + + if opts.VerboseLogging() { + streamIDs := make([]streams.StreamID, 0, len(streamValues)) + for streamID := range streamValues { + streamIDs = append(streamIDs, streamID) } - if d.t.CaptureObservationTelemetry() { - ctx = WithObservationTelemetryCh(ctx, telemCh) + sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] }) + lggr = logger.With(lggr, "streamIDs", streamIDs) + lggr.Debugw("Observing streams") + } + + // Telemetry + var telemCh chan<- interface{} + { + // Size needs to accommodate the max number of telemetry events that could be generated + // Standard case might be about 3 bridge requests per spec and one stream<=>spec + // Overallocate for safety (to avoid dropping packets) + telemCh = d.t.MakeObservationScopedTelemetryCh(opts, 10*len(streamValues)) + if telemCh != nil { + if d.t.CaptureEATelemetry() { + ctx = pipeline.WithTelemetryCh(ctx, telemCh) + } + if d.t.CaptureObservationTelemetry() { + ctx = WithObservationTelemetryCh(ctx, telemCh) + } } - // After all Observations have returned, nothing else will be sent to the - // telemetry channel, so it can safely be closed - defer close(telemCh) } - } - // Observe all streams concurrently - for _, streamID := range maps.Keys(streamValues) { - go func(streamID llotypes.StreamID) { - defer wg.Done() - var val llo.StreamValue - var err error + var mu sync.Mutex + successfulStreamIDs := make([]streams.StreamID, 0, len(streamValues)) + var errs []ErrObservationFailed + + var wg sync.WaitGroup + wg.Add(len(streamValues)) - // check for valid cached value before observing - if val = d.fromCache(opts.ConfigDigest(), streamID); val == nil { - // no valid cached value, observe the stream + oc := NewObservationContext(lggr, d.registry, d.t) + + for streamID := range streamValues { + go func(streamID llotypes.StreamID) { + defer wg.Done() + var val llo.StreamValue + var err error + + // Observe the stream if val, err = oc.Observe(ctx, streamID, opts); err != nil { - strmIDStr := strconv.FormatUint(uint64(streamID), 10) + streamIDStr := strconv.FormatUint(uint64(streamID), 10) if errors.As(err, &MissingStreamError{}) { - promMissingStreamCount.WithLabelValues(strmIDStr).Inc() + promMissingStreamCount.WithLabelValues(streamIDStr).Inc() } - promObservationErrorCount.WithLabelValues(strmIDStr).Inc() + promObservationErrorCount.WithLabelValues(streamIDStr).Inc() mu.Lock() errs = append(errs, ErrObservationFailed{inner: err, streamID: streamID, reason: "failed to observe stream"}) mu.Unlock() @@ -159,65 +238,133 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, } // cache the observed value - d.toCache(opts.ConfigDigest(), streamID, val, opts.OutCtx().SeqNr) - } + d.toCache(streamID, val) + }(streamID) + } - mu.Lock() - defer mu.Unlock() + wg.Wait() + elapsed = time.Since(loopStart) - successfulStreamIDs = append(successfulStreamIDs, streamID) - if val != nil { - streamValues[streamID] = val - } - }(streamID) - } + // Notify 
the caller that we've completed our first round of observations. + if !d.observationLoopStarted.Load() { + d.observationLoopStarted.Store(true) + close(loopStartedCh) + } - // Wait for all Observations to complete - wg.Wait() + // After all Observations have returned, nothing else will be sent to the + // telemetry channel, so it can safely be closed + if telemCh != nil { + close(telemCh) + } - // Only log on errors or if VerboseLogging is turned on - if len(errs) > 0 || opts.VerboseLogging() { - elapsed := time.Since(now) + // Only log on errors or if VerboseLogging is turned on + if len(errs) > 0 || opts.VerboseLogging() { + slices.Sort(successfulStreamIDs) + sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) - slices.Sort(successfulStreamIDs) - sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID }) + failedStreamIDs := make([]streams.StreamID, len(errs)) + errStrs := make([]string, len(errs)) + for i, e := range errs { + errStrs[i] = e.String() + failedStreamIDs[i] = e.streamID + } - failedStreamIDs := make([]streams.StreamID, len(errs)) - errStrs := make([]string, len(errs)) - for i, e := range errs { - errStrs[i] = e.String() - failedStreamIDs[i] = e.streamID - } + lggr = logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams", + len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "errs", errStrs) - lggr = logger.With(lggr, "elapsed", elapsed, "nSuccessfulStreams", - len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "errs", errStrs) + if opts.VerboseLogging() { + lggr = logger.With(lggr, "streamValues", streamValues) + } - if opts.VerboseLogging() { - lggr = logger.With(lggr, "streamValues", streamValues) + if len(errs) == 0 && opts.VerboseLogging() { + lggr.Infow("Observation succeeded for all streamsToObserve") + } else if len(errs) > 0 { + lggr.Warnw("Observation failed for streamsToObserve") + } } - if len(errs) == 0 && opts.VerboseLogging() { - lggr.Infow("Observation succeeded for all streams") - } else if len(errs) > 0 { - lggr.Warnw("Observation failed for streams") + promObservationLoopDuration.WithLabelValues( + opts.ConfigDigest().String()).Observe(float64(elapsed.Milliseconds())) + + if elapsed < observationInterval { + lggr.Debugw("Observation loop sleep", "elapsed_ms", elapsed.Milliseconds(), + "interval_ms", observationInterval.Milliseconds(), "sleep_ms", observationInterval-elapsed) + time.Sleep(observationInterval - elapsed) + } else { + lggr.Debugw("Observation loop", "elapsed_ms", elapsed.Milliseconds(), "interval_ms", observationInterval.Milliseconds()) } + + // Cancel the context, so the linter doesn't complain. 
+ cancel() } +} + +func (d *dataSource) Close() error { + close(d.observationLoopCloseCh) + d.observationLoopStarted.Store(false) return nil } -func (d *dataSource) fromCache(configDigest ocrtypes.ConfigDigest, streamID llotypes.StreamID) llo.StreamValue { - if d.shouldCache { - if streamValue, found := GetCache(configDigest).Get(streamID); found && streamValue != nil { - return streamValue - } +func (d *dataSource) fromCache(streamID llotypes.StreamID) llo.StreamValue { + if streamValue, found := d.cache.Get(streamID); found && streamValue != nil { + return streamValue } return nil } -func (d *dataSource) toCache(configDigest ocrtypes.ConfigDigest, streamID llotypes.StreamID, val llo.StreamValue, seqNr uint64) { - if d.shouldCache && val != nil { - // Use the current sequence number as the cache key - GetCache(configDigest).Add(streamID, val, seqNr) +func (d *dataSource) toCache(streamID llotypes.StreamID, val llo.StreamValue) { + if val != nil { + d.cache.Add(streamID, val) + } +} + +type observableStreamValues struct { + opts llo.DSOpts + streamValues llo.StreamValues + observationInterval time.Duration +} + +func (o *observableStreamValues) IsActive() (bool, error) { + outCtx := o.opts.OutCtx() + outcome, err := o.opts.OutcomeCodec().Decode(outCtx.PreviousOutcome) + if err != nil { + return false, fmt.Errorf("observable stream value: failed to decode outcome: %w", err) } + + if outcome.LifeCycleStage == llo.LifeCycleStageProduction { + return true, nil + } + + return false, nil +} + +// getObservableStreams returns the active plugin data source options, the streams to observe and the observation interval +// the observation interval is the maximum time we can spend observing streams. We ensure that we don't exceed this time and +// we wait for the remaining time in the observation loop. 
+func (d *dataSource) getObservableStreams() (llo.DSOpts, llo.StreamValues, time.Duration) { + d.configDigestToStreamMu.Lock() + streamsToObserve := make([]observableStreamValues, 0, len(d.configDigestToStream)) + for _, vals := range d.configDigestToStream { + streamsToObserve = append(streamsToObserve, vals) + } + d.configDigestToStreamMu.Unlock() + + // deduplicate streams and get the active ocr instance options + for _, vals := range streamsToObserve { + active, err := vals.IsActive() + if !active { + continue + } + + if err != nil { + d.lggr.Errorw("getObservableStreams: failed to check if OCR instance is active", "error", err) + continue + } + + return vals.opts, vals.streamValues, vals.observationInterval + } + + d.lggr.Errorw("getObservableStreams: no active OCR instance found") + return nil, nil, 0 } diff --git a/core/services/llo/observation/data_source_test.go b/core/services/llo/observation/data_source_test.go index 2de22fba926..341eb49e2d7 100644 --- a/core/services/llo/observation/data_source_test.go +++ b/core/services/llo/observation/data_source_test.go @@ -2,7 +2,6 @@ package observation import ( "context" - "crypto/rand" "encoding/hex" "errors" "fmt" @@ -13,7 +12,6 @@ import ( "testing" "time" - "github.com/prometheus/client_golang/prometheus/testutil" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,6 +22,7 @@ import ( llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" + "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" clhttptest "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/httptest" @@ -66,17 +65,24 @@ func (m *mockRegistry) Get(streamID streams.StreamID) (p streams.Pipeline, exist func makePipelineWithSingleResult[T any](runID int64, res T, err error) *mockPipeline { return &mockPipeline{ run: &pipeline.Run{ID: runID}, - trrs: []pipeline.TaskRunResult{pipeline.TaskRunResult{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}}, + trrs: []pipeline.TaskRunResult{{Task: &pipeline.MemoTask{}, Result: pipeline.Result{Value: res}}}, err: err, } } -func makeStreamValues() llo.StreamValues { - return llo.StreamValues{ - 1: nil, - 2: nil, - 3: nil, +func makeStreamValues(streamIDs ...llotypes.StreamID) llo.StreamValues { + if len(streamIDs) == 0 { + return llo.StreamValues{ + 1: nil, + 2: nil, + 3: nil, + } + } + vals := llo.StreamValues{} + for _, streamID := range streamIDs { + vals[streamID] = nil } + return vals } type mockOpts struct { @@ -96,7 +102,7 @@ func (m *mockOpts) SeqNr() uint64 { } func (m *mockOpts) OutCtx() ocr3types.OutcomeContext { if m.outCtx.SeqNr == 0 { - return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: ocr3types.Outcome([]byte("foo"))} + return ocr3types.OutcomeContext{SeqNr: 1042, PreviousOutcome: []byte("foo")} } return m.outCtx } @@ -112,9 +118,19 @@ func (m *mockOpts) ObservationTimestamp() time.Time { } return m.observationTimestamp } - func (m *mockOpts) OutcomeCodec() llo.OutcomeCodec { - return nil + return mockOutputCodec{} +} + +type mockOutputCodec struct{} + +func (oc mockOutputCodec) Encode(outcome llo.Outcome) (ocr3types.Outcome, error) { + return ocr3types.Outcome{}, nil +} +func (oc mockOutputCodec) Decode(encoded ocr3types.Outcome) (outcome llo.Outcome, err error) { + return llo.Outcome{ + LifeCycleStage: llo.LifeCycleStageProduction, + }, nil } type mockTelemeter struct { @@ 
-152,77 +168,101 @@ func (m *mockTelemeter) CaptureObservationTelemetry() bool { func Test_DataSource(t *testing.T) { lggr := logger.TestLogger(t) - reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter, false) ctx := testutils.Context(t) opts := &mockOpts{} + observationLoopSleepDuration := 10 * time.Millisecond t.Run("Observe", func(t *testing.T) { t.Run("doesn't set any values if no streams are defined", func(t *testing.T) { + reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, makeStreamValues(), vals) + ds.Close() }) + t.Run("observes each stream with success and returns values matching map argument", func(t *testing.T) { + reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, big.NewInt(15), nil) + // Sleep, so the observation loop has time to process the new pipelines. + time.Sleep(2 * observationLoopSleepDuration) + vals := makeStreamValues() err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), 1: llo.ToDecimal(decimal.NewFromInt(2181)), + 2: llo.ToDecimal(decimal.NewFromInt(40602)), 3: llo.ToDecimal(decimal.NewFromInt(15)), - }, vals) + }, vals, fmt.Sprintf("vals: %v", vals)) + ds.Close() }) + t.Run("observes each stream and returns success/errors", func(t *testing.T) { - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), errors.New("something exploded")) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) - reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](3, nil, errors.New("something exploded 2")) + reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, telem.NullTelemeter, true) - vals := makeStreamValues() + reg.pipelines[11] = makePipelineWithSingleResult[*big.Int](11, big.NewInt(21810), errors.New("something exploded")) + reg.pipelines[12] = makePipelineWithSingleResult[*big.Int](12, big.NewInt(40602), nil) + reg.pipelines[13] = makePipelineWithSingleResult[*big.Int](13, nil, errors.New("something exploded 2")) + + // Sleep, so the observation loop has time to process the new pipelines. 
+ time.Sleep(observationLoopSleepDuration) + + vals := makeStreamValues(11, 12, 13) err := ds.Observe(ctx, vals, opts) assert.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 1: nil, - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 3: nil, - }, vals) + 11: nil, + 12: llo.ToDecimal(decimal.NewFromInt(40602)), + 13: nil, + }, vals, fmt.Sprintf("vals: %v", vals)) + ds.Close() }) t.Run("records telemetry", func(t *testing.T) { tm := &mockTelemeter{} - ds.t = tm + reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, tm, true) - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) - reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil) + reg.pipelines[21] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), nil) + reg.pipelines[22] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.pipelines[23] = makePipelineWithSingleResult[*big.Int](102, big.NewInt(15), nil) - vals := makeStreamValues() + // Sleep, so the observation loop has time to process the new pipelines. + time.Sleep(2 * observationLoopSleepDuration) + + vals := makeStreamValues(21, 22, 23) err := ds.Observe(ctx, vals, opts) require.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 1: llo.ToDecimal(decimal.NewFromInt(2181)), - 3: llo.ToDecimal(decimal.NewFromInt(15)), - }, vals) + 21: llo.ToDecimal(decimal.NewFromInt(2181)), + 22: llo.ToDecimal(decimal.NewFromInt(40602)), + 23: llo.ToDecimal(decimal.NewFromInt(15)), + }, vals, fmt.Sprintf("vals: %v", vals)) - require.Len(t, tm.v3PremiumLegacyPackets, 3) + // Get only the last 3 packets, as those would be the result of the first round of observations. 
+ packets := tm.v3PremiumLegacyPackets[:3] m := make(map[int]v3PremiumLegacyPacket) - for _, pkt := range tm.v3PremiumLegacyPackets { + for _, pkt := range packets { m[int(pkt.run.ID)] = pkt } pkt := m[100] assert.Equal(t, 100, int(pkt.run.ID)) assert.Len(t, pkt.trrs, 1) - assert.Equal(t, 1, int(pkt.streamID)) + assert.Equal(t, 21, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Equal(t, "2181", pkt.val.(*llo.Decimal).String()) require.NoError(t, pkt.err) @@ -230,14 +270,18 @@ func Test_DataSource(t *testing.T) { telems := []interface{}{} for p := range tm.ch { telems = append(telems, p) + if len(telems) >= 3 { + break + } } - require.Len(t, telems, 3) + + require.Len(t, telems[:3], 3) sort.Slice(telems, func(i, j int) bool { return telems[i].(*telem.LLOObservationTelemetry).StreamId < telems[j].(*telem.LLOObservationTelemetry).StreamId }) require.IsType(t, &telem.LLOObservationTelemetry{}, telems[0]) obsTelem := telems[0].(*telem.LLOObservationTelemetry) - assert.Equal(t, uint32(1), obsTelem.StreamId) + assert.Equal(t, uint32(21), obsTelem.StreamId) assert.Equal(t, int32(llo.LLOStreamValue_Decimal), obsTelem.StreamValueType) assert.Equal(t, "00000000020885", hex.EncodeToString(obsTelem.StreamValueBinary)) assert.Equal(t, "2181", obsTelem.StreamValueText) @@ -247,25 +291,27 @@ func Test_DataSource(t *testing.T) { assert.Equal(t, uint32(0), obsTelem.DonId) assert.Equal(t, opts.SeqNr(), obsTelem.SeqNr) assert.Equal(t, opts.ConfigDigest().Hex(), hex.EncodeToString(obsTelem.ConfigDigest)) + ds.Close() }) t.Run("records telemetry for errors", func(t *testing.T) { tm := &mockTelemeter{} - ds.t = tm + reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} + ds := newDataSource(lggr, reg, tm, true) - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded")) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) - reg.pipelines[3] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2")) + reg.pipelines[31] = makePipelineWithSingleResult[*big.Int](100, big.NewInt(2181), errors.New("something exploded")) + reg.pipelines[32] = makePipelineWithSingleResult[*big.Int](101, big.NewInt(40602), nil) + reg.pipelines[33] = makePipelineWithSingleResult[*big.Int](102, nil, errors.New("something exploded 2")) - vals := makeStreamValues() + vals := makeStreamValues(31, 32, 33) err := ds.Observe(ctx, vals, opts) require.NoError(t, err) assert.Equal(t, llo.StreamValues{ - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 1: nil, - 3: nil, - }, vals) + 31: nil, + 32: llo.ToDecimal(decimal.NewFromInt(40602)), + 33: nil, + }, vals, fmt.Sprintf("vals: %v", vals)) require.Len(t, tm.v3PremiumLegacyPackets, 3) m := make(map[int]v3PremiumLegacyPacket) @@ -275,139 +321,13 @@ func Test_DataSource(t *testing.T) { pkt := m[100] assert.Equal(t, 100, int(pkt.run.ID)) assert.Len(t, pkt.trrs, 1) - assert.Equal(t, 1, int(pkt.streamID)) + assert.Equal(t, 31, int(pkt.streamID)) assert.Equal(t, opts, pkt.opts) assert.Nil(t, pkt.val) assert.Error(t, pkt.err) + ds.Close() }) - t.Run("uses cached values when available", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) - - // First observation to populate cache - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) - - vals := makeStreamValues() - key := make([]byte, 32) - _, err := rand.Read(key) - 
require.NoError(t, err) - - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest(key)} - err = ds.Observe(ctx, vals, opts2) - require.NoError(t, err) - - // Verify initial values - assert.Equal(t, llo.StreamValues{ - 1: llo.ToDecimal(decimal.NewFromInt(2181)), - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 3: nil, - }, vals) - - // Change pipeline results - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil) - - // Second observation should use cached values - vals = makeStreamValues() - err = ds.Observe(ctx, vals, opts2) - require.NoError(t, err) - - // Should still have original values from cache - assert.Equal(t, llo.StreamValues{ - 1: llo.ToDecimal(decimal.NewFromInt(2181)), - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 3: nil, - }, vals) - - // Verify cache metrics - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheHitCount.WithLabelValues(opts2.ConfigDigest().Hex(), "1")), 0.0001) - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheHitCount.WithLabelValues(opts2.ConfigDigest().Hex(), "2")), 0.0001) - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheMissCount.WithLabelValues(opts2.ConfigDigest().Hex(), "1", "notFound")), 0.0001) - assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheMissCount.WithLabelValues(opts2.ConfigDigest().Hex(), "2", "notFound")), 0.0001) - }) - - t.Run("refreshes cache after expiration", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) - - // First observation - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) - vals := llo.StreamValues{1: nil} - - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest{6, 5, 9}} - err := ds.Observe(ctx, vals, opts2) - require.NoError(t, err) - - // Wait for cache to expire - time.Sleep(501 * time.Millisecond) - - // Change pipeline result - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(200), nil) - - // Second observation should use new value - vals = llo.StreamValues{1: nil} - err = ds.Observe(ctx, vals, opts) - require.NoError(t, err) - - assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(200))}, vals) - }) - - t.Run("handles concurrent cache access", func(t *testing.T) { - // Create a new data source - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) - - // Set up pipeline to return different values - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) - - // First observation to cache - vals := llo.StreamValues{1: nil} - opts2 := &mockOpts{configDigest: ocr2types.ConfigDigest{6, 5, 6}} - - err := ds.Observe(ctx, vals, opts2) - require.NoError(t, err) - - // Run multiple observations concurrently - var wg sync.WaitGroup - for i := 0; i < 10; i++ { - wg.Add(1) - go func() { - defer wg.Done() - vals := llo.StreamValues{1: nil} - err := ds.Observe(ctx, vals, opts2) - assert.NoError(t, err) - assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals) - }() - } - wg.Wait() - - // Verify pipeline was only called once - assert.Equal(t, 1, reg.pipelines[1].runCount) - }) - - t.Run("handles cache errors gracefully", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) - - // First observation with error - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, nil, errors.New("pipeline error")) - vals := makeStreamValues() - opts2 := 
&mockOpts{configDigest: ocr2types.ConfigDigest{6, 5, 2}} - err := ds.Observe(ctx, vals, opts2) - require.NoError(t, err) // Observe returns nil error even if some streams fail - - time.Sleep(501 * time.Millisecond) - - // Second observation should try again (not use cache for error case) - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) - vals = llo.StreamValues{1: nil} - err = ds.Observe(ctx, vals, opts2) - require.NoError(t, err) - - assert.Equal(t, llo.StreamValues{1: llo.ToDecimal(decimal.NewFromInt(100))}, vals) - }) }) } @@ -484,4 +404,5 @@ result3 -> result3_parse -> multiply3; b.ResetTimer() err := ds.Observe(ctx, vals, opts) require.NoError(b, err) + ds.Close() }
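Editor's note (not part of the diff): a minimal end-to-end sketch of the lifecycle this change introduces, assuming only the exported pieces shown above. Observe registers the requested streams and starts the background loop on first use, the loop keeps the observation cache warm, Observe itself only reads cached values, and the delegate stops the loop by closing the data source (the Closer assertion added in delegate.go). lggr, registry, telemeter, ctx and opts stand in for the dependencies the plugin and delegate already provide.

ds := NewDataSource(lggr, registry, telemeter) // returns an llo.DataSource

// Each OCR round calls Observe; the first call also starts the observation loop
// and blocks until the first round of observations has populated the cache.
vals := llo.StreamValues{1: nil, 2: nil}
if err := ds.Observe(ctx, vals, opts); err != nil {
	return err
}
// vals now holds whatever the warm cache had for streams 1 and 2 (nil on a miss).

// On shutdown the delegate closes the data source if it implements a Closer,
// which closes observationLoopCloseCh and ends the loop goroutine.
if closer, ok := ds.(interface{ Close() error }); ok {
	_ = closer.Close()
}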