diff --git a/.changeset/fresh-buttons-refuse.md b/.changeset/fresh-buttons-refuse.md new file mode 100644 index 00000000000..522d057a0c9 --- /dev/null +++ b/.changeset/fresh-buttons-refuse.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#updated llo plugin data source and telemetry performance improvements diff --git a/core/scripts/go.mod b/core/scripts/go.mod index f0c816bbf36..f90781a27ee 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -48,7 +48,7 @@ require ( github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20250930202440-88c08e65d960 github.com/smartcontractkit/chainlink-common v0.9.6-0.20251001150007-98903c79c124 - github.com/smartcontractkit/chainlink-data-streams v0.1.2 + github.com/smartcontractkit/chainlink-data-streams v0.1.5 github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250827130336-5922343458be diff --git a/core/scripts/go.sum b/core/scripts/go.sum index dd39a12c05a..9b78f0b7c6d 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1603,8 +1603,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ= diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index cd1b7dcdc9b..f2b586fd9a5 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "time" "github.com/prometheus/client_golang/prometheus" ocrcommontypes "github.com/smartcontractkit/libocr/commontypes" @@ -119,7 +120,22 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { CaptureOutcomeTelemetry: cfg.CaptureOutcomeTelemetry, CaptureReportTelemetry: cfg.CaptureReportTelemetry, }) - ds := observation.NewDataSource(logger.Named(lggr, "DataSource"), cfg.Registry, t) + + cache := observation.NewCache(500*time.Millisecond, time.Minute) + ds := observation.NewDataSource( + logger.Named(lggr, "DataSource"), + cfg.Registry, + t, + cache, + ) + + notifier, ok := cfg.ContractTransmitter.(TransmitNotifier) + if ok { + notifier.OnTransmit(t.TrackSeqNr) + notifier.OnTransmit(func(digest ocr2types.ConfigDigest, seqNr uint64) { + cache.SetLastTransmissionSeqNr(seqNr) + }) + } return &delegate{services.StateMachine{}, cfg, reportCodecs, cfg.ShouldRetireCache, ds, t, []Closer{}}, nil } diff --git a/core/services/llo/mercurytransmitter/transmitter.go b/core/services/llo/mercurytransmitter/transmitter.go index 71de79681b1..077ac2f3e72 100644 --- a/core/services/llo/mercurytransmitter/transmitter.go +++ b/core/services/llo/mercurytransmitter/transmitter.go @@ -25,7 +25,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/services/llo/grpc" - "github.com/smartcontractkit/chainlink/v2/core/services/llo/observation" ) const ( @@ -269,7 +268,6 @@ func (mt *transmitter) Transmit( err = fmt.Errorf("failed to add transmission to commit channel: %w", ctx.Err()) } } - observation.GetCache(digest).SetLastTransmissionSeqNr(seqNr) }) if !ok { diff --git a/core/services/llo/observation/cache.go b/core/services/llo/observation/cache.go index 772258a33f3..f4a45bdcb76 100644 --- a/core/services/llo/observation/cache.go +++ b/core/services/llo/observation/cache.go @@ -9,23 +9,19 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) var ( - registryMu = sync.Mutex{} - registry = map[ocr2types.ConfigDigest]*Cache{} - promCacheHitCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "llo", Subsystem: "datasource", Name: "cache_hit_count", Help: "Number of local observation cache hits", }, - []string{"configDigest", "streamID"}, + []string{"streamID"}, ) promCacheMissCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "llo", @@ -33,23 +29,10 @@ var ( Name: "cache_miss_count", Help: "Number of local observation cache misses", }, - []string{"configDigest", "streamID", "reason"}, + []string{"streamID", "reason"}, ) ) -func GetCache(configDigest ocr2types.ConfigDigest) *Cache { - registryMu.Lock() - defer registryMu.Unlock() - - cache, ok := registry[configDigest] - if !ok { - cache = NewCache(configDigest, 500*time.Millisecond, time.Minute) - registry[configDigest] = cache - } - - return cache -} - // Cache of stream values. // It maintains a cache of stream values fetched from adapters until the last // transmission sequence number is greater or equal the sequence number at which @@ -60,7 +43,6 @@ func GetCache(configDigest ocr2types.ConfigDigest) *Cache { type Cache struct { mu sync.RWMutex - configDigestStr string values map[llotypes.StreamID]item maxAge time.Duration cleanupInterval time.Duration @@ -72,16 +54,15 @@ type Cache struct { type item struct { value llo.StreamValue seqNr uint64 - createdAt time.Time + expiresAt time.Time } // NewCache creates a new cache. // // maxAge is the maximum age of a stream value to keep in the cache. // cleanupInterval is the interval to clean up the cache. -func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanupInterval time.Duration) *Cache { +func NewCache(maxAge time.Duration, cleanupInterval time.Duration) *Cache { c := &Cache{ - configDigestStr: configDigest.Hex(), values: make(map[llotypes.StreamID]item), maxAge: maxAge, cleanupInterval: cleanupInterval, @@ -112,39 +93,51 @@ func NewCache(configDigest ocr2types.ConfigDigest, maxAge time.Duration, cleanup // SetLastTransmissionSeqNr sets the last transmission sequence number. func (c *Cache) SetLastTransmissionSeqNr(seqNr uint64) { + if c == nil { + return + } + c.lastTransmissionSeqNr.Store(seqNr) } // Add adds a stream value to the cache. func (c *Cache) Add(id llotypes.StreamID, value llo.StreamValue, seqNr uint64) { + if c == nil { + return + } + c.mu.Lock() defer c.mu.Unlock() - c.values[id] = item{value: value, seqNr: seqNr, createdAt: time.Now()} + c.values[id] = item{value: value, seqNr: seqNr, expiresAt: time.Now().Add(c.maxAge)} } -func (c *Cache) Get(id llotypes.StreamID) (llo.StreamValue, bool) { +func (c *Cache) Get(id llotypes.StreamID) llo.StreamValue { + if c == nil { + return nil + } + c.mu.RLock() defer c.mu.RUnlock() label := strconv.FormatUint(uint64(id), 10) item, ok := c.values[id] if !ok { - promCacheMissCount.WithLabelValues(c.configDigestStr, label, "notFound").Inc() - return nil, false + promCacheMissCount.WithLabelValues(label, "notFound").Inc() + return nil } if item.seqNr <= c.lastTransmissionSeqNr.Load() { - promCacheMissCount.WithLabelValues(c.configDigestStr, label, "seqNr").Inc() - return nil, false + promCacheMissCount.WithLabelValues(label, "seqNr").Inc() + return nil } - if time.Since(item.createdAt) >= c.maxAge { - promCacheMissCount.WithLabelValues(c.configDigestStr, label, "maxAge").Inc() - return nil, false + if time.Now().After(item.expiresAt) { + promCacheMissCount.WithLabelValues(label, "maxAge").Inc() + return nil } - promCacheHitCount.WithLabelValues(c.configDigestStr, label).Inc() - return item.value, true + promCacheHitCount.WithLabelValues(label).Inc() + return item.value } func (c *Cache) cleanup() { @@ -152,8 +145,9 @@ func (c *Cache) cleanup() { defer c.mu.Unlock() lastTransmissionSeqNr := c.lastTransmissionSeqNr.Load() + now := time.Now() for id, item := range c.values { - if item.seqNr <= lastTransmissionSeqNr || time.Since(item.createdAt) >= c.maxAge { + if item.seqNr <= lastTransmissionSeqNr || now.After(item.expiresAt) { delete(c.values, id) } } diff --git a/core/services/llo/observation/cache_test.go b/core/services/llo/observation/cache_test.go index 454c959a962..a6468796759 100644 --- a/core/services/llo/observation/cache_test.go +++ b/core/services/llo/observation/cache_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - ocr2types "github.com/smartcontractkit/libocr/offchainreporting2/types" - llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" ) @@ -72,7 +70,7 @@ func TestNewCache(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, tt.cleanupInterval) + cache := NewCache(tt.maxAge, tt.cleanupInterval) require.NotNil(t, cache) assert.Equal(t, tt.maxAge, cache.maxAge) assert.Equal(t, tt.cleanupInterval, cache.cleanupInterval) @@ -139,7 +137,7 @@ func TestCache_Add_Get(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, tt.maxAge, 0) + cache := NewCache(tt.maxAge, 0) if tt.wantFound { cache.Add(tt.streamID, tt.value, tt.seqNr) @@ -149,30 +147,23 @@ func TestCache_Add_Get(t *testing.T) { tt.beforeGet(cache) } - gotValue, gotFound := cache.Get(tt.streamID) - assert.Equal(t, tt.wantFound, gotFound) - if tt.wantFound { - assert.Equal(t, tt.wantValue, gotValue) - } + assert.Equal(t, tt.wantValue, cache.Get(tt.streamID)) }) } } func TestCache_Cleanup(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Nanosecond*100, time.Millisecond) + cache := NewCache(time.Nanosecond*100, time.Millisecond) streamID := llotypes.StreamID(1) value := &mockStreamValue{value: []byte{42}} cache.Add(streamID, value, 10) time.Sleep(time.Millisecond * 2) - - gotValue, gotFound := cache.Get(streamID) - assert.False(t, gotFound) - assert.Nil(t, gotValue) + assert.Nil(t, cache.Get(streamID)) } func TestCache_ConcurrentAccess(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0) + cache := NewCache(time.Second, 0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -195,15 +186,13 @@ func TestCache_ConcurrentAccess(t *testing.T) { for i := uint32(0); i < numGoroutines; i++ { for j := uint32(0); j < numOperations; j++ { streamID := i*numOperations + j - value, found := cache.Get(streamID) - assert.True(t, found) - assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, value) + assert.Equal(t, &mockStreamValue{value: []byte{byte(i)}}, cache.Get(streamID)) } } } func TestCache_ConcurrentReadWrite(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0) + cache := NewCache(time.Second, 0) const numGoroutines = 10 const numOperations = uint32(1000) @@ -236,7 +225,7 @@ func TestCache_ConcurrentReadWrite(t *testing.T) { } func TestCache_ConcurrentAddGet(t *testing.T) { - cache := NewCache(ocr2types.ConfigDigest{}, time.Second, 0) + cache := NewCache(time.Second, 0) const numGoroutines = 10 const numOperations = uint32(1000) diff --git a/core/services/llo/observation/data_source.go b/core/services/llo/observation/data_source.go index b77effd2a4d..b447e65a7b2 100644 --- a/core/services/llo/observation/data_source.go +++ b/core/services/llo/observation/data_source.go @@ -12,9 +12,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "golang.org/x/exp/maps" - - ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" @@ -73,22 +70,25 @@ func (e *ErrObservationFailed) Unwrap() error { var _ llo.DataSource = &dataSource{} type dataSource struct { - lggr logger.Logger - registry Registry - t Telemeter - shouldCache bool + lggr logger.Logger + registry Registry + t Telemeter + + activeSeqNrMu sync.Mutex + activeSeqNr uint64 + cache *Cache } -func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter) llo.DataSource { - return newDataSource(lggr, registry, t, true) +func NewDataSource(lggr logger.Logger, registry Registry, t Telemeter, c *Cache) llo.DataSource { + return newDataSource(lggr, registry, t, c) } -func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, shouldCache bool) *dataSource { +func newDataSource(lggr logger.Logger, registry Registry, t Telemeter, c *Cache) *dataSource { return &dataSource{ - lggr: logger.Named(lggr, "DataSource"), - registry: registry, - t: t, - shouldCache: shouldCache, + lggr: logger.Named(lggr, "DataSource"), + registry: registry, + t: t, + cache: c, } } @@ -97,16 +97,27 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, now := time.Now() lggr := logger.With(d.lggr, "observationTimestamp", opts.ObservationTimestamp(), "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr) + // stream ids to observe + streamIDs := make([]streams.StreamID, 0, len(streamValues)) + for streamID := range streamValues { + streamIDs = append(streamIDs, streamID) + } + if opts.VerboseLogging() { - streamIDs := make([]streams.StreamID, 0, len(streamValues)) - for streamID := range streamValues { - streamIDs = append(streamIDs, streamID) - } sort.Slice(streamIDs, func(i, j int) bool { return streamIDs[i] < streamIDs[j] }) lggr = logger.With(lggr, "streamIDs", streamIDs) lggr.Debugw("Observing streams") } + // update the active seq nr + // we track the transmitting sequence number to ensure observations + // are cached at the sequence number of the active plugin ocr instance (blue/green) + // but can also be used by the passive instance. + // In case of cache misses for the passive instance we still run the pipeline + // but cache at the last sequence number of the active instance. + // this ensures that they are still invalidated at the next transmission. + activeSeqNr := d.updateActiveSeqNr(opts) + var wg sync.WaitGroup wg.Add(len(streamValues)) @@ -137,14 +148,14 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, } // Observe all streams concurrently - for _, streamID := range maps.Keys(streamValues) { + for _, streamID := range streamIDs { go func(streamID llotypes.StreamID) { defer wg.Done() var val llo.StreamValue var err error // check for valid cached value before observing - if val = d.fromCache(opts.ConfigDigest(), streamID); val == nil { + if val = d.cache.Get(streamID); val == nil { // no valid cached value, observe the stream if val, err = oc.Observe(ctx, streamID, opts); err != nil { strmIDStr := strconv.FormatUint(uint64(streamID), 10) @@ -159,7 +170,7 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, } // cache the observed value - d.toCache(opts.ConfigDigest(), streamID, val, opts.OutCtx().SeqNr) + d.cache.Add(streamID, val, activeSeqNr) } mu.Lock() @@ -206,18 +217,22 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues, return nil } -func (d *dataSource) fromCache(configDigest ocrtypes.ConfigDigest, streamID llotypes.StreamID) llo.StreamValue { - if d.shouldCache { - if streamValue, found := GetCache(configDigest).Get(streamID); found && streamValue != nil { - return streamValue - } +func (d *dataSource) updateActiveSeqNr(opts llo.DSOpts) uint64 { + if opts.OutcomeCodec() == nil { + return opts.OutCtx().SeqNr } - return nil -} -func (d *dataSource) toCache(configDigest ocrtypes.ConfigDigest, streamID llotypes.StreamID, val llo.StreamValue, seqNr uint64) { - if d.shouldCache && val != nil { - // Use the current sequence number as the cache key - GetCache(configDigest).Add(streamID, val, seqNr) + outcome, err := opts.OutcomeCodec().Decode(opts.OutCtx().PreviousOutcome) + if err != nil { + d.lggr.Warnf("failed to decode previous outcome, err: %v", err) + return opts.OutCtx().SeqNr } + + d.activeSeqNrMu.Lock() + defer d.activeSeqNrMu.Unlock() + if outcome.LifeCycleStage == llo.LifeCycleStageProduction { + d.activeSeqNr = opts.OutCtx().SeqNr + } + + return d.activeSeqNr } diff --git a/core/services/llo/observation/data_source_test.go b/core/services/llo/observation/data_source_test.go index 2de22fba926..2bae0abb4af 100644 --- a/core/services/llo/observation/data_source_test.go +++ b/core/services/llo/observation/data_source_test.go @@ -153,7 +153,7 @@ func (m *mockTelemeter) CaptureObservationTelemetry() bool { func Test_DataSource(t *testing.T) { lggr := logger.TestLogger(t) reg := &mockRegistry{make(map[streams.StreamID]*mockPipeline)} - ds := newDataSource(lggr, reg, telem.NullTelemeter, false) + ds := newDataSource(lggr, reg, telem.NullTelemeter, nil) ctx := testutils.Context(t) opts := &mockOpts{} @@ -282,13 +282,18 @@ func Test_DataSource(t *testing.T) { }) t.Run("uses cached values when available", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + ds := newDataSource(lggr, reg, telem.NullTelemeter, + NewCache(time.Millisecond*500, time.Minute)) // First observation to populate cache - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) + reg.pipelines[10001] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(2181), nil) + reg.pipelines[20001] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(40602), nil) - vals := makeStreamValues() + vals := llo.StreamValues{ + 10001: nil, + 20001: nil, + 30001: nil, + } key := make([]byte, 32) _, err := rand.Read(key) require.NoError(t, err) @@ -299,40 +304,45 @@ func Test_DataSource(t *testing.T) { // Verify initial values assert.Equal(t, llo.StreamValues{ - 1: llo.ToDecimal(decimal.NewFromInt(2181)), - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 3: nil, + 10001: llo.ToDecimal(decimal.NewFromInt(2181)), + 20001: llo.ToDecimal(decimal.NewFromInt(40602)), + 30001: nil, }, vals) // Change pipeline results - reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil) - reg.pipelines[2] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil) + reg.pipelines[10001] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(9999), nil) + reg.pipelines[20001] = makePipelineWithSingleResult[*big.Int](2, big.NewInt(8888), nil) // Second observation should use cached values - vals = makeStreamValues() + vals = llo.StreamValues{ + 10001: nil, + 20001: nil, + 30001: nil, + } err = ds.Observe(ctx, vals, opts2) require.NoError(t, err) // Should still have original values from cache assert.Equal(t, llo.StreamValues{ - 1: llo.ToDecimal(decimal.NewFromInt(2181)), - 2: llo.ToDecimal(decimal.NewFromInt(40602)), - 3: nil, + 10001: llo.ToDecimal(decimal.NewFromInt(2181)), + 20001: llo.ToDecimal(decimal.NewFromInt(40602)), + 30001: nil, }, vals) // Verify cache metrics assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheHitCount.WithLabelValues(opts2.ConfigDigest().Hex(), "1")), 0.0001) + promCacheHitCount.WithLabelValues("10001")), 0.0001) assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheHitCount.WithLabelValues(opts2.ConfigDigest().Hex(), "2")), 0.0001) + promCacheHitCount.WithLabelValues("20001")), 0.0001) assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheMissCount.WithLabelValues(opts2.ConfigDigest().Hex(), "1", "notFound")), 0.0001) + promCacheMissCount.WithLabelValues("10001", "notFound")), 0.0001) assert.InEpsilon(t, float64(1), testutil.ToFloat64( - promCacheMissCount.WithLabelValues(opts2.ConfigDigest().Hex(), "2", "notFound")), 0.0001) + promCacheMissCount.WithLabelValues("20001", "notFound")), 0.0001) }) t.Run("refreshes cache after expiration", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + ds := newDataSource(lggr, reg, telem.NullTelemeter, + NewCache(time.Millisecond*500, time.Minute)) // First observation reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) @@ -358,7 +368,8 @@ func Test_DataSource(t *testing.T) { t.Run("handles concurrent cache access", func(t *testing.T) { // Create a new data source - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + ds := newDataSource(lggr, reg, telem.NullTelemeter, + NewCache(time.Millisecond*500, time.Minute)) // Set up pipeline to return different values reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, big.NewInt(100), nil) @@ -389,7 +400,8 @@ func Test_DataSource(t *testing.T) { }) t.Run("handles cache errors gracefully", func(t *testing.T) { - ds := newDataSource(lggr, reg, telem.NullTelemeter, true) + ds := newDataSource(lggr, reg, telem.NullTelemeter, + NewCache(time.Millisecond*500, time.Minute)) // First observation with error reg.pipelines[1] = makePipelineWithSingleResult[*big.Int](1, nil, errors.New("pipeline error")) @@ -475,7 +487,7 @@ result3 -> result3_parse -> multiply3; require.NoError(b, err) } - ds := newDataSource(lggr, r, telem.NullTelemeter, false) + ds := newDataSource(lggr, r, telem.NullTelemeter, nil) vals := make(map[llotypes.StreamID]llo.StreamValue) for i := uint32(0); i < 4*n; i++ { vals[i] = nil diff --git a/core/services/llo/telem/telemetry.go b/core/services/llo/telem/telemetry.go index d25c9cf00d0..a0bc6f1bdb4 100644 --- a/core/services/llo/telem/telemetry.go +++ b/core/services/llo/telem/telemetry.go @@ -8,11 +8,12 @@ import ( "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-data-streams/llo" datastreamsllo "github.com/smartcontractkit/chainlink-data-streams/llo" - "github.com/smartcontractkit/chainlink-data-streams/llo/reportcodecs/evm" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -32,6 +33,7 @@ type Telemeter interface { GetReportTelemetryCh() chan<- *datastreamsllo.LLOReportTelemetry CaptureEATelemetry() bool CaptureObservationTelemetry() bool + TrackSeqNr(digest types.ConfigDigest, seqNr uint64) } type TelemeterService interface { @@ -69,6 +71,12 @@ func newTelemeter(params TelemeterParams) *telemeter { chch: make(chan telemetryCollectionContext, 1000), // chch should be consumed from very quickly so we don't need a large buffer, but it also won't hurt captureEATelemetry: params.CaptureEATelemetry, captureObservationTelemetry: params.CaptureObservationTelemetry, + chTransmissionSeqNr: make(chan struct { + digest types.ConfigDigest + seqNr uint64 + }, 10), + currentSeqNr: make(map[string]uint64), + telemetryBuffer: make(map[string]map[uint64][]telemetryEntry), } if params.CaptureOutcomeTelemetry { t.chOutcomeTelemetry = make(chan *datastreamsllo.LLOOutcomeTelemetry, 100) // only one per round so 100 buffer should be more than enough even for very fast rounds @@ -92,6 +100,8 @@ func newTelemeter(params TelemeterParams) *telemeter { if t.chReportTelemetry != nil { close(t.chReportTelemetry) } + + close(t.chTransmissionSeqNr) return nil }, }.NewServiceEngine(params.Logger) @@ -99,6 +109,11 @@ func newTelemeter(params TelemeterParams) *telemeter { return t } +type telemetryEntry struct { + telemType synchronization.TelemetryType + msg proto.Message +} + type telemeter struct { services.Service eng *services.Engine @@ -112,6 +127,18 @@ type telemeter struct { chch chan telemetryCollectionContext chOutcomeTelemetry chan *datastreamsllo.LLOOutcomeTelemetry chReportTelemetry chan *datastreamsllo.LLOReportTelemetry + + currentSeqNrMu sync.Mutex + currentSeqNr map[string]uint64 + chTransmissionSeqNr chan struct { + digest types.ConfigDigest + seqNr uint64 + } + + // Buffer Report and Outcome telemetry to only send + // for transmitting rounds sequence numbers + telemetryBufferMu sync.Mutex + telemetryBuffer map[string]map[uint64][]telemetryEntry } func (t *telemeter) EnqueueV3PremiumLegacy(run *pipeline.Run, trrs pipeline.TaskRunResults, streamID uint32, opts llo.DSOpts, val llo.StreamValue, err error) { @@ -204,19 +231,21 @@ func (t *telemeter) start(_ context.Context) error { // channel closed by producer return } - t.collectObservationTelemetry(p, tcc.opts) + t.prepareObservationTelemetry(p, tcc.opts) case <-ctx.Done(): return } } }() - + case p := <-t.chTelemetryPipeline: + t.prepareV3PremiumLegacyTelemetry(p) case rt := <-t.chOutcomeTelemetry: - t.collectOutcomeTelemetry(rt) + t.enqueueTelemetry(types.ConfigDigest(rt.ConfigDigest).Hex(), rt.SeqNr, synchronization.LLOOutcome, rt) case rt := <-t.chReportTelemetry: - t.collectReportTelemetry(rt) - case p := <-t.chTelemetryPipeline: - t.collectV3PremiumLegacyTelemetry(p) + t.enqueueTelemetry(types.ConfigDigest(rt.ConfigDigest).Hex(), rt.SeqNr, synchronization.LLOReport, rt) + case tx := <-t.chTransmissionSeqNr: + // Drain any pending outcome or report telemetry before sending buffered telemetry + t.sendBufferedTelemetry(tx.digest, tx.seqNr) case <-ctx.Done(): wg.Wait() return @@ -226,7 +255,83 @@ func (t *telemeter) start(_ context.Context) error { return nil } -func (t *telemeter) collectObservationTelemetry(p interface{}, opts llo.DSOpts) { +func (t *telemeter) sendBufferedTelemetry(digest types.ConfigDigest, seqNr uint64) { + cd := digest.Hex() + + // update current sequence number + t.currentSeqNrMu.Lock() + currentSeqNr := t.currentSeqNr[cd] + t.currentSeqNr[cd] = seqNr + t.currentSeqNrMu.Unlock() + + // get buffered messages for this config digest + t.telemetryBufferMu.Lock() + defer t.telemetryBufferMu.Unlock() + digestMessages := t.telemetryBuffer[cd] + + // include any message from the previous sequence number + // that was enqueued after the its transmission + var messages [2][]telemetryEntry + messages[0] = digestMessages[currentSeqNr] + messages[1] = digestMessages[seqNr] + + t.eng.Debugw("Telemetry: Sending buffered telemetry", + "digest", digest, "currentSeqNr", currentSeqNr, "seqNr", seqNr, "message_count", len(messages[0])+len(messages[1])) + + // drop stale messages + for messgesSeqNr := range digestMessages { + if messgesSeqNr <= seqNr { + delete(digestMessages, messgesSeqNr) + } + } + + // drop messages for config digests that are not transmitting + for d := range t.telemetryBuffer { + if d == cd { + continue + } + delete(t.telemetryBuffer, d) + } + + go func() { + for _, msgs := range messages { + for _, msg := range msgs { + bytes, err := proto.Marshal(msg.msg) + if err != nil { + t.eng.Warnf("protobuf marshal failed %v", err.Error()) + continue + } + t.monitoringEndpoint.SendTypedLog(msg.telemType, bytes) + } + } + }() +} + +func (t *telemeter) enqueueTelemetry(digest string, seqNr uint64, typ synchronization.TelemetryType, msg proto.Message) { + switch typ { + case synchronization.PipelineBridge, synchronization.LLOObservation, synchronization.EnhancedEAMercury: + bytes, err := proto.Marshal(msg) + if err != nil { + t.eng.Warnf("protobuf marshal failed %v", err.Error()) + return + } + // observation and bridge telemetry are not buffered + t.monitoringEndpoint.SendTypedLog(typ, bytes) + default: // synchronization.LLOOutcome, synchronization.LLOReport + t.telemetryBufferMu.Lock() + defer t.telemetryBufferMu.Unlock() + + if _, ok := t.telemetryBuffer[digest]; !ok { + t.telemetryBuffer[digest] = make(map[uint64][]telemetryEntry) + } + t.telemetryBuffer[digest][seqNr] = append(t.telemetryBuffer[digest][seqNr], telemetryEntry{ + telemType: typ, + msg: msg, + }) + } +} + +func (t *telemeter) prepareObservationTelemetry(p interface{}, opts llo.DSOpts) { var telemType synchronization.TelemetryType var msg proto.Message switch v := p.(type) { @@ -264,34 +369,11 @@ func (t *telemeter) collectObservationTelemetry(p interface{}, opts llo.DSOpts) t.eng.Warnw("Unknown telemetry type", "type", fmt.Sprintf("%T", p)) return } - bytes, err := proto.Marshal(msg) - if err != nil { - t.eng.Warnf("protobuf marshal failed %v", err.Error()) - return - } - - t.monitoringEndpoint.SendTypedLog(telemType, bytes) -} - -func (t *telemeter) collectOutcomeTelemetry(rt *datastreamsllo.LLOOutcomeTelemetry) { - bytes, err := proto.Marshal(rt) - if err != nil { - t.eng.Warnf("protobuf marshal failed %v", err.Error()) - return - } - t.monitoringEndpoint.SendTypedLog(synchronization.LLOOutcome, bytes) -} -func (t *telemeter) collectReportTelemetry(rt *datastreamsllo.LLOReportTelemetry) { - bytes, err := proto.Marshal(rt) - if err != nil { - t.eng.Warnf("protobuf marshal failed %v", err.Error()) - return - } - t.monitoringEndpoint.SendTypedLog(synchronization.LLOReport, bytes) + t.enqueueTelemetry(opts.ConfigDigest().Hex(), opts.SeqNr(), telemType, msg) } -func (t *telemeter) collectV3PremiumLegacyTelemetry(d *TelemetryPipeline) { +func (t *telemeter) prepareV3PremiumLegacyTelemetry(d *TelemetryPipeline) { eaTelemetryValues := ocrcommon.ParseMercuryEATelemetry(t.eng.SugaredLogger, d.trrs, mercuryutils.REPORT_V3) for _, eaTelem := range eaTelemetryValues { var benchmarkPrice, bidPrice, askPrice int64 @@ -342,13 +424,17 @@ func (t *telemeter) collectV3PremiumLegacyTelemetry(d *TelemetryPipeline) { tea.Epoch = int64(epoch) } - bytes, err := proto.Marshal(tea) - if err != nil { - t.eng.Warnf("protobuf marshal failed %v", err.Error()) - continue - } + t.enqueueTelemetry(d.opts.ConfigDigest().Hex(), d.opts.SeqNr(), synchronization.EnhancedEAMercury, tea) + } +} - t.monitoringEndpoint.SendTypedLog(synchronization.EnhancedEAMercury, bytes) +func (t *telemeter) TrackSeqNr(digest types.ConfigDigest, seqNr uint64) { + select { + case t.chTransmissionSeqNr <- struct { + digest types.ConfigDigest + seqNr uint64 + }{digest, seqNr}: + default: } } @@ -408,3 +494,5 @@ func (t *nullTelemeter) Name() string { func (t *nullTelemeter) Ready() error { return nil } +func (t *nullTelemeter) TrackSeqNr(digest types.ConfigDigest, seqNr uint64) { +} diff --git a/core/services/llo/telem/telemetry_test.go b/core/services/llo/telem/telemetry_test.go index 39c3afeb411..666e2f63682 100644 --- a/core/services/llo/telem/telemetry_test.go +++ b/core/services/llo/telem/telemetry_test.go @@ -166,6 +166,7 @@ func Test_Telemeter_v3PremiumLegacy(t *testing.T) { adapterError := new(eautils.AdapterError) adapterError.Name = adapterLWBAErrorName tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, nil, adapterError) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) var i int for tLog := range m.chTypedLogs { @@ -189,6 +190,7 @@ func Test_Telemeter_v3PremiumLegacy(t *testing.T) { val := llo.ToDecimal(decimal.NewFromFloat32(102.12)) servicetest.Run(t, tm) tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) var i int for tLog := range m.chTypedLogs { @@ -244,6 +246,8 @@ func Test_Telemeter_v3PremiumLegacy(t *testing.T) { val := &llo.Quote{Bid: decimal.NewFromFloat32(102.12), Benchmark: decimal.NewFromFloat32(103.32), Ask: decimal.NewFromFloat32(104.25)} servicetest.Run(t, tm) tm.EnqueueV3PremiumLegacy(run, trrs, streamID, opts, val, nil) + time.Sleep(10 * time.Millisecond) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) var i int for tLog := range m.chTypedLogs { @@ -307,6 +311,7 @@ func Test_Telemeter_observationScopedTelemetry(t *testing.T) { StreamID: ptr(uint32(135)), DotID: "ds1", } + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) tLog := <-m.chTypedLogs assert.Equal(t, synchronization.PipelineBridge, tLog.telemType) @@ -356,6 +361,7 @@ func Test_Telemeter_observationScopedTelemetry(t *testing.T) { SeqNr: 42, ConfigDigest: []byte{0x01, 0x02, 0x03}, } + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) tLog := <-m.chTypedLogs assert.Equal(t, synchronization.LLOObservation, tLog.telemType) @@ -392,6 +398,7 @@ func Test_Telemeter_observationScopedTelemetry(t *testing.T) { require.NotNil(t, ch) ch <- struct{}{} + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) testutils.WaitForLogMessage(t, observedLogs, "Unknown telemetry type") }) @@ -424,8 +431,12 @@ func Test_Telemeter_outcomeTelemetry(t *testing.T) { ch := tm.GetOutcomeTelemetryCh() require.NotNil(t, ch) t.Run("zero values", func(t *testing.T) { - orig := &datastreamsllo.LLOOutcomeTelemetry{} + opts := &mockOpts{} + cd := opts.ConfigDigest() + orig := &datastreamsllo.LLOOutcomeTelemetry{SeqNr: opts.SeqNr(), ConfigDigest: cd[:]} ch <- orig + time.Sleep(5 * time.Millisecond) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) tLog := <-m.chTypedLogs assert.Equal(t, synchronization.LLOOutcome, tLog.telemType) @@ -436,11 +447,13 @@ func Test_Telemeter_outcomeTelemetry(t *testing.T) { assert.Zero(t, decoded.ChannelDefinitions) assert.Zero(t, decoded.ValidAfterNanoseconds) assert.Zero(t, decoded.StreamAggregates) - assert.Zero(t, decoded.SeqNr) - assert.Zero(t, decoded.ConfigDigest) + assert.Equal(t, opts.SeqNr(), decoded.SeqNr) + assert.Equal(t, cd[:], decoded.ConfigDigest) assert.Zero(t, decoded.DonId) }) t.Run("with values", func(t *testing.T) { + opts := &mockOpts{} + cd := opts.ConfigDigest() orig := &datastreamsllo.LLOOutcomeTelemetry{ LifeCycleStage: "foo", ObservationTimestampNanoseconds: 2, @@ -469,11 +482,13 @@ func Test_Telemeter_outcomeTelemetry(t *testing.T) { }, }, }, - SeqNr: 8, - ConfigDigest: []byte{9}, + SeqNr: opts.SeqNr(), + ConfigDigest: cd[:], DonId: 10, } ch <- orig + time.Sleep(5 * time.Millisecond) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) tLog := <-m.chTypedLogs assert.Equal(t, synchronization.LLOOutcome, tLog.telemType) @@ -493,8 +508,8 @@ func Test_Telemeter_outcomeTelemetry(t *testing.T) { assert.Len(t, decoded.StreamAggregates[10].AggregatorValues, 1) assert.Equal(t, llo.LLOStreamValue_Type(12), decoded.StreamAggregates[10].AggregatorValues[11].Type) assert.Equal(t, []byte{13}, decoded.StreamAggregates[10].AggregatorValues[11].Value) - assert.Equal(t, uint64(8), decoded.SeqNr) - assert.Equal(t, []byte{9}, decoded.ConfigDigest) + assert.Equal(t, opts.SeqNr(), decoded.SeqNr) + assert.Equal(t, cd[:], decoded.ConfigDigest) assert.Equal(t, uint32(10), decoded.DonId) }) }) @@ -527,8 +542,12 @@ func Test_Telemeter_reportTelemetry(t *testing.T) { ch := tm.GetReportTelemetryCh() require.NotNil(t, ch) t.Run("zero values", func(t *testing.T) { - orig := &datastreamsllo.LLOReportTelemetry{} + opts := &mockOpts{} + cd := opts.ConfigDigest() + orig := &datastreamsllo.LLOReportTelemetry{SeqNr: opts.SeqNr(), ConfigDigest: cd[:]} ch <- orig + time.Sleep(5 * time.Millisecond) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) tLog := <-m.chTypedLogs assert.Equal(t, synchronization.LLOReport, tLog.telemType) @@ -542,10 +561,12 @@ func Test_Telemeter_reportTelemetry(t *testing.T) { assert.Empty(t, decoded.StreamDefinitions) assert.Empty(t, decoded.StreamValues) assert.Empty(t, decoded.ChannelOpts) - assert.Zero(t, decoded.SeqNr) - assert.Empty(t, decoded.ConfigDigest) + assert.Equal(t, opts.SeqNr(), decoded.SeqNr) + assert.Equal(t, cd[:], decoded.ConfigDigest) }) t.Run("with values", func(t *testing.T) { + opts := &mockOpts{} + cd := opts.ConfigDigest() orig := &datastreamsllo.LLOReportTelemetry{ ChannelId: 1, ValidAfterNanoseconds: 2, @@ -565,10 +586,12 @@ func Test_Telemeter_reportTelemetry(t *testing.T) { }, }, ChannelOpts: []byte{9}, - SeqNr: 10, - ConfigDigest: []byte{11}, + SeqNr: opts.SeqNr(), + ConfigDigest: cd[:], } ch <- orig + time.Sleep(5 * time.Millisecond) + tm.TrackSeqNr(opts.ConfigDigest(), opts.SeqNr()) tLog := <-m.chTypedLogs assert.Equal(t, synchronization.LLOReport, tLog.telemType) @@ -586,8 +609,8 @@ func Test_Telemeter_reportTelemetry(t *testing.T) { assert.Equal(t, llo.LLOStreamValue_Type(7), decoded.StreamValues[0].Type) assert.Equal(t, []byte{8}, decoded.StreamValues[0].Value) assert.Equal(t, []byte{9}, decoded.ChannelOpts) - assert.Equal(t, uint64(10), decoded.SeqNr) - assert.Equal(t, []byte{11}, decoded.ConfigDigest) + assert.Equal(t, opts.SeqNr(), decoded.SeqNr) + assert.Equal(t, cd[:], decoded.ConfigDigest) }) }) } diff --git a/core/services/llo/transmitter.go b/core/services/llo/transmitter.go index 8ab8079477a..0fc3ef55afa 100644 --- a/core/services/llo/transmitter.go +++ b/core/services/llo/transmitter.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" "github.com/smartcontractkit/libocr/offchainreporting2plus/types" @@ -32,6 +33,10 @@ const ( DuplicateReport = 2 ) +type TransmitNotifier interface { + OnTransmit(listen func(digest types.ConfigDigest, seqNr uint64)) +} + type Transmitter interface { llotypes.Transmitter services.Service @@ -41,6 +46,25 @@ type TransmitterRetirementReportCacheWriter interface { StoreAttestedRetirementReport(ctx context.Context, cd ocrtypes.ConfigDigest, seqNr uint64, retirementReport []byte, sigs []types.AttributedOnchainSignature) error } +type onTransmit struct { + mu sync.RWMutex + listeners []func(digest types.ConfigDigest, seqNr uint64) +} + +func (o *onTransmit) OnTransmit(listen func(digest types.ConfigDigest, seqNr uint64)) { + o.mu.Lock() + defer o.mu.Unlock() + o.listeners = append(o.listeners, listen) +} + +func (o *onTransmit) notify(digest types.ConfigDigest, seqNr uint64) { + o.mu.RLock() + defer o.mu.RUnlock() + for _, listener := range o.listeners { + go listener(digest, seqNr) + } +} + type transmitter struct { services.StateMachine lggr logger.Logger @@ -49,6 +73,7 @@ type transmitter struct { subTransmitters []Transmitter retirementReportCache TransmitterRetirementReportCacheWriter + *onTransmit } type TransmitterOpts struct { @@ -99,6 +124,7 @@ func NewTransmitter(opts TransmitterOpts) (Transmitter, error) { opts.FromAccount, subTransmitters, opts.RetirementReportCache, + &onTransmit{}, }, nil } @@ -154,6 +180,8 @@ func (t *transmitter) Transmit( } return nil } + t.notify(digest, seqNr) + g := new(errgroup.Group) for _, st := range t.subTransmitters { g.Go(func() error { diff --git a/core/services/pipeline/common_http.go b/core/services/pipeline/common_http.go index 15dc16af1bd..16de5456f6d 100644 --- a/core/services/pipeline/common_http.go +++ b/core/services/pipeline/common_http.go @@ -3,13 +3,12 @@ package pipeline import ( "bytes" "context" - "encoding/json" "io" "net/http" "time" + "github.com/goccy/go-json" "github.com/pkg/errors" - clhttp "github.com/smartcontractkit/chainlink-common/pkg/http" "github.com/smartcontractkit/chainlink-common/pkg/logger" ) diff --git a/core/services/pipeline/task.bridge.go b/core/services/pipeline/task.bridge.go index 704423cba84..bd52aaebdeb 100644 --- a/core/services/pipeline/task.bridge.go +++ b/core/services/pipeline/task.bridge.go @@ -3,19 +3,18 @@ package pipeline import ( "context" "database/sql" - "encoding/json" stderrors "errors" "net/http" "net/url" "path" "time" + "github.com/goccy/go-json" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink/v2/core/bridges" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/eautils" ) diff --git a/core/services/pipeline/task.jsonparse.go b/core/services/pipeline/task.jsonparse.go index 8de2e3dc18b..d3c6e9f8622 100644 --- a/core/services/pipeline/task.jsonparse.go +++ b/core/services/pipeline/task.jsonparse.go @@ -3,11 +3,11 @@ package pipeline import ( "bytes" "context" - "encoding/json" stderrors "errors" "math/big" "strings" + "github.com/goccy/go-json" "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/logger" diff --git a/deployment/go.mod b/deployment/go.mod index 6eea663ece9..9151fe35f8c 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -402,7 +402,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.2 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/deployment/go.sum b/deployment/go.sum index 59bd3e81750..b7b124acd42 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1342,8 +1342,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ= diff --git a/go.mod b/go.mod index fe101e17028..e33b009c1f4 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/go-ldap/ldap/v3 v3.4.6 github.com/go-viper/mapstructure/v2 v2.4.0 github.com/go-webauthn/webauthn v0.9.4 + github.com/goccy/go-json v0.10.5 github.com/golang-jwt/jwt/v5 v5.2.3 github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e github.com/google/uuid v1.6.0 @@ -85,7 +86,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250908144012-8184001834b5 github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250908144012-8184001834b5 github.com/smartcontractkit/chainlink-common v0.9.6-0.20251001150007-98903c79c124 - github.com/smartcontractkit/chainlink-data-streams v0.1.2 + github.com/smartcontractkit/chainlink-data-streams v0.1.5 github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250827130336-5922343458be github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 @@ -247,7 +248,6 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.26.0 // indirect github.com/go-webauthn/x v0.1.5 // indirect - github.com/goccy/go-json v0.10.5 // indirect github.com/goccy/go-yaml v1.12.0 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect github.com/gofrs/flock v0.12.1 // indirect diff --git a/go.sum b/go.sum index 44fb38d4c02..c5d7283f8c3 100644 --- a/go.sum +++ b/go.sum @@ -1119,8 +1119,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293/go.mod h1:o8MhDlAMwkg5tbWnZKq+A7tzLP87xoMwwn2Y7NXkxwk= github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250827130336-5922343458be h1:NRldnH+Q6v8TjO3sBGo1mL/VRGeaPVneY2L13tCx114= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 837e16e3617..f45c408ecd5 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -483,7 +483,7 @@ require ( github.com/smartcontractkit/ccip-contract-examples/chains/evm v0.0.0-20250826190403-aed7f5f33cde // indirect github.com/smartcontractkit/ccip-owner-contracts v0.1.0 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.2 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 784e0125d16..252f2a6e2ed 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1586,8 +1586,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 8ddb5aeedf2..c05c843cef3 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -472,7 +472,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.1.0 // indirect github.com/smartcontractkit/chainlink-automation v0.8.1 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.2 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 7abc140ce9d..a4131604699 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1565,8 +1565,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ= diff --git a/plugins/plugins.public.yaml b/plugins/plugins.public.yaml index 61c2727b2ee..61b6991838c 100644 --- a/plugins/plugins.public.yaml +++ b/plugins/plugins.public.yaml @@ -45,7 +45,7 @@ plugins: streams: - moduleURI: "github.com/smartcontractkit/chainlink-data-streams" - gitRef: "v0.1.2" + gitRef: "v0.1.5" installPath: "./mercury/cmd/chainlink-mercury" ton: diff --git a/system-tests/lib/go.mod b/system-tests/lib/go.mod index de2afd453e0..1c8b0e87826 100644 --- a/system-tests/lib/go.mod +++ b/system-tests/lib/go.mod @@ -452,7 +452,7 @@ require ( github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250908144012-8184001834b5 // indirect github.com/smartcontractkit/chainlink-ccip/chains/solana/gobindings v0.0.0-20250908144012-8184001834b5 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 // indirect - github.com/smartcontractkit/chainlink-data-streams v0.1.2 // indirect + github.com/smartcontractkit/chainlink-data-streams v0.1.5 // indirect github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect github.com/smartcontractkit/chainlink-framework/chains v0.0.0-20250717121125-2350c82883e2 // indirect diff --git a/system-tests/lib/go.sum b/system-tests/lib/go.sum index 139134794cb..f1cd845cab8 100644 --- a/system-tests/lib/go.sum +++ b/system-tests/lib/go.sum @@ -1581,8 +1581,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ= diff --git a/system-tests/tests/go.mod b/system-tests/tests/go.mod index ccc397266fa..688f02f8a39 100644 --- a/system-tests/tests/go.mod +++ b/system-tests/tests/go.mod @@ -43,7 +43,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-common v0.9.6-0.20251001150007-98903c79c124 - github.com/smartcontractkit/chainlink-data-streams v0.1.2 + github.com/smartcontractkit/chainlink-data-streams v0.1.5 github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20250917110014-65bff6568f77 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 diff --git a/system-tests/tests/go.sum b/system-tests/tests/go.sum index 13e954eb4f5..e7cf2805b7b 100644 --- a/system-tests/tests/go.sum +++ b/system-tests/tests/go.sum @@ -1784,8 +1784,8 @@ github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0 github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw= github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0= -github.com/smartcontractkit/chainlink-data-streams v0.1.2 h1:g/UmFJa/E1Zmc7NO20ob5SijxQen51DhnqTLr2f7BEc= -github.com/smartcontractkit/chainlink-data-streams v0.1.2/go.mod h1:lxY97sDlDorQAmLGFo6x1tl8SQ2E7adsS0/wU8+mmTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5 h1:hdc5yy20ylaDML3NGYp/tivm2a5Y+Ysw/e7sJK6eBTc= +github.com/smartcontractkit/chainlink-data-streams v0.1.5/go.mod h1:e9jETTzrVO8iu9Zp5gDuTCmBVhSJwUOk6K4Q/VFrJ6o= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0 h1:KMkw64j2VUMWTGkWTSFcNrWXcY8189kTjc33n2FaA2w= github.com/smartcontractkit/chainlink-deployments-framework v0.54.0/go.mod h1:KmwLKwDuiYo8SfzoGb9TVehbodBU+yj7HuCfzgU6jx0= github.com/smartcontractkit/chainlink-evm v0.3.4-0.20251002155240-31abd326e293 h1:MVTDt2N5pYGIxxBMG0CCMu+bSsea+/ZOW9iyPVSkRCQ=