From e9b3397f465e26209c8c8ccfed66aa9595f8246b Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Wed, 9 Oct 2024 15:40:51 +0200 Subject: [PATCH 1/5] CCIP-3713 Filtering by the 3rd word in USDC Reader (#14694) * Using filtering by the 3rd word directly * Missing changeset * Using filtering by the 3rd word directly * Using filtering by the 3rd word directly --- .changeset/metal-meals-mix.md | 5 ++ .../usdcreader/usdcreader_test.go | 47 ++++++++++++------- .../ccip/configs/evm/contract_reader.go | 14 ++++++ core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- 11 files changed, 60 insertions(+), 30 deletions(-) create mode 100644 .changeset/metal-meals-mix.md diff --git a/.changeset/metal-meals-mix.md b/.changeset/metal-meals-mix.md new file mode 100644 index 00000000000..66ebcec9982 --- /dev/null +++ b/.changeset/metal-meals-mix.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Adjustments for usdc reader tests #internal diff --git a/core/capabilities/ccip/ccip_integration_tests/usdcreader/usdcreader_test.go b/core/capabilities/ccip/ccip_integration_tests/usdcreader/usdcreader_test.go index 8637cb67a03..dd4801260bd 100644 --- a/core/capabilities/ccip/ccip_integration_tests/usdcreader/usdcreader_test.go +++ b/core/capabilities/ccip/ccip_integration_tests/usdcreader/usdcreader_test.go @@ -12,8 +12,6 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/crypto" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -40,6 +38,8 @@ import ( ) func Test_USDCReader_MessageHashes(t *testing.T) { + finalityDepth := 5 + ctx := testutils.Context(t) ethereumChain := cciptypes.ChainSelector(sel.ETHEREUM_MAINNET_OPTIMISM_1.Selector) ethereumDomainCCTP := reader.CCTPDestDomains[uint64(ethereumChain)] @@ -48,9 +48,10 @@ func Test_USDCReader_MessageHashes(t *testing.T) { polygonChain := cciptypes.ChainSelector(sel.POLYGON_MAINNET.Selector) polygonDomainCCTP := reader.CCTPDestDomains[uint64(polygonChain)] - ts := testSetup(ctx, t, ethereumChain, evmconfig.USDCReaderConfig) + ts := testSetup(ctx, t, ethereumChain, evmconfig.USDCReaderConfig, finalityDepth) usdcReader, err := reader.NewUSDCMessageReader( + logger.TestLogger(t), map[cciptypes.ChainSelector]pluginconfig.USDCCCTPTokenConfig{ ethereumChain: { SourceMessageTransmitterAddr: ts.contractAddr.String(), @@ -67,6 +68,11 @@ func Test_USDCReader_MessageHashes(t *testing.T) { emitMessageSent(t, ts, ethereumDomainCCTP, avalancheDomainCCTP, 41) emitMessageSent(t, ts, ethereumDomainCCTP, polygonDomainCCTP, 31) emitMessageSent(t, ts, ethereumDomainCCTP, polygonDomainCCTP, 41) + // Finalize events + for i := 0; i < finalityDepth; i++ { + ts.sb.Commit() + } + emitMessageSent(t, ts, ethereumDomainCCTP, avalancheDomainCCTP, 51) // Need to replay as sometimes the logs are not picked up by the log poller (?) // Maybe another situation where chain reader doesn't register filters as expected. @@ -167,25 +173,30 @@ func Test_USDCReader_MessageHashes(t *testing.T) { reader.NewMessageTokenID(1, 3), }, }, + { + name: "not finalized events are not returned", + tokens: map[reader.MessageTokenID]cciptypes.RampTokenAmount{ + reader.NewMessageTokenID(1, 5): { + ExtraData: reader.NewSourceTokenDataPayload(51, ethereumDomainCCTP).ToBytes(), + }, + }, + sourceChain: ethereumChain, + destChain: avalancheChain, + expectedMsgIDs: []reader.MessageTokenID{}, + }, } for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - require.Eventually(t, func() bool { - hashes, err1 := usdcReader.MessageHashes(ctx, tc.sourceChain, tc.destChain, tc.tokens) - require.NoError(t, err1) + hashes, err1 := usdcReader.MessageHashes(ctx, tc.sourceChain, tc.destChain, tc.tokens) + require.NoError(t, err1) - if len(tc.expectedMsgIDs) != len(hashes) { - return false - } + require.Equal(t, len(tc.expectedMsgIDs), len(hashes)) - for _, msgID := range tc.expectedMsgIDs { - if _, ok := hashes[msgID]; !ok { - return false - } - } - return true - }, tests.WaitTimeout(t), 50*time.Millisecond) + for _, msgID := range tc.expectedMsgIDs { + _, ok := hashes[msgID] + require.True(t, ok) + } }) } } @@ -207,7 +218,7 @@ func emitMessageSent(t *testing.T, testEnv *testSetupData, source, dest uint32, testEnv.sb.Commit() } -func testSetup(ctx context.Context, t *testing.T, readerChain cciptypes.ChainSelector, cfg evmtypes.ChainReaderConfig) *testSetupData { +func testSetup(ctx context.Context, t *testing.T, readerChain cciptypes.ChainSelector, cfg evmtypes.ChainReaderConfig, depth int) *testSetupData { const chainID = 1337 // Generate a new key pair for the simulated account @@ -239,7 +250,7 @@ func testSetup(ctx context.Context, t *testing.T, readerChain cciptypes.ChainSel db := pgtest.NewSqlxDB(t) lpOpts := logpoller.Opts{ PollPeriod: time.Millisecond, - FinalityDepth: 0, + FinalityDepth: int64(depth), BackfillBatchSize: 10, RpcBatchSize: 10, KeepFinalizedBlocksDepth: 100000, diff --git a/core/capabilities/ccip/configs/evm/contract_reader.go b/core/capabilities/ccip/configs/evm/contract_reader.go index 07ec7de4eb2..bbbcc6faa93 100644 --- a/core/capabilities/ccip/configs/evm/contract_reader.go +++ b/core/capabilities/ccip/configs/evm/contract_reader.go @@ -271,6 +271,16 @@ var USDCReaderConfig = evmrelaytypes.ChainReaderConfig{ consts.EventNameCCTPMessageSent: { ChainSpecificName: consts.EventNameCCTPMessageSent, ReadType: evmrelaytypes.Event, + EventDefinitions: &evmrelaytypes.EventDefinitions{ + GenericDataWordDetails: map[string]evmrelaytypes.DataWordDetail{ + consts.CCTPMessageSentValue: { + Name: consts.CCTPMessageSentValue, + // Filtering by the 3rd word (indexing starts from 0) so it's ptr(2) + Index: ptr(2), + Type: "bytes32", + }, + }, + }, }, }, }, @@ -327,3 +337,7 @@ func mustGetEventName(event string, tabi abi.ABI) string { } return e.Name } + +func ptr[T any](v T) *T { + return &v +} diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 2a376f22b77..af7ddc2f98f 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -278,7 +278,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/shirou/gopsutil/v3 v3.24.3 // indirect github.com/smartcontractkit/chain-selectors v1.0.23 // indirect - github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 // indirect + github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 // indirect github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 // indirect github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 1b321c04875..684155bba51 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1097,8 +1097,8 @@ github.com/smartcontractkit/chain-selectors v1.0.23 h1:D2Eaex4Cw/O7Lg3tX6WklOqnj github.com/smartcontractkit/chain-selectors v1.0.23/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 h1:BLaoELbQm9Mr+v+TzqWIY/zp7E+zXaW/kfaDcobOF3k= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= diff --git a/go.mod b/go.mod index 8c4fda45ae2..93696685c60 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.23 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 + github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 diff --git a/go.sum b/go.sum index c3829de0d35..2e4a7775a60 100644 --- a/go.sum +++ b/go.sum @@ -1058,8 +1058,8 @@ github.com/smartcontractkit/chain-selectors v1.0.23 h1:D2Eaex4Cw/O7Lg3tX6WklOqnj github.com/smartcontractkit/chain-selectors v1.0.23/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 h1:BLaoELbQm9Mr+v+TzqWIY/zp7E+zXaW/kfaDcobOF3k= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index b8e945f90f5..d3ad17f908c 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -39,7 +39,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.0.0-20240926212305-a6deabdfce86 github.com/smartcontractkit/chain-selectors v1.0.23 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 + github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.9 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 84c7a40fb29..90c7f08b0f9 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1437,8 +1437,8 @@ github.com/smartcontractkit/chain-selectors v1.0.23 h1:D2Eaex4Cw/O7Lg3tX6WklOqnj github.com/smartcontractkit/chain-selectors v1.0.23/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 h1:BLaoELbQm9Mr+v+TzqWIY/zp7E+zXaW/kfaDcobOF3k= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index d0826a0e790..8b3fed5432b 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -391,7 +391,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/smartcontractkit/chain-selectors v1.0.23 // indirect github.com/smartcontractkit/chainlink-automation v1.0.4 // indirect - github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 // indirect + github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb // indirect github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 // indirect github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 // indirect github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 779f0ccd724..8bdc23dc35f 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1413,8 +1413,8 @@ github.com/smartcontractkit/chain-selectors v1.0.23 h1:D2Eaex4Cw/O7Lg3tX6WklOqnj github.com/smartcontractkit/chain-selectors v1.0.23/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385 h1:BLaoELbQm9Mr+v+TzqWIY/zp7E+zXaW/kfaDcobOF3k= -github.com/smartcontractkit/chainlink-ccip v0.0.0-20241008154535-20ff51a45385/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= +github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= From fa1a1e2706fc0d1529c022fcd75085d2e9b4f972 Mon Sep 17 00:00:00 2001 From: Connor Stein Date: Wed, 9 Oct 2024 10:07:47 -0400 Subject: [PATCH 2/5] Support RMN in tooling (#14682) * RMN init * Scaffolding * Local only * Fix import cycle * Improve readability * Simplifying more * Fixes * More readability * Drop always pull, images working * change to evmNetworks * Fix key extraction * Lint and RMN names * Comments * More comments * Fix test --------- Co-authored-by: AnieeG Co-authored-by: Anindita Ghosh <88458927+AnieeG@users.noreply.github.com> --- integration-tests/deployment/ccip/rmn_test.go | 18 + .../deployment/ccip/test_helpers.go | 134 +++++- .../deployment/devenv/build_env.go | 30 +- integration-tests/deployment/devenv/rmn.go | 384 ++++++++++++++++++ .../deployment/devenv/rmn_config.go | 137 +++++++ .../deployment/devenv/rmn_test.go | 15 + integration-tests/smoke/ccip_test.go | 2 +- integration-tests/testconfig/ccip/config.go | 9 + 8 files changed, 717 insertions(+), 12 deletions(-) create mode 100644 integration-tests/deployment/ccip/rmn_test.go create mode 100644 integration-tests/deployment/devenv/rmn.go create mode 100644 integration-tests/deployment/devenv/rmn_config.go create mode 100644 integration-tests/deployment/devenv/rmn_test.go diff --git a/integration-tests/deployment/ccip/rmn_test.go b/integration-tests/deployment/ccip/rmn_test.go new file mode 100644 index 00000000000..0ece72eadd3 --- /dev/null +++ b/integration-tests/deployment/ccip/rmn_test.go @@ -0,0 +1,18 @@ +package ccipdeployment + +import ( + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func TestRMN(t *testing.T) { + t.Skip("Local only") + // TODO: needs to return RMN peerIDs. + _, rmnCluster := NewLocalDevEnvironmentWithRMN(t, logger.TestLogger(t)) + for rmnNode, rmn := range rmnCluster.Nodes { + t.Log(rmnNode, rmn.Proxy.PeerID, rmn.RMN.OffchainPublicKey, rmn.RMN.EVMOnchainPublicKey) + } + // Use peerIDs to set RMN config. + // Add a lane, send a message. +} diff --git a/integration-tests/deployment/ccip/test_helpers.go b/integration-tests/deployment/ccip/test_helpers.go index f4c80f095bc..3b351c1b84f 100644 --- a/integration-tests/deployment/ccip/test_helpers.go +++ b/integration-tests/deployment/ccip/test_helpers.go @@ -12,6 +12,10 @@ import ( "github.com/pkg/errors" "golang.org/x/sync/errgroup" + "github.com/smartcontractkit/chainlink-testing-framework/lib/blockchain" + + "github.com/smartcontractkit/chainlink-ccip/pluginconfig" + cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3" "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext" "github.com/smartcontractkit/chainlink-testing-framework/lib/logging" @@ -28,6 +32,7 @@ import ( jobv1 "github.com/smartcontractkit/chainlink/integration-tests/deployment/jd/job/v1" "github.com/smartcontractkit/chainlink/integration-tests/deployment/memory" "github.com/smartcontractkit/chainlink/integration-tests/docker/test_env" + "github.com/smartcontractkit/chainlink/integration-tests/testconfig" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -251,7 +256,7 @@ func (d DeployedLocalDevEnvironment) RestartChainlinkNodes(t *testing.T) error { return errGrp.Wait() } -func NewLocalDevEnvironment(t *testing.T, lggr logger.Logger) DeployedEnv { +func NewLocalDevEnvironment(t *testing.T, lggr logger.Logger) (DeployedEnv, *test_env.CLClusterTestEnv, testconfig.TestConfig) { ctx := testcontext.Get(t) // create a local docker environment with simulated chains and job-distributor // we cannot create the chainlink nodes yet as we need to deploy the capability registry first @@ -292,7 +297,134 @@ func NewLocalDevEnvironment(t *testing.T, lggr logger.Logger) DeployedEnv { FeedChainSel: feedSel, ReplayBlocks: replayBlocks, FeeTokenContracts: feeContracts, + }, testEnv, cfg +} + +func NewLocalDevEnvironmentWithRMN(t *testing.T, lggr logger.Logger) (DeployedEnv, devenv.RMNCluster) { + tenv, dockerenv, _ := NewLocalDevEnvironment(t, lggr) + state, err := LoadOnchainState(tenv.Env, tenv.Ab) + require.NoError(t, err) + + feeds := state.Chains[tenv.FeedChainSel].USDFeeds + tokenConfig := NewTokenConfig() + tokenConfig.UpsertTokenInfo(LinkSymbol, + pluginconfig.TokenInfo{ + AggregatorAddress: feeds[LinkSymbol].Address().String(), + Decimals: LinkDecimals, + DeviationPPB: cciptypes.NewBigIntFromInt64(1e9), + }, + ) + // Deploy CCIP contracts. + err = DeployCCIPContracts(tenv.Env, tenv.Ab, DeployCCIPContractConfig{ + HomeChainSel: tenv.HomeChainSel, + FeedChainSel: tenv.FeedChainSel, + ChainsToDeploy: tenv.Env.AllChainSelectors(), + TokenConfig: tokenConfig, + MCMSConfig: NewTestMCMSConfig(t, tenv.Env), + CapabilityRegistry: state.Chains[tenv.HomeChainSel].CapabilityRegistry.Address(), + FeeTokenContracts: tenv.FeeTokenContracts, + }) + require.NoError(t, err) + l := logging.GetTestLogger(t) + config := GenerateTestRMNConfig(t, 1, tenv, MustNetworksToRPCMap(dockerenv.EVMNetworks)) + rmnCluster, err := devenv.NewRMNCluster( + t, l, + []string{dockerenv.DockerNetwork.Name}, + config, + "rageproxy", + "latest", + "afn2proxy", + "latest", + dockerenv.LogStream, + ) + require.NoError(t, err) + return tenv, *rmnCluster +} + +func MustNetworksToRPCMap(evmNetworks []*blockchain.EVMNetwork) map[uint64]string { + rpcs := make(map[uint64]string) + for _, network := range evmNetworks { + sel, err := chainsel.SelectorFromChainId(uint64(network.ChainID)) + if err != nil { + panic(err) + } + rpcs[sel] = network.HTTPURLs[0] + } + return rpcs +} + +func MustCCIPNameToRMNName(a string) string { + m := map[string]string{ + chainsel.GETH_TESTNET.Name: "DevnetAlpha", + chainsel.GETH_DEVNET_2.Name: "DevnetBeta", + // TODO: Add more as needed. + } + v, ok := m[a] + if !ok { + panic(fmt.Sprintf("no mapping for %s", a)) + } + return v +} + +func GenerateTestRMNConfig(t *testing.T, nRMNNodes int, tenv DeployedEnv, rpcMap map[uint64]string) map[string]devenv.RMNConfig { + // Find the bootstrappers. + nodes, err := deployment.NodeInfo(tenv.Env.NodeIDs, tenv.Env.Offchain) + require.NoError(t, err) + bootstrappers := nodes.BootstrapLocators() + + // Just set all RMN nodes to support all chains. + state, err := LoadOnchainState(tenv.Env, tenv.Ab) + require.NoError(t, err) + var remoteChains []devenv.RemoteChain + var rpcs []devenv.Chain + for chainSel, chain := range state.Chains { + c, _ := chainsel.ChainBySelector(chainSel) + rmnName := MustCCIPNameToRMNName(c.Name) + remoteChains = append(remoteChains, devenv.RemoteChain{ + Name: rmnName, + Stability: devenv.Stability{Type: "stable"}, + StartBlockNumber: 0, + OffRamp: chain.OffRamp.Address().String(), + RMNRemote: chain.RMNRemote.Address().String(), + }) + rpcs = append(rpcs, devenv.Chain{ + Name: rmnName, + RPC: rpcMap[chainSel], + }) + } + hc, _ := chainsel.ChainBySelector(tenv.HomeChainSel) + shared := devenv.SharedConfig{ + Networking: devenv.Networking{ + RageProxy: devenv.DefaultRageProxy, + Bootstrappers: bootstrappers, + }, + HomeChain: devenv.HomeChain{ + Name: MustCCIPNameToRMNName(hc.Name), + CapabilitiesRegistry: state.Chains[tenv.HomeChainSel].CapabilityRegistry.Address().String(), + CCIPConfig: state.Chains[tenv.HomeChainSel].CCIPConfig.Address().String(), + // TODO: RMNHome + }, + RemoteChains: remoteChains, + } + + rmnConfig := make(map[string]devenv.RMNConfig) + for i := 0; i < nRMNNodes; i++ { + // Listen addresses _should_ be able to operator on the same port since + // they are inside the docker network. + proxyLocal := devenv.ProxyLocalConfig{ + ListenAddresses: []string{devenv.DefaultProxyListenAddress}, + AnnounceAddresses: []string{}, + ProxyAddress: devenv.DefaultRageProxy, + DiscovererDbPath: devenv.DefaultDiscovererDbPath, + } + rmnConfig[fmt.Sprintf("rmn_%d", i)] = devenv.RMNConfig{ + Shared: shared, + Local: devenv.LocalConfig{Chains: rpcs}, + ProxyShared: devenv.DefaultRageProxySharedConfig, + ProxyLocal: proxyLocal, + } } + return rmnConfig } // AddLanesForAll adds densely connected lanes for all chains in the environment so that each chain diff --git a/integration-tests/deployment/devenv/build_env.go b/integration-tests/deployment/devenv/build_env.go index 881c49ce65f..c564c4e068a 100644 --- a/integration-tests/deployment/devenv/build_env.go +++ b/integration-tests/deployment/devenv/build_env.go @@ -19,6 +19,8 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc/credentials/insecure" + "github.com/smartcontractkit/chainlink-testing-framework/lib/blockchain" + "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext" "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/conversions" @@ -80,6 +82,21 @@ func CreateDockerEnv(t *testing.T) ( env, err := builder.Build() require.NoError(t, err, "Error building test environment") + // we need to update the URLs for the simulated networks to the private chain RPCs in the docker test environment + // so that the chainlink nodes and rmn nodes can internally connect to the chain + env.EVMNetworks = []*blockchain.EVMNetwork{} + for i, net := range evmNetworks { + // if network is simulated, update the URLs with private chain RPCs in the docker test environment + // so that nodes can internally connect to the chain + if net.Simulated { + rpcProvider, err := env.GetRpcProvider(net.ChainID) + require.NoError(t, err, "Error getting rpc provider") + evmNetworks[i].HTTPURLs = rpcProvider.PrivateHttpUrls() + evmNetworks[i].URLs = rpcProvider.PrivateWsUrsl() + } + env.EVMNetworks = append(env.EVMNetworks, &evmNetworks[i]) + } + chains := CreateChainConfigFromNetworks(t, env, privateEthereumNetworks, cfg.GetNetworkConfig()) jdConfig := JDConfig{ @@ -123,16 +140,9 @@ func StartChainlinkNodes( env *test_env.CLClusterTestEnv, cfg tc.TestConfig, ) error { - evmNetworks := networks.MustGetSelectedNetworkConfig(cfg.GetNetworkConfig()) - for i, net := range evmNetworks { - // if network is simulated, update the URLs with private chain RPCs in the docker test environment - // so that nodes can internally connect to the chain - if net.Simulated { - rpcProvider, err := env.GetRpcProvider(net.ChainID) - require.NoError(t, err, "Error getting rpc provider") - evmNetworks[i].HTTPURLs = rpcProvider.PrivateHttpUrls() - evmNetworks[i].URLs = rpcProvider.PrivateWsUrsl() - } + var evmNetworks []blockchain.EVMNetwork + for i := range env.EVMNetworks { + evmNetworks = append(evmNetworks, *env.EVMNetworks[i]) } noOfNodes := pointer.GetInt(cfg.CCIP.CLNode.NoOfPluginNodes) + pointer.GetInt(cfg.CCIP.CLNode.NoOfBootstraps) if env.ClCluster == nil { diff --git a/integration-tests/deployment/devenv/rmn.go b/integration-tests/deployment/devenv/rmn.go new file mode 100644 index 00000000000..a877c4fdfab --- /dev/null +++ b/integration-tests/deployment/devenv/rmn.go @@ -0,0 +1,384 @@ +package devenv + +import ( + "context" + "crypto/ed25519" + "encoding/json" + "fmt" + "io" + "net" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/rs/zerolog" + tc "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/exec" + tcwait "github.com/testcontainers/testcontainers-go/wait" + + "github.com/smartcontractkit/chainlink-testing-framework/lib/docker" + "github.com/smartcontractkit/chainlink-testing-framework/lib/docker/test_env" + "github.com/smartcontractkit/chainlink-testing-framework/lib/logging" + "github.com/smartcontractkit/chainlink-testing-framework/lib/logstream" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +const ( + RMNKeyStore = "keystore/afn2proxy-keystore.json" + ProxyKeyStore = "keystore/rageproxy-keystore.json" +) + +type RageProxy struct { + test_env.EnvComponent + proxyListenerPort string + proxyPort string + Passphrase string + Local ProxyLocalConfig + Shared ProxySharedConfig + + // Generated on first time boot. + // Needed for RMHHome. + PeerID p2ptypes.PeerID +} + +func NewRage2ProxyComponent( + networks []string, + name, + imageName, + imageVersion string, + local ProxyLocalConfig, + shared ProxySharedConfig, + logStream *logstream.LogStream, +) (*RageProxy, error) { + rageName := fmt.Sprintf("%s-proxy-%s", name, uuid.NewString()[0:8]) + + // TODO support multiple listeners + _, listenPort, err := net.SplitHostPort(local.ListenAddresses[0]) + if err != nil { + return nil, err + } + _, proxyPort, err := net.SplitHostPort(local.ProxyAddress) + if err != nil { + return nil, err + } + + rmn := &RageProxy{ + EnvComponent: test_env.EnvComponent{ + ContainerName: rageName, + ContainerImage: imageName, + ContainerVersion: imageVersion, + Networks: networks, + LogStream: logStream, + }, + Passphrase: DefaultAFNPasphrase, + proxyListenerPort: listenPort, + proxyPort: proxyPort, + Local: local, + Shared: shared, + } + return rmn, nil +} + +func extractPeerID(b []byte) (p2ptypes.PeerID, error) { + var keystore struct { + AdditionalData string `json:"additionalData"` + } + if err := json.Unmarshal(b, &keystore); err != nil { + return p2ptypes.PeerID{}, err + } + var additionalData struct { + PeerID string `json:"PeerID"` + } + if err := json.Unmarshal([]byte(keystore.AdditionalData), &additionalData); err != nil { + return p2ptypes.PeerID{}, err + } + var peerID p2ptypes.PeerID + if err := peerID.UnmarshalText([]byte(additionalData.PeerID)); err != nil { + return p2ptypes.PeerID{}, err + } + return peerID, nil +} + +func (proxy *RageProxy) Start(t *testing.T, lggr zerolog.Logger) (tc.Container, error) { + sharedRageProxy, err := proxy.Shared.rageProxyShared() + if err != nil { + return nil, err + } + localRageProxy, err := proxy.Local.rageProxyLocal() + if err != nil { + return nil, err + } + + l := tc.Logger + if t != nil { + l = logging.CustomT{ + T: t, + L: lggr, + } + } + container, err := docker.StartContainerWithRetry(lggr, tc.GenericContainerRequest{ + ContainerRequest: tc.ContainerRequest{ + Name: proxy.ContainerName, + Image: fmt.Sprintf("%s:%s", proxy.ContainerImage, proxy.ContainerVersion), + Env: map[string]string{ + "RAGEPROXY_PASSPHRASE": proxy.Passphrase, + }, + ExposedPorts: []string{ + test_env.NatPortFormat(proxy.proxyPort), + test_env.NatPortFormat(proxy.proxyListenerPort), + }, + Files: []tc.ContainerFile{ + { + HostFilePath: sharedRageProxy, + ContainerFilePath: "/app/cfg/rageproxy-shared.json", + FileMode: 0644, + }, + { + HostFilePath: localRageProxy, + ContainerFilePath: "/app/cfg/rageproxy-local.json", + FileMode: 0644, + }, + }, + WaitingFor: tcwait.ForExec([]string{"cat", ProxyKeyStore}), + LifecycleHooks: []tc.ContainerLifecycleHooks{ + { + PostStarts: proxy.PostStartsHooks, + PostStops: proxy.PostStopsHooks, + PreTerminates: proxy.PreTerminatesHooks, + }, + }, + }, + Started: true, + Logger: l, + }) + if err != nil { + return nil, err + } + _, reader, err := container.Exec(context.Background(), []string{ + "cat", ProxyKeyStore}, exec.Multiplexed()) + if err != nil { + return nil, errors.Wrapf(err, "Unable to cat keystore") + } + b, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + peerID, err := extractPeerID(b) + if err != nil { + return nil, errors.Wrapf(err, "Unable to extract peerID %s", string(b)) + } + proxy.PeerID = peerID + proxy.Container = container + return container, nil +} + +type AFN2Proxy struct { + test_env.EnvComponent + AFNPassphrase string + Shared SharedConfig + Local LocalConfig + + // Generated on boot + OffchainPublicKey ed25519.PublicKey // RMNHome + EVMOnchainPublicKey common.Address // RMNRemote +} + +func NewAFN2ProxyComponent( + networks []string, + name, + imageName, + imageVersion string, + shared SharedConfig, + local LocalConfig, + logStream *logstream.LogStream) (*AFN2Proxy, error) { + afnName := fmt.Sprintf("%s-%s", name, uuid.NewString()[0:8]) + rmn := &AFN2Proxy{ + EnvComponent: test_env.EnvComponent{ + ContainerName: afnName, + ContainerImage: imageName, + ContainerVersion: imageVersion, + Networks: networks, + LogStream: logStream, + }, + AFNPassphrase: DefaultAFNPasphrase, + Shared: shared, + Local: local, + } + + return rmn, nil +} + +func extractKeys(b []byte) (common.Address, ed25519.PublicKey, error) { + var keystore struct { + AssociatedData string `json:"associated_data"` + } + if err := json.Unmarshal(b, &keystore); err != nil { + return common.Address{}, ed25519.PublicKey{}, err + } + var associatedData struct { + OffchainPublicKey string `json:"offchain_public_key"` + EVMOnchainPublicKey string `json:"evm_onchain_public_key"` + } + if err := json.Unmarshal([]byte(keystore.AssociatedData), &associatedData); err != nil { + return common.Address{}, ed25519.PublicKey{}, err + } + offchainKey, err := hexutil.Decode(associatedData.OffchainPublicKey) + if err != nil { + return common.Address{}, ed25519.PublicKey{}, err + } + if len(offchainKey) != ed25519.PublicKeySize { + return common.Address{}, ed25519.PublicKey{}, fmt.Errorf("invalid offchain public key: %x", offchainKey) + } + return common.HexToAddress(associatedData.EVMOnchainPublicKey), offchainKey, nil +} + +func (rmn *AFN2Proxy) Start(t *testing.T, lggr zerolog.Logger, reuse bool) (tc.Container, error) { + localAFN2Proxy, err := rmn.Local.afn2ProxyLocalConfigFile() + if err != nil { + return nil, err + } + sharedAFN2Proxy, err := rmn.Shared.afn2ProxySharedConfigFile() + if err != nil { + return nil, err + } + + l := tc.Logger + if t != nil { + l = logging.CustomT{ + T: t, + L: lggr, + } + } + container, err := docker.StartContainerWithRetry(lggr, tc.GenericContainerRequest{ + ContainerRequest: tc.ContainerRequest{ + Name: rmn.ContainerName, + Image: fmt.Sprintf("%s:%s", rmn.ContainerImage, rmn.ContainerVersion), + Env: map[string]string{ + "AFN_PASSPHRASE": rmn.AFNPassphrase, + }, + Files: []tc.ContainerFile{ + { + HostFilePath: sharedAFN2Proxy, + ContainerFilePath: "/app/cfg/afn2proxy-shared.toml", + FileMode: 0644, + }, + { + HostFilePath: localAFN2Proxy, + ContainerFilePath: "/app/cfg/afn2proxy-local.toml", + FileMode: 0644, + }, + }, + WaitingFor: tcwait.ForExec([]string{"cat", RMNKeyStore}), + LifecycleHooks: []tc.ContainerLifecycleHooks{ + { + PostStarts: rmn.PostStartsHooks, + PostStops: rmn.PostStopsHooks, + PreTerminates: rmn.PreTerminatesHooks, + }, + }, + }, + Started: true, + Reuse: reuse, + Logger: l, + }) + if err != nil { + return nil, err + } + _, reader, err := container.Exec(context.Background(), []string{ + "cat", RMNKeyStore}, exec.Multiplexed()) + if err != nil { + return nil, errors.Wrapf(err, "Unable to cat keystore") + } + b, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + onchainPubKey, offchainPubKey, err := extractKeys(b) + if err != nil { + return nil, errors.Wrapf(err, "Unable to extract peerID %s", string(b)) + } + rmn.OffchainPublicKey = offchainPubKey + rmn.EVMOnchainPublicKey = onchainPubKey + rmn.Container = container + return container, nil +} + +type RMNNode struct { + RMN AFN2Proxy + Proxy RageProxy +} + +func (n *RMNNode) Start(t *testing.T, lggr zerolog.Logger) error { + _, err := n.Proxy.Start(t, lggr) + if err != nil { + return err + } + _, err = n.RMN.Start(t, lggr, false) + if err != nil { + return err + } + return nil +} + +type RMNCluster struct { + Nodes map[string]RMNNode + t *testing.T + l zerolog.Logger +} + +// NewRMNCluster creates a new RMNCluster with the given configuration +// and starts it. +func NewRMNCluster( + t *testing.T, + l zerolog.Logger, + networks []string, + config map[string]RMNConfig, + proxyImage string, + proxyVersion string, + rmnImage string, + rmnVersion string, + logStream *logstream.LogStream, +) (*RMNCluster, error) { + rmn := &RMNCluster{ + t: t, + l: l, + Nodes: make(map[string]RMNNode), + } + for name, rmnConfig := range config { + proxy, err := NewRage2ProxyComponent(networks, name, proxyImage, proxyVersion, rmnConfig.ProxyLocal, rmnConfig.ProxyShared, logStream) + if err != nil { + return nil, err + } + _, err = proxy.Start(t, l) + if err != nil { + return nil, err + } + + // TODO: Hack here is we overwrite the host with the container name + // since the RMN node needs to be able to reach its own proxy container. + proxyName, err := proxy.Container.Name(context.Background()) + if err != nil { + return nil, err + } + _, port, err := net.SplitHostPort(rmnConfig.Shared.Networking.RageProxy) + if err != nil { + return nil, err + } + rmnConfig.Shared.Networking.RageProxy = fmt.Sprintf("%s:%s", proxyName, port) + afn, err := NewAFN2ProxyComponent(networks, name, rmnImage, rmnVersion, rmnConfig.Shared, rmnConfig.Local, logStream) + if err != nil { + return nil, err + } + _, err = afn.Start(t, l, false) + if err != nil { + return nil, err + } + rmn.Nodes[name] = RMNNode{ + RMN: *afn, + Proxy: *proxy, + } + } + return rmn, nil +} diff --git a/integration-tests/deployment/devenv/rmn_config.go b/integration-tests/deployment/devenv/rmn_config.go new file mode 100644 index 00000000000..798a988a5e8 --- /dev/null +++ b/integration-tests/deployment/devenv/rmn_config.go @@ -0,0 +1,137 @@ +package devenv + +import ( + "encoding/json" + "fmt" + "os" + + "github.com/pelletier/go-toml/v2" +) + +const ( + DefaultAFNPasphrase = "my-not-so-secret-passphrase" + DefaultRageProxy = "127.0.0.1:8081" + DefaultProxyListenAddress = "127.0.0.1:8080" + DefaultDiscovererDbPath = "/app/rageproxy-discoverer-db.json" +) + +var ( + DefaultRageProxySharedConfig = ProxySharedConfig{ + Host: HostConfig{ + DurationBetweenDials: 1000000000, + }, + Discoverer: DiscovererConfig{ + DeltaReconcile: 1000000000, + }, + } +) + +type Networking struct { + RageProxy string `toml:"rageproxy"` + Bootstrappers []string `toml:"bootstrappers"` +} + +type HomeChain struct { + Name string `toml:"name"` + CapabilitiesRegistry string `toml:"capabilities_registry"` + CCIPConfig string `toml:"ccip_config"` + RMNHome string `toml:"rmn_home"` +} + +type Stability struct { + Type string `toml:"type"` +} + +type RemoteChain struct { + Name string `toml:"name"` + Stability Stability `toml:"stability"` + StartBlockNumber int `toml:"start_block_number"` + OffRamp string `toml:"off_ramp"` + RMNRemote string `toml:"rmn_remote"` +} + +type SharedConfig struct { + Networking Networking `toml:"networking"` + HomeChain HomeChain `toml:"home_chain"` + RemoteChains []RemoteChain `toml:"remote_chains"` +} + +func (s SharedConfig) afn2ProxySharedConfigFile() (string, error) { + data, err := toml.Marshal(s) + if err != nil { + return "", fmt.Errorf("failed to marshal afn2Proxy shared config: %w", err) + } + return CreateTempFile(data, "afn2proxy_shared") +} + +type LocalConfig struct { + Chains []Chain `toml:"chains"` +} + +func (l LocalConfig) afn2ProxyLocalConfigFile() (string, error) { + data, err := toml.Marshal(l) + if err != nil { + return "", fmt.Errorf("failed to marshal afn2Proxy local config: %w", err) + } + return CreateTempFile(data, "afn2proxy_local") +} + +type Chain struct { + Name string `toml:"name"` + RPC string `toml:"rpc"` +} + +type ProxyLocalConfig struct { + ListenAddresses []string `json:"ListenAddresses"` + AnnounceAddresses []string `json:"AnnounceAddresses"` + ProxyAddress string `json:"ProxyAddress"` + DiscovererDbPath string `json:"DiscovererDbPath"` +} + +func (l ProxyLocalConfig) rageProxyLocal() (string, error) { + data, err := json.Marshal(l) + if err != nil { + return "", fmt.Errorf("failed to marshal rageProxy local config: %w", err) + } + return CreateTempFile(data, "rageproxy_local") +} + +type HostConfig struct { + DurationBetweenDials int64 `json:"DurationBetweenDials"` +} + +type DiscovererConfig struct { + DeltaReconcile int64 `json:"DeltaReconcile"` +} + +type ProxySharedConfig struct { + Host HostConfig `json:"Host"` + Discoverer DiscovererConfig `json:"Discoverer"` +} + +func (s ProxySharedConfig) rageProxyShared() (string, error) { + data, err := json.Marshal(s) + if err != nil { + return "", fmt.Errorf("failed to marshal rageProxy shared config: %w", err) + } + return CreateTempFile(data, "rageproxy_shared") +} + +type RMNConfig struct { + Shared SharedConfig + Local LocalConfig + ProxyLocal ProxyLocalConfig + ProxyShared ProxySharedConfig +} + +func CreateTempFile(data []byte, pattern string) (string, error) { + file, err := os.CreateTemp("", pattern) + if err != nil { + return "", fmt.Errorf("failed to create temp file for %s: %w", pattern, err) + } + _, err = file.Write(data) + if err != nil { + return "", fmt.Errorf("failed to write %s: %w", pattern, err) + } + return file.Name(), nil +} diff --git a/integration-tests/deployment/devenv/rmn_test.go b/integration-tests/deployment/devenv/rmn_test.go new file mode 100644 index 00000000000..7d07670ff7d --- /dev/null +++ b/integration-tests/deployment/devenv/rmn_test.go @@ -0,0 +1,15 @@ +package devenv + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractPeerID(t *testing.T) { + data := []byte("{\"Salt\":\"yAUVSNGPnY78hJwJwKDBmA==\",\"Nonce\":\"8p0vNKpnQH1P+7/cg2dM3vNlc60tzPl0\",\"Ciphertext\":\"VboRchQzDx/+zIVDWyTXIEwo4Ej0s7O6kQqmgCqW+A+HEXl6bI02W1Y2T88XuY0m44eC9DvSpBPhs/VLtgymj6nBcl+nzfJHLMp2pBMPjKHxgNsEk34mDgnmqYKTtIkgzv7hEyy7j0CAcR/RxkjfQNKpfWOlNtAFryFO+w==\",\"AdditionalData\":\"{\\\"PeerID\\\":\\\"12D3KooWNqugYSJw9thwwu1PC3aEpPsBxMZM2EdzpJXGesRG4E8n\\\"}\"}") + peerID, err := extractPeerID(data) + require.NoError(t, err) + assert.Equal(t, "12D3KooWNqugYSJw9thwwu1PC3aEpPsBxMZM2EdzpJXGesRG4E8n", peerID.String()) +} diff --git a/integration-tests/smoke/ccip_test.go b/integration-tests/smoke/ccip_test.go index 6108d3889d6..c4fd42a1bec 100644 --- a/integration-tests/smoke/ccip_test.go +++ b/integration-tests/smoke/ccip_test.go @@ -19,7 +19,7 @@ import ( func TestInitialDeployOnLocal(t *testing.T) { lggr := logger.TestLogger(t) ctx := ccdeploy.Context(t) - tenv := ccdeploy.NewLocalDevEnvironment(t, lggr) + tenv, _, _ := ccdeploy.NewLocalDevEnvironment(t, lggr) e := tenv.Env state, err := ccdeploy.LoadOnchainState(tenv.Env, tenv.Ab) diff --git a/integration-tests/testconfig/ccip/config.go b/integration-tests/testconfig/ccip/config.go index bf487ed3940..53c1afaeb0b 100644 --- a/integration-tests/testconfig/ccip/config.go +++ b/integration-tests/testconfig/ccip/config.go @@ -34,6 +34,15 @@ type Config struct { JobDistributorConfig JDConfig `toml:",omitempty"` HomeChainSelector *string `toml:",omitempty"` FeedChainSelector *string `toml:",omitempty"` + RMNConfig RMNConfig `toml:",omitempty"` +} + +type RMNConfig struct { + NoOfNodes *int `toml:",omitempty"` + ProxyImage *string `toml:",omitempty"` + ProxyVersion *string `toml:",omitempty"` + AFNImage *string `toml:",omitempty"` + AFNVersion *string `toml:",omitempty"` } type NodeConfig struct { From 4c3e7ec8c3b334a13be13780dbfe19dc1f2eacd1 Mon Sep 17 00:00:00 2001 From: Dimitris Grigoriou Date: Wed, 9 Oct 2024 17:20:17 +0300 Subject: [PATCH 3/5] Fix TXM flakey test (#14697) --- .changeset/old-humans-watch.md | 5 +++++ core/chains/evm/txmgr/txmgr_test.go | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 .changeset/old-humans-watch.md diff --git a/.changeset/old-humans-watch.md b/.changeset/old-humans-watch.md new file mode 100644 index 00000000000..e58dedcd368 --- /dev/null +++ b/.changeset/old-humans-watch.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Fix TXM flakey test #internal diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 3bdf9ef5f2e..c47ca85737b 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -501,8 +501,8 @@ func TestTxm_Lifecycle(t *testing.T) { head := cltest.Head(42) finalizedHead := cltest.Head(0) - ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() - ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(finalizedHead, nil).Once() + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil) + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(finalizedHead, nil) keyChangeCh := make(chan struct{}) unsub := cltest.NewAwaiter() From dbd42db9b838b619d0f8a5acd21328ecd5043cd3 Mon Sep 17 00:00:00 2001 From: Joe Huang Date: Wed, 9 Oct 2024 10:04:48 -0500 Subject: [PATCH 4/5] BCFR-934/remove-finality-depth-as-default-value-for-minConfirmation-and-fix-inconsistency (#14509) * remove default finality depth, and allow 0 to be provided by user * update changeset * fix lint * restore > 0 check * wait for callback unless explicit zero * support finality callbacks * update changeset * update changeset * feedback --------- Co-authored-by: Jordan Krage --- .changeset/brave-ads-explode.md | 9 ++++++ common/txmgr/confirmer.go | 6 ++-- common/txmgr/types/mocks/tx_store.go | 31 ++++++++++---------- common/txmgr/types/tx_store.go | 2 +- core/chains/evm/txmgr/confirmer_test.go | 10 +++---- core/chains/evm/txmgr/evm_tx_store.go | 10 +++++-- core/chains/evm/txmgr/evm_tx_store_test.go | 24 +++++++++++++--- core/chains/evm/txmgr/mocks/evm_tx_store.go | 31 ++++++++++---------- core/services/pipeline/task.eth_tx.go | 32 ++++++++++----------- 9 files changed, 92 insertions(+), 63 deletions(-) create mode 100644 .changeset/brave-ads-explode.md diff --git a/.changeset/brave-ads-explode.md b/.changeset/brave-ads-explode.md new file mode 100644 index 00000000000..4be608e2d1d --- /dev/null +++ b/.changeset/brave-ads-explode.md @@ -0,0 +1,9 @@ +--- +"chainlink": patch +--- + +Remove finality depth as the default value for minConfirmation for tx jobs. +Update the sql query for fetching pending callback transactions: +if minConfirmation is not null, we check difference if the current block - tx block > minConfirmation +else we check if the tx block is <= finalizedBlock +#updated diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index 4eaa6739d58..d2d491d794c 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -347,7 +347,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro if ec.resumeCallback != nil { mark = time.Now() - if err := ec.ResumePendingTaskRuns(ctx, head); err != nil { + if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil { return fmt.Errorf("ResumePendingTaskRuns failed: %w", err) } @@ -1259,8 +1259,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen } // ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts -func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error { - receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID) +func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, latest, finalized int64) error { + receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID) if err != nil { return err diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 4467729e167..279a7252f1a 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -1096,9 +1096,9 @@ func (_c *TxStore_FindTxesByMetaFieldAndStates_Call[ADDR, CHAIN_ID, TX_HASH, BLO return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, latest, finalized, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx context.Context, latest int64, finalized int64, chainID CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error) { + ret := _m.Called(ctx, latest, finalized, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1106,19 +1106,19 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPen var r0 []txmgrtypes.ReceiptPlus[R] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)); ok { + return rf(ctx, latest, finalized, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, CHAIN_ID) []txmgrtypes.ReceiptPlus[R]); ok { + r0 = rf(ctx, latest, finalized, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]txmgrtypes.ReceiptPlus[R]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, CHAIN_ID) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, CHAIN_ID) error); ok { + r1 = rf(ctx, latest, finalized, chainID) } else { r1 = ret.Error(1) } @@ -1133,15 +1133,16 @@ type TxStore_FindTxesPendingCallback_Call[ADDR types.Hashable, CHAIN_ID types.ID // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 +// - latest int64 +// - finalized int64 // - chainID CHAIN_ID -func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { - return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *TxStore_Expecter[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxesPendingCallback(ctx interface{}, latest interface{}, finalized interface{}, chainID interface{}) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { + return &TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{Call: _e.mock.On("FindTxesPendingCallback", ctx, latest, finalized, chainID)} } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, blockNum int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Run(run func(ctx context.Context, latest int64, finalized int64, chainID CHAIN_ID)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(CHAIN_ID)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(CHAIN_ID)) }) return _c } @@ -1151,7 +1152,7 @@ func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HA return _c } -func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { +func (_c *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RunAndReturn(run func(context.Context, int64, int64, CHAIN_ID) ([]txmgrtypes.ReceiptPlus[R], error)) *TxStore_FindTxesPendingCallback_Call[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] { _c.Call.Return(run) return _c } diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 3d874cc4366..668b8db2049 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -34,7 +34,7 @@ type TxStore[ TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE] // Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled - FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) + FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error) // Update tx to mark that its callback has been signaled UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index a5e19cda277..a75b7709787 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -3055,7 +3055,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { // It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) require.NoError(t, err) }) @@ -3073,7 +3073,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) require.NoError(t, err) }) @@ -3101,7 +3101,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Cleanup(func() { <-done }) go func() { defer close(done) - err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) if !assert.NoError(t, err2) { return } @@ -3155,7 +3155,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { t.Cleanup(func() { <-done }) go func() { defer close(done) - err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) if !assert.NoError(t, err2) { return } @@ -3192,7 +3192,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash) pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID) - err := ec.ResumePendingTaskRuns(tests.Context(t), &head) + err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0) require.Error(t, err) // Retrieve Tx to check if callback completed flag was left unchanged diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index f56e8fab3fd..b75533e8d05 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1055,7 +1055,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e } // Find confirmed txes requiring callback but have not yet been signaled -func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { +func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) { var rs []dbReceiptPlus var cancel context.CancelFunc @@ -1066,8 +1066,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64 INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE - AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2 - `, blockNum, chainID.String()) + AND ( + (evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations)) + OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2) + ) + AND evm.txes.evm_chain_id = $3 + `, latest, finalized, chainID.String()) if err != nil { return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err) } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index ffea13def04..9e1f135e0b2 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -652,7 +652,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`) attempt1 := etx1.TxAttempts[0] - mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash) + etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID) // Callback to pipeline service completed. Should be ignored @@ -685,10 +685,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID) // Search evm.txes table for tx requiring callback - receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID()) + receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID()) require.NoError(t, err) - assert.Len(t, receiptsPlus, 1) - assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + if assert.Len(t, receiptsPlus, 1) { + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + } + + // Clear min_confirmations + pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID) + + // Search evm.txes table for tx requiring callback + receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID()) + require.NoError(t, err) + assert.Empty(t, receiptsPlus) + + // Search evm.txes table for tx requiring callback, with block 1 finalized + receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID()) + require.NoError(t, err) + if assert.Len(t, receiptsPlus, 1) { + assert.Equal(t, tr1.ID, receiptsPlus[0].ID) + } } func Test_FindTxWithIdempotencyKey(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index a9a175e3d94..7800b26e47a 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -1395,9 +1395,9 @@ func (_c *EvmTxStore_FindTxesByMetaFieldAndStates_Call) RunAndReturn(run func(co return _c } -// FindTxesPendingCallback provides a mock function with given fields: ctx, blockNum, chainID -func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { - ret := _m.Called(ctx, blockNum, chainID) +// FindTxesPendingCallback provides a mock function with given fields: ctx, latest, finalized, chainID +func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, latest int64, finalized int64, chainID *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error) { + ret := _m.Called(ctx, latest, finalized, chainID) if len(ret) == 0 { panic("no return value specified for FindTxesPendingCallback") @@ -1405,19 +1405,19 @@ func (_m *EvmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int6 var r0 []types.ReceiptPlus[*evmtypes.Receipt] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { - return rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)); ok { + return rf(ctx, latest, finalized, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { - r0 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(0).(func(context.Context, int64, int64, *big.Int) []types.ReceiptPlus[*evmtypes.Receipt]); ok { + r0 = rf(ctx, latest, finalized, chainID) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]types.ReceiptPlus[*evmtypes.Receipt]) } } - if rf, ok := ret.Get(1).(func(context.Context, int64, *big.Int) error); ok { - r1 = rf(ctx, blockNum, chainID) + if rf, ok := ret.Get(1).(func(context.Context, int64, int64, *big.Int) error); ok { + r1 = rf(ctx, latest, finalized, chainID) } else { r1 = ret.Error(1) } @@ -1432,15 +1432,16 @@ type EvmTxStore_FindTxesPendingCallback_Call struct { // FindTxesPendingCallback is a helper method to define mock.On call // - ctx context.Context -// - blockNum int64 +// - latest int64 +// - finalized int64 // - chainID *big.Int -func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, blockNum interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { - return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, blockNum, chainID)} +func (_e *EvmTxStore_Expecter) FindTxesPendingCallback(ctx interface{}, latest interface{}, finalized interface{}, chainID interface{}) *EvmTxStore_FindTxesPendingCallback_Call { + return &EvmTxStore_FindTxesPendingCallback_Call{Call: _e.mock.On("FindTxesPendingCallback", ctx, latest, finalized, chainID)} } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, blockNum int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) Run(run func(ctx context.Context, latest int64, finalized int64, chainID *big.Int)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64), args[2].(*big.Int)) + run(args[0].(context.Context), args[1].(int64), args[2].(int64), args[3].(*big.Int)) }) return _c } @@ -1450,7 +1451,7 @@ func (_c *EvmTxStore_FindTxesPendingCallback_Call) Return(receiptsPlus []types.R return _c } -func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { +func (_c *EvmTxStore_FindTxesPendingCallback_Call) RunAndReturn(run func(context.Context, int64, int64, *big.Int) ([]types.ReceiptPlus[*evmtypes.Receipt], error)) *EvmTxStore_FindTxesPendingCallback_Call { _c.Call.Return(run) return _c } diff --git a/core/services/pipeline/task.eth_tx.go b/core/services/pipeline/task.eth_tx.go index 506a2518f76..3c340261966 100644 --- a/core/services/pipeline/task.eth_tx.go +++ b/core/services/pipeline/task.eth_tx.go @@ -64,11 +64,11 @@ func (t *ETHTxTask) getEvmChainID() string { return t.EVMChainID } -func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (result Result, runInfo RunInfo) { +func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inputs []Result) (Result, RunInfo) { var chainID StringParam err := errors.Wrap(ResolveParam(&chainID, From(VarExpr(t.getEvmChainID(), vars), NonemptyString(t.getEvmChainID()), "")), "evmChainID") if err != nil { - return Result{Error: err}, runInfo + return Result{Error: err}, RunInfo{} } chain, err := t.legacyChains.Get(string(chainID)) @@ -81,7 +81,7 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu txManager := chain.TxManager() _, err = CheckInputs(inputs, -1, -1, 0) if err != nil { - return Result{Error: errors.Wrap(err, "task inputs")}, runInfo + return Result{Error: errors.Wrap(err, "task inputs")}, RunInfo{} } maximumGasLimit := SelectGasLimit(cfg.GasEstimator(), t.jobType, t.specGasLimit) @@ -107,25 +107,20 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu errors.Wrap(ResolveParam(&failOnRevert, From(NonemptyString(t.FailOnRevert), false)), "failOnRevert"), ) if err != nil { - return Result{Error: err}, runInfo - } - var minOutgoingConfirmations uint64 - if min, isSet := maybeMinConfirmations.Uint64(); isSet { - minOutgoingConfirmations = min - } else { - minOutgoingConfirmations = uint64(cfg.FinalityDepth()) + return Result{Error: err}, RunInfo{} } + minOutgoingConfirmations, isMinConfirmationSet := maybeMinConfirmations.Uint64() txMeta, err := decodeMeta(txMetaMap) if err != nil { - return Result{Error: err}, runInfo + return Result{Error: err}, RunInfo{} } txMeta.FailOnRevert = null.BoolFrom(bool(failOnRevert)) setJobIDOnMeta(lggr, vars, txMeta) transmitChecker, err := decodeTransmitChecker(transmitCheckerMap) if err != nil { - return Result{Error: err}, runInfo + return Result{Error: err}, RunInfo{} } fromAddr, err := t.keyStore.GetRoundRobinAddress(ctx, chain.ID(), fromAddrs...) @@ -159,8 +154,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu SignalCallback: true, } - if minOutgoingConfirmations > 0 { - // Store the task run ID, so we can resume the pipeline when tx is confirmed + if !isMinConfirmationSet { + // Store the task run ID, so we can resume the pipeline when tx is finalized + txRequest.PipelineTaskRunID = &t.uuid + } else if minOutgoingConfirmations > 0 { + // Store the task run ID, so we can resume the pipeline after minOutgoingConfirmations txRequest.PipelineTaskRunID = &t.uuid txRequest.MinConfirmations = clnull.Uint32From(uint32(minOutgoingConfirmations)) } @@ -170,11 +168,11 @@ func (t *ETHTxTask) Run(ctx context.Context, lggr logger.Logger, vars Vars, inpu return Result{Error: errors.Wrapf(ErrTaskRunFailed, "while creating transaction: %v", err)}, retryableRunInfo() } - if minOutgoingConfirmations > 0 { - return Result{}, pendingRunInfo() + if txRequest.PipelineTaskRunID != nil { + return Result{}, RunInfo{IsPending: true} } - return Result{Value: nil}, runInfo + return Result{}, RunInfo{} } func decodeMeta(metaMap MapParam) (*txmgr.TxMeta, error) { From 5ca0d1f19f90c7b42c3cb1ae7b6b860802c92f64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deividas=20Kar=C5=BEinauskas?= Date: Wed, 9 Oct 2024 18:57:21 +0300 Subject: [PATCH 5/5] [KS-430] Provide an OracleFactory to StandardCapabilities (#14305) * --wip-- [skip CI] * --wip-- [skip CI] * Add bootstrap peers via capabilities spec * Add peer wrapper to Oracle Factory * Add comment explaining delegate position * Cleanup * Create a key bundle if none exists * Use PR #738 hash * Fix bad merge * Bump chainlink-common * Use in-memory DB for OCR persistance * Also add eth pubkey * Oracle instance spawned * Bump chainlink-common version * Undo some changes * Add changeset + fix interface * Update changeset * Use chainlink-common commit from main branch * Add oracle_factory config to SC spec * Undo a change * Things work as is. Checkpoint. * Update comments. Remove redundant log. * Remove redundant test and bring back log line * Remove redundant JSONConfig * Implement oracle factory transmitter * Stop using pkg/errors * Naming convention * Woops * Fix lint errors * Tidy --- .changeset/chilled-months-bow.md | 5 + core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- core/services/chainlink/application.go | 23 +-- core/services/job/models.go | 33 ++++- core/services/job/orm.go | 4 +- .../ocr2/plugins/generic/oraclefactory.go | 140 ++++++++++++++++++ .../ocr2/plugins/generic/oraclefactorydb.go | 135 +++++++++++++++++ .../generic/oraclefactorytransmitter.go | 36 +++++ .../services/standardcapabilities/delegate.go | 114 +++++++++++++- .../standardcapabilities/delegate_test.go | 121 +++++++++++++++ .../standard_capabilities.go | 12 +- core/services/workflows/engine.go | 3 +- .../0255_standard_capabilities_extension.sql | 9 ++ go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- .../capabilities/log-event-trigger/main.go | 1 + 21 files changed, 621 insertions(+), 39 deletions(-) create mode 100644 .changeset/chilled-months-bow.md create mode 100644 core/services/ocr2/plugins/generic/oraclefactory.go create mode 100644 core/services/ocr2/plugins/generic/oraclefactorydb.go create mode 100644 core/services/ocr2/plugins/generic/oraclefactorytransmitter.go create mode 100644 core/services/standardcapabilities/delegate_test.go create mode 100644 core/store/migrate/migrations/0255_standard_capabilities_extension.sql diff --git a/.changeset/chilled-months-bow.md b/.changeset/chilled-months-bow.md new file mode 100644 index 00000000000..d3bbf7f97e3 --- /dev/null +++ b/.changeset/chilled-months-bow.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added oracle support in standard capabilities diff --git a/core/scripts/go.mod b/core/scripts/go.mod index af7ddc2f98f..5c9043111da 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -22,7 +22,7 @@ require ( github.com/prometheus/client_golang v1.20.0 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240717100443-f6226e09bee7 github.com/spf13/cobra v1.8.1 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 684155bba51..edb7d16b965 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1099,8 +1099,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index c9e72244cf1..784abac9516 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -453,15 +453,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { pipelineRunner, cfg.JobPipeline(), ), - job.StandardCapabilities: standardcapabilities.NewDelegate( - globalLogger, - opts.DS, jobORM, - opts.CapabilitiesRegistry, - loopRegistrarConfig, - telemetryManager, - pipelineRunner, - opts.RelayerChainInteroperators, - gatewayConnectorWrapper), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() ) @@ -501,6 +492,20 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("P2P stack required for OCR or OCR2") } + // If peer wrapper is initialized, Oracle Factory dependency will be available to standard capabilities + delegates[job.StandardCapabilities] = standardcapabilities.NewDelegate( + globalLogger, + opts.DS, jobORM, + opts.CapabilitiesRegistry, + loopRegistrarConfig, + telemetryManager, + pipelineRunner, + opts.RelayerChainInteroperators, + gatewayConnectorWrapper, + keyStore, + peerWrapper, + ) + if cfg.OCR().Enabled() { delegates[job.OffchainReporting] = ocr.NewDelegate( opts.DS, diff --git a/core/services/job/models.go b/core/services/job/models.go index f4b773a1bfb..2c225cd0f2b 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -948,12 +948,35 @@ func (w *WorkflowSpec) RawSpec(ctx context.Context) ([]byte, error) { return rs, nil } +type OracleFactoryConfig struct { + Enabled bool `toml:"enabled"` + BootstrapPeers []string `toml:"bootstrap_peers"` // e.g.,["12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690"] + OCRContractAddress string `toml:"ocr_contract_address"` // e.g., 0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6 + ChainID string `toml:"chain_id"` // e.g., "31337" + Network string `toml:"network"` // e.g., "evm" +} + +// Value returns this instance serialized for database storage. +func (ofc OracleFactoryConfig) Value() (driver.Value, error) { + return json.Marshal(ofc) +} + +// Scan reads the database value and returns an instance. +func (ofc *OracleFactoryConfig) Scan(value interface{}) error { + b, ok := value.([]byte) + if !ok { + return errors.Errorf("expected bytes got %T", b) + } + return json.Unmarshal(b, &ofc) +} + type StandardCapabilitiesSpec struct { - ID int32 - CreatedAt time.Time `toml:"-"` - UpdatedAt time.Time `toml:"-"` - Command string `toml:"command"` - Config string `toml:"config"` + ID int32 + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + Command string `toml:"command" db:"command"` + Config string `toml:"config" db:"config"` + OracleFactory OracleFactoryConfig `toml:"oracle_factory" db:"oracle_factory"` } func (w *StandardCapabilitiesSpec) GetID() string { diff --git a/core/services/job/orm.go b/core/services/job/orm.go index d02e0b29200..071ca37203e 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -418,8 +418,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { } jb.WorkflowSpecID = &specID case StandardCapabilities: - sql := `INSERT INTO standardcapabilities_specs (command, config, created_at, updated_at) - VALUES (:command, :config, NOW(), NOW()) + sql := `INSERT INTO standardcapabilities_specs (command, config, oracle_factory, created_at, updated_at) + VALUES (:command, :config, :oracle_factory, NOW(), NOW()) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.StandardCapabilitiesSpec) if err != nil { diff --git a/core/services/ocr2/plugins/generic/oraclefactory.go b/core/services/ocr2/plugins/generic/oraclefactory.go new file mode 100644 index 00000000000..7d44a239d2e --- /dev/null +++ b/core/services/ocr2/plugins/generic/oraclefactory.go @@ -0,0 +1,140 @@ +package generic + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + ocr "github.com/smartcontractkit/libocr/offchainreporting2plus" + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" + + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" +) + +type oracleFactory struct { + database ocr3types.Database + jobID int32 + jobName string + jobORM job.ORM + kb ocr2key.KeyBundle + lggr logger.Logger + config job.OracleFactoryConfig + peerWrapper *ocrcommon.SingletonPeerWrapper + relayerSet *RelayerSet + transmitterID string +} + +type OracleFactoryParams struct { + JobID int32 + JobName string + JobORM job.ORM + KB ocr2key.KeyBundle + Logger logger.Logger + Config job.OracleFactoryConfig + PeerWrapper *ocrcommon.SingletonPeerWrapper + RelayerSet *RelayerSet + TransmitterID string +} + +func NewOracleFactory(params OracleFactoryParams) (core.OracleFactory, error) { + return &oracleFactory{ + database: OracleFactoryDB(params.JobID, params.Logger), + jobID: params.JobID, + jobName: params.JobName, + jobORM: params.JobORM, + kb: params.KB, + lggr: params.Logger, + config: params.Config, + peerWrapper: params.PeerWrapper, + relayerSet: params.RelayerSet, + transmitterID: params.TransmitterID, + }, nil +} + +func (of *oracleFactory) NewOracle(ctx context.Context, args core.OracleArgs) (core.Oracle, error) { + if !of.peerWrapper.IsStarted() { + return nil, errors.New("peer wrapper not started") + } + + relayer, err := of.relayerSet.Get(ctx, types.RelayID{Network: of.config.Network, ChainID: of.config.ChainID}) + if err != nil { + return nil, fmt.Errorf("error when getting relayer: %w", err) + } + + var relayConfig = struct { + ChainID string `json:"chainID"` + EffectiveTransmitterID string `json:"effectiveTransmitterID"` + SendingKeys []string `json:"sendingKeys"` + }{ + ChainID: of.config.ChainID, + EffectiveTransmitterID: of.transmitterID, + SendingKeys: []string{of.transmitterID}, + } + relayConfigBytes, err := json.Marshal(relayConfig) + if err != nil { + return nil, fmt.Errorf("error when marshalling relay config: %w", err) + } + + pluginProvider, err := relayer.NewPluginProvider(ctx, core.RelayArgs{ + ContractID: of.config.OCRContractAddress, + ProviderType: "plugin", + RelayConfig: relayConfigBytes, + }, core.PluginArgs{ + TransmitterID: of.transmitterID, + }) + if err != nil { + return nil, fmt.Errorf("error when getting offchain digester: %w", err) + } + + bootstrapPeers, err := ocrcommon.ParseBootstrapPeers(of.config.BootstrapPeers) + if err != nil { + return nil, fmt.Errorf("failed to parse bootstrap peers: %w", err) + } + + oracle, err := ocr.NewOracle(ocr.OCR3OracleArgs[[]byte]{ + // We are relying on the relayer plugin provider for the offchain config digester + // and the contract config tracker to save time. + ContractConfigTracker: pluginProvider.ContractConfigTracker(), + OffchainConfigDigester: pluginProvider.OffchainConfigDigester(), + LocalConfig: args.LocalConfig, + ContractTransmitter: NewContractTransmitter(of.transmitterID, args.ContractTransmitter), + ReportingPluginFactory: args.ReportingPluginFactoryService, + BinaryNetworkEndpointFactory: of.peerWrapper.Peer2, + V2Bootstrappers: bootstrapPeers, + Database: of.database, + Logger: ocrcommon.NewOCRWrapper(of.lggr, true, func(ctx context.Context, msg string) { + logger.Sugared(of.lggr).ErrorIf(of.jobORM.RecordError(ctx, of.jobID, msg), "unable to record error") + }), + MonitoringEndpoint: &telemetry.NoopAgent{}, + OffchainKeyring: of.kb, + OnchainKeyring: ocrcommon.NewOCR3OnchainKeyringAdapter(of.kb), + MetricsRegisterer: prometheus.WrapRegistererWith(map[string]string{"job_name": of.jobName}, prometheus.DefaultRegisterer), + }) + + if err != nil { + return nil, fmt.Errorf("%w: failed to create new OCR oracle", err) + } + + return &adaptedOracle{oracle: oracle}, nil +} + +type adaptedOracle struct { + oracle ocr.Oracle +} + +func (a *adaptedOracle) Start(ctx context.Context) error { + return a.oracle.Start() +} + +func (a *adaptedOracle) Close(ctx context.Context) error { + return a.oracle.Close() +} diff --git a/core/services/ocr2/plugins/generic/oraclefactorydb.go b/core/services/ocr2/plugins/generic/oraclefactorydb.go new file mode 100644 index 00000000000..5db6f8a2ffe --- /dev/null +++ b/core/services/ocr2/plugins/generic/oraclefactorydb.go @@ -0,0 +1,135 @@ +package generic + +import ( + "context" + "encoding/json" + "fmt" + "time" + + ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +type oracleFactoryDb struct { + // The ID is used for logging and error messages + // A single standard capabilities spec can instantiate multiple oracles + // TODO: NewOracle should take a unique identifier for the oracle + specID int32 + lggr logger.SugaredLogger + config *ocrtypes.ContractConfig + states map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState + pendingTransmissions map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission + protocolStates map[ocrtypes.ConfigDigest]map[string][]byte +} + +var ( + _ ocrtypes.Database = &oracleFactoryDb{} +) + +// NewDB returns a new DB scoped to this instanceID +func OracleFactoryDB(specID int32, lggr logger.Logger) *oracleFactoryDb { + return &oracleFactoryDb{ + specID: specID, + lggr: logger.Sugared(lggr.Named("OracleFactoryMemoryDb")), + states: make(map[ocrtypes.ConfigDigest]*ocrtypes.PersistentState), + pendingTransmissions: make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission), + protocolStates: make(map[ocrtypes.ConfigDigest]map[string][]byte), + } +} + +func (ofdb *oracleFactoryDb) ReadState(ctx context.Context, cd ocrtypes.ConfigDigest) (ps *ocrtypes.PersistentState, err error) { + ps, ok := ofdb.states[cd] + if !ok { + return nil, fmt.Errorf("state not found for standard capabilities spec ID %d, config digest %s", ofdb.specID, cd) + } + + return ps, nil +} + +func (ofdb *oracleFactoryDb) WriteState(ctx context.Context, cd ocrtypes.ConfigDigest, state ocrtypes.PersistentState) error { + ofdb.states[cd] = &state + return nil +} + +func (ofdb *oracleFactoryDb) ReadConfig(ctx context.Context) (c *ocrtypes.ContractConfig, err error) { + if ofdb.config == nil { + // Returning nil, nil because this is a cache miss + return nil, nil + } + return ofdb.config, nil +} + +func (ofdb *oracleFactoryDb) WriteConfig(ctx context.Context, c ocrtypes.ContractConfig) error { + ofdb.config = &c + + cBytes, err := json.Marshal(c) + if err != nil { + return fmt.Errorf("MemoryDB: WriteConfig failed to marshal config: %w", err) + } + + ofdb.lggr.Debugw("MemoryDB: WriteConfig", "ocrtypes.ContractConfig", string(cBytes)) + + return nil +} + +func (ofdb *oracleFactoryDb) StorePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp, tx ocrtypes.PendingTransmission) error { + ofdb.pendingTransmissions[t] = tx + return nil +} + +func (ofdb *oracleFactoryDb) PendingTransmissionsWithConfigDigest(ctx context.Context, cd ocrtypes.ConfigDigest) (map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission, error) { + m := make(map[ocrtypes.ReportTimestamp]ocrtypes.PendingTransmission) + for k, v := range ofdb.pendingTransmissions { + if k.ConfigDigest == cd { + m[k] = v + } + } + + return m, nil +} + +func (ofdb *oracleFactoryDb) DeletePendingTransmission(ctx context.Context, t ocrtypes.ReportTimestamp) error { + delete(ofdb.pendingTransmissions, t) + return nil +} + +func (ofdb *oracleFactoryDb) DeletePendingTransmissionsOlderThan(ctx context.Context, t time.Time) error { + for k, v := range ofdb.pendingTransmissions { + if v.Time.Before(t) { + delete(ofdb.pendingTransmissions, k) + } + } + + return nil +} + +func (ofdb *oracleFactoryDb) ReadProtocolState( + ctx context.Context, + configDigest ocrtypes.ConfigDigest, + key string, +) ([]byte, error) { + value, ok := ofdb.protocolStates[configDigest][key] + if !ok { + // Previously implementation returned nil if the state is not found + return nil, nil + } + return value, nil +} + +func (ofdb *oracleFactoryDb) WriteProtocolState( + ctx context.Context, + configDigest ocrtypes.ConfigDigest, + key string, + value []byte, +) error { + if value == nil { + delete(ofdb.protocolStates[configDigest], key) + } else { + if ofdb.protocolStates[configDigest] == nil { + ofdb.protocolStates[configDigest] = make(map[string][]byte) + } + ofdb.protocolStates[configDigest][key] = value + } + return nil +} diff --git a/core/services/ocr2/plugins/generic/oraclefactorytransmitter.go b/core/services/ocr2/plugins/generic/oraclefactorytransmitter.go new file mode 100644 index 00000000000..41a2c3fa1c4 --- /dev/null +++ b/core/services/ocr2/plugins/generic/oraclefactorytransmitter.go @@ -0,0 +1,36 @@ +package generic + +import ( + "context" + + "github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" +) + +var _ ocr3types.ContractTransmitter[[]byte] = (*contractTransmitter)(nil) + +type contractTransmitter struct { + impl ocr3types.ContractTransmitter[[]byte] + transmitterID string +} + +func NewContractTransmitter(transmitterID string, impl ocr3types.ContractTransmitter[[]byte]) *contractTransmitter { + return &contractTransmitter{ + impl: impl, + transmitterID: transmitterID, + } +} + +func (ct *contractTransmitter) Transmit( + ctx context.Context, + configDigest types.ConfigDigest, + seqNr uint64, + reportWithInfo ocr3types.ReportWithInfo[[]byte], + attributedOnchainSignature []types.AttributedOnchainSignature, +) error { + return ct.impl.Transmit(ctx, configDigest, seqNr, reportWithInfo, attributedOnchainSignature) +} + +func (ct *contractTransmitter) FromAccount() (types.Account, error) { + return types.Account(ct.transmitterID), nil +} diff --git a/core/services/standardcapabilities/delegate.go b/core/services/standardcapabilities/delegate.go index c22134bb48b..7c370d4f8de 100644 --- a/core/services/standardcapabilities/delegate.go +++ b/core/services/standardcapabilities/delegate.go @@ -18,7 +18,12 @@ import ( webapitarget "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi/target" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/chaintype" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ocr2key" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/services/telemetry" "github.com/smartcontractkit/chainlink/v2/plugins" @@ -39,6 +44,8 @@ type Delegate struct { pipelineRunner pipeline.Runner relayers RelayGetter gatewayConnectorWrapper *gatewayconnector.ServiceWrapper + ks keystore.Master + peerWrapper *ocrcommon.SingletonPeerWrapper isNewlyCreatedJob bool } @@ -49,11 +56,33 @@ const ( commandOverrideForCustomComputeAction = "__builtin_custom-compute-action" ) -func NewDelegate(logger logger.Logger, ds sqlutil.DataSource, jobORM job.ORM, registry core.CapabilitiesRegistry, - cfg plugins.RegistrarConfig, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, pipelineRunner pipeline.Runner, - relayers RelayGetter, gatewayConnectorWrapper *gatewayconnector.ServiceWrapper) *Delegate { - return &Delegate{logger: logger, ds: ds, jobORM: jobORM, registry: registry, cfg: cfg, monitoringEndpointGen: monitoringEndpointGen, pipelineRunner: pipelineRunner, - relayers: relayers, isNewlyCreatedJob: false, gatewayConnectorWrapper: gatewayConnectorWrapper} +func NewDelegate( + logger logger.Logger, + ds sqlutil.DataSource, + jobORM job.ORM, + registry core.CapabilitiesRegistry, + cfg plugins.RegistrarConfig, + monitoringEndpointGen telemetry.MonitoringEndpointGenerator, + pipelineRunner pipeline.Runner, + relayers RelayGetter, + gatewayConnectorWrapper *gatewayconnector.ServiceWrapper, + ks keystore.Master, + peerWrapper *ocrcommon.SingletonPeerWrapper, +) *Delegate { + return &Delegate{ + logger: logger, + ds: ds, + jobORM: jobORM, + registry: registry, + cfg: cfg, + monitoringEndpointGen: monitoringEndpointGen, + pipelineRunner: pipelineRunner, + relayers: relayers, + isNewlyCreatedJob: false, + gatewayConnectorWrapper: gatewayConnectorWrapper, + ks: ks, + peerWrapper: peerWrapper, + } } func (d *Delegate) JobType() job.Type { @@ -78,6 +107,63 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser return nil, fmt.Errorf("failed to create relayer set: %w", err) } + ocrKeyBundles, err := d.ks.OCR2().GetAll() + if err != nil { + return nil, err + } + + if len(ocrKeyBundles) > 1 { + return nil, fmt.Errorf("expected exactly one OCR key bundle, but found: %d", len(ocrKeyBundles)) + } + + var ocrKeyBundle ocr2key.KeyBundle + if len(ocrKeyBundles) == 0 { + ocrKeyBundle, err = d.ks.OCR2().Create(ctx, chaintype.EVM) + if err != nil { + return nil, errors.Wrap(err, "failed to create OCR key bundle") + } + } else { + ocrKeyBundle = ocrKeyBundles[0] + } + + ethKeyBundles, err := d.ks.Eth().GetAll(ctx) + if err != nil { + return nil, err + } + if len(ethKeyBundles) > 1 { + return nil, fmt.Errorf("expected exactly one ETH key bundle, but found: %d", len(ethKeyBundles)) + } + + var ethKeyBundle ethkey.KeyV2 + if len(ethKeyBundles) == 0 { + ethKeyBundle, err = d.ks.Eth().Create(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to create ETH key bundle") + } + } else { + ethKeyBundle = ethKeyBundles[0] + } + + log.Debug("oracleFactoryConfig: ", spec.StandardCapabilitiesSpec.OracleFactory) + + if spec.StandardCapabilitiesSpec.OracleFactory.Enabled && d.peerWrapper == nil { + return nil, errors.New("P2P stack required for Oracle Factory") + } + + oracleFactory, err := generic.NewOracleFactory(generic.OracleFactoryParams{ + Logger: log, + JobORM: d.jobORM, + JobID: spec.ID, + JobName: spec.Name.ValueOrZero(), + KB: ocrKeyBundle, + Config: spec.StandardCapabilitiesSpec.OracleFactory, + PeerWrapper: d.peerWrapper, + RelayerSet: relayerSet, + TransmitterID: ethKeyBundle.Address.String(), + }) + if err != nil { + return nil, fmt.Errorf("failed to create oracle factory: %w", err) + } // NOTE: special cases for built-in capabilities (to be moved into LOOPPs in the future) if spec.StandardCapabilitiesSpec.Command == commandOverrideForWebAPITrigger { if d.gatewayConnectorWrapper == nil { @@ -122,7 +208,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser } standardCapability := newStandardCapabilities(log, spec.StandardCapabilitiesSpec, d.cfg, telemetryService, kvStore, d.registry, errorLog, - pr, relayerSet) + pr, relayerSet, oracleFactory) return []job.ServiceCtx{standardCapability}, nil } @@ -161,6 +247,22 @@ func ValidatedStandardCapabilitiesSpec(tomlString string) (job.Job, error) { return jb, errors.Errorf("standard capabilities command must be set") } + // Skip validation if Oracle Factory is not enabled + if !jb.StandardCapabilitiesSpec.OracleFactory.Enabled { + return jb, nil + } + + // If Oracle Factory is enabled, it must have at least one bootstrap peer + if len(jb.StandardCapabilitiesSpec.OracleFactory.BootstrapPeers) == 0 { + return jb, errors.New("no bootstrap peers found") + } + + // Validate bootstrap peers + _, err = ocrcommon.ParseBootstrapPeers(jb.StandardCapabilitiesSpec.OracleFactory.BootstrapPeers) + if err != nil { + return jb, errors.Wrap(err, "failed to parse bootstrap peers") + } + return jb, nil } diff --git a/core/services/standardcapabilities/delegate_test.go b/core/services/standardcapabilities/delegate_test.go new file mode 100644 index 00000000000..27b6734f911 --- /dev/null +++ b/core/services/standardcapabilities/delegate_test.go @@ -0,0 +1,121 @@ +package standardcapabilities_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/standardcapabilities" +) + +func Test_ValidatedStandardCapabilitiesSpec(t *testing.T) { + type testCase struct { + name string + tomlString string + expectedError string + expectedSpec *job.StandardCapabilitiesSpec + } + + testCases := []testCase{ + { + name: "invalid TOML string", + tomlString: `[[]`, + expectedError: "toml error on load standard capabilities", + }, + { + name: "incorrect job type", + tomlString: ` + type="nonstandardcapabilities" + `, + expectedError: "standard capabilities unsupported job type", + }, + { + name: "command unset", + tomlString: ` + type="standardcapabilities" + `, + expectedError: "standard capabilities command must be set", + }, + { + name: "invalid oracle config: malformed peer", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + + [oracle_factory] + enabled=true + bootstrap_peers = [ + "invalid_p2p_id@invalid_ip:1111" + ] + `, + expectedError: "failed to parse bootstrap peers", + }, + { + name: "invalid oracle config: missing bootstrap peers", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + + [oracle_factory] + enabled=true + `, + expectedError: "no bootstrap peers found", + }, + { + name: "valid spec", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + `, + }, + { + name: "valid spec with oracle config", + tomlString: ` + type="standardcapabilities" + command="path/to/binary" + + [capabilities] + target = "enabled" + + [oracle_factory] + enabled=true + bootstrap_peers = [ + "12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690" + ] + network="evm" + chain_id="31337" + ocr_contract_address="0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" + `, + expectedSpec: &job.StandardCapabilitiesSpec{ + Command: "path/to/binary", + OracleFactory: job.OracleFactoryConfig{ + Enabled: true, + BootstrapPeers: []string{ + "12D3KooWEBVwbfdhKnicois7FTYVsBFGFcoMhMCKXQC57BQyZMhz@localhost:6690", + }, + OCRContractAddress: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + ChainID: "31337", + Network: "evm", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + jobSpec, err := standardcapabilities.ValidatedStandardCapabilitiesSpec(tc.tomlString) + + if tc.expectedError != "" { + assert.ErrorContains(t, err, tc.expectedError) + } else { + require.NoError(t, err) + } + + if tc.expectedSpec != nil { + assert.EqualValues(t, tc.expectedSpec, jobSpec.StandardCapabilitiesSpec) + } + }) + } +} diff --git a/core/services/standardcapabilities/standard_capabilities.go b/core/services/standardcapabilities/standard_capabilities.go index a8d007d5df8..fe3dad7bb2f 100644 --- a/core/services/standardcapabilities/standard_capabilities.go +++ b/core/services/standardcapabilities/standard_capabilities.go @@ -23,18 +23,23 @@ type standardCapabilities struct { errorLog core.ErrorLog pipelineRunner core.PipelineRunnerService relayerSet core.RelayerSet + oracleFactory core.OracleFactory capabilitiesLoop *loop.StandardCapabilitiesService } -func newStandardCapabilities(log logger.Logger, spec *job.StandardCapabilitiesSpec, +func newStandardCapabilities( + log logger.Logger, + spec *job.StandardCapabilitiesSpec, pluginRegistrar plugins.RegistrarConfig, telemetryService core.TelemetryService, store core.KeyValueStore, CapabilitiesRegistry core.CapabilitiesRegistry, errorLog core.ErrorLog, pipelineRunner core.PipelineRunnerService, - relayerSet core.RelayerSet) *standardCapabilities { + relayerSet core.RelayerSet, + oracleFactory core.OracleFactory, +) *standardCapabilities { return &standardCapabilities{ log: log, spec: spec, @@ -45,6 +50,7 @@ func newStandardCapabilities(log logger.Logger, spec *job.StandardCapabilitiesSp errorLog: errorLog, pipelineRunner: pipelineRunner, relayerSet: relayerSet, + oracleFactory: oracleFactory, } } @@ -73,7 +79,7 @@ func (s *standardCapabilities) Start(ctx context.Context) error { } if err = s.capabilitiesLoop.Service.Initialise(ctx, s.spec.Config, s.telemetryService, s.store, s.CapabilitiesRegistry, s.errorLog, - s.pipelineRunner, s.relayerSet); err != nil { + s.pipelineRunner, s.relayerSet, s.oracleFactory); err != nil { return fmt.Errorf("error initialising standard capabilities service: %v", err) } diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index ed584ba5aec..680c29371a0 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -11,9 +11,8 @@ import ( "github.com/jonboulle/clockwork" - "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" - "github.com/smartcontractkit/chainlink-common/pkg/workflows/exec" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" diff --git a/core/store/migrate/migrations/0255_standard_capabilities_extension.sql b/core/store/migrate/migrations/0255_standard_capabilities_extension.sql new file mode 100644 index 00000000000..d81b4864eb4 --- /dev/null +++ b/core/store/migrate/migrations/0255_standard_capabilities_extension.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE standardcapabilities_specs +ADD COLUMN oracle_factory JSONB; +-- +goose StatementEnd +-- +goose Down +-- +goose StatementBegin +ALTER TABLE standardcapabilities_specs DROP COLUMN oracle_factory; +-- +goose StatementEnd diff --git a/go.mod b/go.mod index 93696685c60..a2c9b1b211f 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.23 github.com/smartcontractkit/chainlink-automation v1.0.4 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240910155501-42f20443189f diff --git a/go.sum b/go.sum index 2e4a7775a60..8fad9ec97ad 100644 --- a/go.sum +++ b/go.sum @@ -1060,8 +1060,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index d3ad17f908c..34cf2df9cbb 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -40,7 +40,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.23 github.com/smartcontractkit/chainlink-automation v1.0.4 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/lib/grafana v1.50.0 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 90c7f08b0f9..bd416855c59 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1439,8 +1439,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 8b3fed5432b..85b57698a3e 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -15,7 +15,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.33.0 github.com/slack-go/slack v0.12.2 - github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd + github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.1 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.0 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 8bdc23dc35f..b921352eeb5 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1415,8 +1415,8 @@ github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8um github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb h1:bYT7j5syymCtAKv/gGG7CLp4APe/VDWkig9Ph7mc8fQ= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241009125450-4290917273fb/go.mod h1:WbtjuYMnDAYojL3CSWmruc1ecSBgSTggzXJ6F1U6bxw= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd h1:DraA9kRpkyI02OEx7QDbSq3bCYxVFZ78TdOP2rUvHts= -github.com/smartcontractkit/chainlink-common v0.2.3-0.20241001210038-dd59341432bd/go.mod h1:F6WUS6N4mP5ScwpwyTyAJc9/vjR+GXbMCRUOVekQi1g= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d h1:GpGKzVtDWSb8cesHSvm+LQpCJpFdfVPk3us5wAY6ixI= +github.com/smartcontractkit/chainlink-common v0.2.3-0.20241008170407-8bfcea33a98d/go.mod h1:+yz2So1Lp90C3ATfMmqMEJaIr3zfG8e4xHa/3u1kIDk= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7 h1:lTGIOQYLk1Ufn++X/AvZnt6VOcuhste5yp+C157No/Q= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240911175228-daf2600bb7b7/go.mod h1:BMYE1vC/pGmdFSsOJdPrAA0/4gZ0Xo0SxTMdGspBtRo= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240916152957-433914114bd2 h1:yRk4ektpx/UxwarqAfgxUXLrsYXlaNeP1NOwzHGrK2Q= diff --git a/plugins/cmd/capabilities/log-event-trigger/main.go b/plugins/cmd/capabilities/log-event-trigger/main.go index 7cf66f9c847..d01485a743f 100644 --- a/plugins/cmd/capabilities/log-event-trigger/main.go +++ b/plugins/cmd/capabilities/log-event-trigger/main.go @@ -92,6 +92,7 @@ func (cs *LogEventTriggerGRPCService) Initialise( errorLog core.ErrorLog, pipelineRunner core.PipelineRunnerService, relayerSet core.RelayerSet, + oracleFactory core.OracleFactory, ) error { cs.s.Logger.Debugf("Initialising %s", serviceName)