Skip to content

Commit

Permalink
Merge branch 'develop' into ks-444/contract-deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann authored Oct 9, 2024
2 parents 36916d7 + 5ca0d1f commit 85c1b8f
Show file tree
Hide file tree
Showing 43 changed files with 1,497 additions and 146 deletions.
9 changes: 9 additions & 0 deletions .changeset/brave-ads-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"chainlink": patch
---

Remove finality depth as the default value for minConfirmation for tx jobs.
Update the sql query for fetching pending callback transactions:
if minConfirmation is not null, we check difference if the current block - tx block > minConfirmation
else we check if the tx block is <= finalizedBlock
#updated
5 changes: 5 additions & 0 deletions .changeset/chilled-months-bow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added oracle support in standard capabilities
5 changes: 5 additions & 0 deletions .changeset/metal-meals-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Adjustments for usdc reader tests #internal
5 changes: 5 additions & 0 deletions .changeset/old-humans-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Fix TXM flakey test #internal
6 changes: 3 additions & 3 deletions common/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) pro

if ec.resumeCallback != nil {
mark = time.Now()
if err := ec.ResumePendingTaskRuns(ctx, head); err != nil {
if err := ec.ResumePendingTaskRuns(ctx, head.BlockNumber(), latestFinalizedHead.BlockNumber()); err != nil {
return fmt.Errorf("ResumePendingTaskRuns failed: %w", err)
}

Expand Down Expand Up @@ -1259,8 +1259,8 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) sen
}

// ResumePendingTaskRuns issues callbacks to task runs that are pending waiting for receipts
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, head types.Head[BLOCK_HASH]) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, head.BlockNumber(), ec.chainID)
func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) ResumePendingTaskRuns(ctx context.Context, latest, finalized int64) error {
receiptsPlus, err := ec.txStore.FindTxesPendingCallback(ctx, latest, finalized, ec.chainID)

if err != nil {
return err
Expand Down
31 changes: 16 additions & 15 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TxStore[
TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]

// Find confirmed txes beyond the minConfirmations param that require callback but have not yet been signaled
FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID CHAIN_ID) (receiptsPlus []ReceiptPlus[R], err error)
// Update tx to mark that its callback has been signaled
UpdateTxCallbackCompleted(ctx context.Context, pipelineTaskRunRid uuid.UUID, chainId CHAIN_ID) error
SaveFetchedReceipts(ctx context.Context, r []R, state TxState, errorMsg *string, chainID CHAIN_ID) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
Expand All @@ -40,6 +38,8 @@ import (
)

func Test_USDCReader_MessageHashes(t *testing.T) {
finalityDepth := 5

ctx := testutils.Context(t)
ethereumChain := cciptypes.ChainSelector(sel.ETHEREUM_MAINNET_OPTIMISM_1.Selector)
ethereumDomainCCTP := reader.CCTPDestDomains[uint64(ethereumChain)]
Expand All @@ -48,9 +48,10 @@ func Test_USDCReader_MessageHashes(t *testing.T) {
polygonChain := cciptypes.ChainSelector(sel.POLYGON_MAINNET.Selector)
polygonDomainCCTP := reader.CCTPDestDomains[uint64(polygonChain)]

ts := testSetup(ctx, t, ethereumChain, evmconfig.USDCReaderConfig)
ts := testSetup(ctx, t, ethereumChain, evmconfig.USDCReaderConfig, finalityDepth)

usdcReader, err := reader.NewUSDCMessageReader(
logger.TestLogger(t),
map[cciptypes.ChainSelector]pluginconfig.USDCCCTPTokenConfig{
ethereumChain: {
SourceMessageTransmitterAddr: ts.contractAddr.String(),
Expand All @@ -67,6 +68,11 @@ func Test_USDCReader_MessageHashes(t *testing.T) {
emitMessageSent(t, ts, ethereumDomainCCTP, avalancheDomainCCTP, 41)
emitMessageSent(t, ts, ethereumDomainCCTP, polygonDomainCCTP, 31)
emitMessageSent(t, ts, ethereumDomainCCTP, polygonDomainCCTP, 41)
// Finalize events
for i := 0; i < finalityDepth; i++ {
ts.sb.Commit()
}
emitMessageSent(t, ts, ethereumDomainCCTP, avalancheDomainCCTP, 51)

// Need to replay as sometimes the logs are not picked up by the log poller (?)
// Maybe another situation where chain reader doesn't register filters as expected.
Expand Down Expand Up @@ -167,25 +173,30 @@ func Test_USDCReader_MessageHashes(t *testing.T) {
reader.NewMessageTokenID(1, 3),
},
},
{
name: "not finalized events are not returned",
tokens: map[reader.MessageTokenID]cciptypes.RampTokenAmount{
reader.NewMessageTokenID(1, 5): {
ExtraData: reader.NewSourceTokenDataPayload(51, ethereumDomainCCTP).ToBytes(),
},
},
sourceChain: ethereumChain,
destChain: avalancheChain,
expectedMsgIDs: []reader.MessageTokenID{},
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
require.Eventually(t, func() bool {
hashes, err1 := usdcReader.MessageHashes(ctx, tc.sourceChain, tc.destChain, tc.tokens)
require.NoError(t, err1)
hashes, err1 := usdcReader.MessageHashes(ctx, tc.sourceChain, tc.destChain, tc.tokens)
require.NoError(t, err1)

if len(tc.expectedMsgIDs) != len(hashes) {
return false
}
require.Equal(t, len(tc.expectedMsgIDs), len(hashes))

for _, msgID := range tc.expectedMsgIDs {
if _, ok := hashes[msgID]; !ok {
return false
}
}
return true
}, tests.WaitTimeout(t), 50*time.Millisecond)
for _, msgID := range tc.expectedMsgIDs {
_, ok := hashes[msgID]
require.True(t, ok)
}
})
}
}
Expand All @@ -207,7 +218,7 @@ func emitMessageSent(t *testing.T, testEnv *testSetupData, source, dest uint32,
testEnv.sb.Commit()
}

func testSetup(ctx context.Context, t *testing.T, readerChain cciptypes.ChainSelector, cfg evmtypes.ChainReaderConfig) *testSetupData {
func testSetup(ctx context.Context, t *testing.T, readerChain cciptypes.ChainSelector, cfg evmtypes.ChainReaderConfig, depth int) *testSetupData {
const chainID = 1337

// Generate a new key pair for the simulated account
Expand Down Expand Up @@ -239,7 +250,7 @@ func testSetup(ctx context.Context, t *testing.T, readerChain cciptypes.ChainSel
db := pgtest.NewSqlxDB(t)
lpOpts := logpoller.Opts{
PollPeriod: time.Millisecond,
FinalityDepth: 0,
FinalityDepth: int64(depth),
BackfillBatchSize: 10,
RpcBatchSize: 10,
KeepFinalizedBlocksDepth: 100000,
Expand Down
14 changes: 14 additions & 0 deletions core/capabilities/ccip/configs/evm/contract_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ var USDCReaderConfig = evmrelaytypes.ChainReaderConfig{
consts.EventNameCCTPMessageSent: {
ChainSpecificName: consts.EventNameCCTPMessageSent,
ReadType: evmrelaytypes.Event,
EventDefinitions: &evmrelaytypes.EventDefinitions{
GenericDataWordDetails: map[string]evmrelaytypes.DataWordDetail{
consts.CCTPMessageSentValue: {
Name: consts.CCTPMessageSentValue,
// Filtering by the 3rd word (indexing starts from 0) so it's ptr(2)
Index: ptr(2),
Type: "bytes32",
},
},
},
},
},
},
Expand Down Expand Up @@ -327,3 +337,7 @@ func mustGetEventName(event string, tabi abi.ABI) string {
}
return e.Name
}

func ptr[T any](v T) *T {
return &v
}
10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/confirmer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3055,7 +3055,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
// It would only be in a state past suspended if the resume callback was called and callback_completed was set to TRUE
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE, callback_completed = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand All @@ -3073,7 +3073,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {

pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.NoError(t, err)
})

Expand Down Expand Up @@ -3101,7 +3101,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3155,7 +3155,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
t.Cleanup(func() { <-done })
go func() {
defer close(done)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err2 := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
if !assert.NoError(t, err2) {
return
}
Expand Down Expand Up @@ -3192,7 +3192,7 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) {
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, etx.TxAttempts[0].Hash)
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr.ID, minConfirmations, etx.ID)

err := ec.ResumePendingTaskRuns(tests.Context(t), &head)
err := ec.ResumePendingTaskRuns(tests.Context(t), head.Number, 0)
require.Error(t, err)

// Retrieve Tx to check if callback completed flag was left unchanged
Expand Down
10 changes: 7 additions & 3 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ WHERE evm.tx_attempts.state = 'in_progress' AND evm.txes.from_address = $1 AND e
}

// Find confirmed txes requiring callback but have not yet been signaled
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, latest, finalized int64, chainID *big.Int) (receiptsPlus []ReceiptPlus, err error) {
var rs []dbReceiptPlus

var cancel context.CancelFunc
Expand All @@ -1066,8 +1066,12 @@ func (o *evmTxStore) FindTxesPendingCallback(ctx context.Context, blockNum int64
INNER JOIN evm.tx_attempts ON evm.txes.id = evm.tx_attempts.eth_tx_id
INNER JOIN evm.receipts ON evm.tx_attempts.hash = evm.receipts.tx_hash
WHERE evm.txes.pipeline_task_run_id IS NOT NULL AND evm.txes.signal_callback = TRUE AND evm.txes.callback_completed = FALSE
AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations) AND evm.txes.evm_chain_id = $2
`, blockNum, chainID.String())
AND (
(evm.txes.min_confirmations IS NOT NULL AND evm.receipts.block_number <= ($1 - evm.txes.min_confirmations))
OR (evm.txes.min_confirmations IS NULL AND evm.receipts.block_number <= $2)
)
AND evm.txes.evm_chain_id = $3
`, latest, finalized, chainID.String())
if err != nil {
return nil, fmt.Errorf("failed to retrieve transactions pending pipeline resume callback: %w", err)
}
Expand Down
24 changes: 20 additions & 4 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
etx1 := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress)
pgtest.MustExec(t, db, `UPDATE evm.txes SET meta='{"FailOnRevert": true}'`)
attempt1 := etx1.TxAttempts[0]
mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash)
etxBlockNum := mustInsertEthReceipt(t, txStore, head.Number-minConfirmations, head.Hash, attempt1.Hash).BlockNumber
pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, min_confirmations = $2, signal_callback = TRUE WHERE id = $3`, &tr1.ID, minConfirmations, etx1.ID)

// Callback to pipeline service completed. Should be ignored
Expand Down Expand Up @@ -685,10 +685,26 @@ func TestORM_FindTxesPendingCallback(t *testing.T) {
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = $1 WHERE id = $2`, minConfirmations, etx5.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, ethClient.ConfiguredChainID())
receiptsPlus, err := txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Len(t, receiptsPlus, 1)
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}

// Clear min_confirmations
pgtest.MustExec(t, db, `UPDATE evm.txes SET min_confirmations = NULL WHERE id = $1`, etx1.ID)

// Search evm.txes table for tx requiring callback
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, 0, ethClient.ConfiguredChainID())
require.NoError(t, err)
assert.Empty(t, receiptsPlus)

// Search evm.txes table for tx requiring callback, with block 1 finalized
receiptsPlus, err = txStore.FindTxesPendingCallback(tests.Context(t), head.Number, etxBlockNum, ethClient.ConfiguredChainID())
require.NoError(t, err)
if assert.Len(t, receiptsPlus, 1) {
assert.Equal(t, tr1.ID, receiptsPlus[0].ID)
}
}

func Test_FindTxWithIdempotencyKey(t *testing.T) {
Expand Down
Loading

0 comments on commit 85c1b8f

Please sign in to comment.