Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose CheckAttempt to TxManager, saves some unnecessary calls #1327

Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions core/adapters/eth_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/store"
strpkg "github.com/smartcontractkit/chainlink/core/store"
"github.com/smartcontractkit/chainlink/core/store/models"
"github.com/smartcontractkit/chainlink/core/utils"
"gopkg.in/guregu/null.v3"
Expand All @@ -33,7 +33,7 @@ type EthTx struct {
// Perform creates the run result for the transaction if the existing run result
// is not currently pending. Then it confirms the transaction was confirmed on
// the blockchain.
func (etx *EthTx) Perform(input models.RunResult, store *store.Store) models.RunResult {
func (etx *EthTx) Perform(input models.RunResult, store *strpkg.Store) models.RunResult {
if !store.TxManager.Connected() {
input.MarkPendingConnection()
return input
Expand Down Expand Up @@ -69,7 +69,7 @@ func getTxData(e *EthTx, input *models.RunResult) ([]byte, error) {
func createTxRunResult(
e *EthTx,
input *models.RunResult,
store *store.Store,
store *strpkg.Store,
) {
value, err := getTxData(e, input)
if err != nil {
Expand All @@ -96,10 +96,38 @@ func createTxRunResult(
}

input.ApplyResult(tx.Hash.String())
ensureTxRunResult(input, store)

txAttempt := tx.Attempts[0]
logger.Debugw(
fmt.Sprintf("Tx #0 checking on-chain state"),
"txHash", txAttempt.Hash.String(),
"txID", txAttempt.TxID,
)

receipt, state, err := store.TxManager.CheckAttempt(txAttempt, tx.SentAt)
if err != nil {
input.SetError(err)
return
}

logger.Debugw(
fmt.Sprintf("Tx #0 is %s", state),
"txHash", txAttempt.Hash.String(),
"txID", txAttempt.TxID,
"receiptBlockNumber", receipt.BlockNumber.ToInt(),
"currentBlockNumber", tx.SentAt,
"receiptHash", receipt.Hash.Hex(),
)

if state != strpkg.Safe {
input.MarkPendingConfirmations()
return
}

addReceiptToResult(receipt, input)
}

func ensureTxRunResult(input *models.RunResult, str *store.Store) {
func ensureTxRunResult(input *models.RunResult, str *strpkg.Store) {
val, err := input.ResultString()
if err != nil {
input.SetError(err)
Expand All @@ -112,14 +140,23 @@ func ensureTxRunResult(input *models.RunResult, str *store.Store) {
return
}

receipt, err := str.TxManager.BumpGasUntilSafe(hash)
receipt, state, err := str.TxManager.BumpGasUntilSafe(hash)
if err != nil {
if state == strpkg.Unknown {
input.SetError(err)
return
}

// We failed to get one of the TxAttempt receipts, so we won't mark this
// run as errored in order to try again
logger.Warn("EthTx Adapter Perform Resuming: ", err)
}
if receipt == nil {

if state != strpkg.Safe {
input.MarkPendingConfirmations()
return
}

addReceiptToResult(receipt, input)
}

Expand Down
80 changes: 54 additions & 26 deletions core/adapters/eth_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/smartcontractkit/chainlink/core/adapters"
"github.com/smartcontractkit/chainlink/core/internal/cltest"
"github.com/smartcontractkit/chainlink/core/internal/mocks"
strpkg "github.com/smartcontractkit/chainlink/core/store"
"github.com/smartcontractkit/chainlink/core/store/models"
"github.com/smartcontractkit/chainlink/core/utils"
"github.com/stretchr/testify/assert"
Expand All @@ -25,7 +26,6 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) {
app, cleanup := cltest.NewApplicationWithKey(t)
defer cleanup()
store := app.Store
config := store.Config

address := cltest.NewAddress()
fHash := models.HexToFunctionSelector("b3f98adc")
Expand All @@ -40,7 +40,6 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) {
hash := cltest.NewHash()
sentAt := uint64(23456)
confirmed := sentAt + 1
safe := confirmed + config.MinOutgoingConfirmations()
ethMock.Register("eth_sendRawTransaction", hash,
func(_ interface{}, data ...interface{}) error {
rlp := data[0].([]interface{})[0].(string)
Expand All @@ -57,7 +56,6 @@ func TestEthTxAdapter_Perform_Confirmed(t *testing.T) {
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)}
ethMock.Register("eth_getTransactionReceipt", receipt)
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe))

adapter := adapters.EthTx{
Address: address,
Expand All @@ -84,7 +82,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) {
app, cleanup := cltest.NewApplicationWithKey(t)
defer cleanup()
store := app.Store
config := store.Config

address := cltest.NewAddress()
fHash := models.HexToFunctionSelector("b3f98adc")
Expand All @@ -99,7 +96,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) {
hash := cltest.NewHash()
sentAt := uint64(23456)
confirmed := sentAt + 1
safe := confirmed + config.MinOutgoingConfirmations()
ethMock.Register("eth_sendRawTransaction", hash,
func(_ interface{}, data ...interface{}) error {
rlp := data[0].([]interface{})[0].(string)
Expand All @@ -118,7 +114,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytes(t *testing.T) {
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)}
ethMock.Register("eth_getTransactionReceipt", receipt)
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe))

adapter := adapters.EthTx{
Address: address,
Expand Down Expand Up @@ -146,7 +141,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix(t *testing.T) {
app, cleanup := cltest.NewApplicationWithKey(t)
defer cleanup()
store := app.Store
config := store.Config

address := cltest.NewAddress()
fHash := models.HexToFunctionSelector("b3f98adc")
Expand All @@ -159,8 +153,6 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix(t *testing.T) {

hash := cltest.NewHash()
sentAt := uint64(23456)
confirmed := sentAt + 1
safe := confirmed + config.MinOutgoingConfirmations()
ethMock.Register("eth_sendRawTransaction", hash,
func(_ interface{}, data ...interface{}) error {
rlp := data[0].([]interface{})[0].(string)
Expand All @@ -176,9 +168,9 @@ func TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix(t *testing.T) {
return nil
})
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)}
safe := sentAt - store.Config.MinOutgoingConfirmations()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional

This line originally threw me off, because conceptually it's impossible for something to be safe before something that's sent. I later discovered you're using the receipt's BlockNumber to gauge whether something is safe so this works out.

If possible, I would recommend renaming sentAt to something else, perhaps currentHeight?

I would also rename the test TestEthTxAdapter_Perform_ConfirmedWithBytesAndNoDataPrefix from using the language Confirmed to using the more appropriate language Safe: TestEthTxAdapter_Perform_SafeWithBytesAndNoDataPrefix

receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(safe)}
ethMock.Register("eth_getTransactionReceipt", receipt)
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe))

adapter := adapters.EthTx{
Address: address,
Expand Down Expand Up @@ -422,27 +414,65 @@ func TestEthTxAdapter_Perform_WithErrorInvalidInput(t *testing.T) {
assert.Contains(t, output.Error(), "Cannot connect to nodes")
}

func TestEthTxAdapter_Perform_PendingConfirmations_WithErrorInTxManager(t *testing.T) {
func TestEthTxAdapter_Perform_PendingConfirmations_WithFatalErrorInTxManager(t *testing.T) {
t.Parallel()

app, cleanup := cltest.NewApplicationWithKey(t)
defer cleanup()

store := app.Store
ethMock := app.MockEthClient()
ethMock.Register("eth_getTransactionCount", `0x0100`)
ethMock := app.MockEthClient(cltest.Strict)
ethMock.Register("eth_getTransactionCount", `0x17`)
assert.Nil(t, app.Start())

require.NoError(t, app.WaitForConnection())

adapter := adapters.EthTx{
Address: cltest.NewAddress(),
FunctionSelector: models.HexToFunctionSelector("0xb3f98adc"),
}
input := cltest.RunResultWithResult("")
input := cltest.RunResultWithResult(cltest.NewHash().String())
input.Status = models.RunStatusPendingConfirmations
ethMock.RegisterError("eth_blockNumber", "Cannot connect to nodes")
ethMock.RegisterError("eth_blockNumber", "Invalid node id")
output := adapter.Perform(input, store)

assert.False(t, output.HasError())
ethMock.AssertAllCalled()

assert.Equal(t, models.RunStatusErrored, output.Status)
assert.NotNil(t, output.Error())
}

func TestEthTxAdapter_Perform_PendingConfirmations_WithRecoverableErrorInTxManager(t *testing.T) {
t.Parallel()

app, cleanup := cltest.NewApplicationWithKey(t)
defer cleanup()

store := app.Store
ethMock := app.MockEthClient(cltest.Strict)
ethMock.Register("eth_getTransactionCount", `0x12`)
assert.Nil(t, app.Start())

from := cltest.GetAccountAddress(t, store)
tx := cltest.CreateTx(t, store, from, uint64(14372))
input := cltest.RunResultWithResult(tx.Attempts[0].Hash.String())
input.Status = models.RunStatusPendingConfirmations

ethMock.Register("eth_blockNumber", "0x100")
ethMock.RegisterError("eth_getTransactionReceipt", "Connection reset by peer")

require.NoError(t, app.WaitForConnection())

adapter := adapters.EthTx{
Address: cltest.NewAddress(),
FunctionSelector: models.HexToFunctionSelector("0xb3f98adc"),
}
output := adapter.Perform(input, store)

ethMock.AssertAllCalled()

assert.Equal(t, models.RunStatusPendingConfirmations, output.Status)
assert.NoError(t, output.GetError())
}

func TestEthTxAdapter_DeserializationBytesFormat(t *testing.T) {
Expand All @@ -451,15 +481,18 @@ func TestEthTxAdapter_DeserializationBytesFormat(t *testing.T) {
ctrl := gomock.NewController(t)
txmMock := mocks.NewMockTxManager(ctrl)
store.TxManager = txmMock

txAttempt := &models.TxAttempt{}
tx := &models.Tx{Attempts: []*models.TxAttempt{txAttempt}}
txmMock.EXPECT().Register(gomock.Any())
txmMock.EXPECT().Connected().Return(true).AnyTimes()
txmMock.EXPECT().CreateTxWithGas(gomock.Any(), gomock.Any(), hexutil.MustDecode(
"0x00000000"+
"0000000000000000000000000000000000000000000000000000000000000020"+
"000000000000000000000000000000000000000000000000000000000000000b"+
"68656c6c6f20776f726c64000000000000000000000000000000000000000000"),
gomock.Any(), gomock.Any()).Return(&models.Tx{}, nil)
txmMock.EXPECT().BumpGasUntilSafe(gomock.Any())
gomock.Any(), gomock.Any()).Return(tx, nil)
txmMock.EXPECT().CheckAttempt(txAttempt, uint64(0)).Return(&models.TxReceipt{}, strpkg.Unconfirmed, nil)

task := models.TaskSpec{}
err := json.Unmarshal([]byte(`{"type": "EthTx", "params": {"format": "bytes"}}`), &task)
Expand Down Expand Up @@ -541,8 +574,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) {
app, cleanup := cltest.NewApplicationWithKey(t)
defer cleanup()
store := app.Store
config := store.Config

ethMock := app.MockEthClient(cltest.Strict)
ethMock.Register("eth_getTransactionCount", `0x1`)

Expand All @@ -558,8 +589,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) {

hash := cltest.NewHash()
sentAt := uint64(9183)
ethMock.Register("eth_getBalance", "0x100")
ethMock.Register("eth_call", "0x100")

var firstTxData []interface{}
ethMock.Register("eth_sendRawTransaction", hash,
Expand All @@ -582,7 +611,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) {
// Run the adapter again

confirmed := sentAt + 1
safe := confirmed + config.MinOutgoingConfirmations()
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(confirmed))

var secondTxData []interface{}
Expand All @@ -593,7 +621,6 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFail(t *testing.T) {
})
receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(confirmed)}
ethMock.Register("eth_getTransactionReceipt", receipt)
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(safe))

data = adapter.Perform(input, store)
assert.NoError(t, data.GetError())
Expand Down Expand Up @@ -647,7 +674,8 @@ func TestEthTxAdapter_Perform_NoDoubleSpendOnSendTransactionFailAndNonceChange(t
firstTxData = data
return errors.New("no bueno")
})
ethMock.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
receipt := models.TxReceipt{Hash: hash, BlockNumber: cltest.Int(sentAt)}
ethMock.Register("eth_getTransactionReceipt", receipt)

adapter := adapters.EthTx{
Address: address,
Expand Down
5 changes: 5 additions & 0 deletions core/internal/cltest/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func (mock *EthMock) EventuallyAllCalled(t *testing.T) {
g.Eventually(mock.Remaining).Should(gomega.HaveLen(0))
}

// AssertAllCalled immediately checks that all calls have been made
func (mock *EthMock) AssertAllCalled() {
assert.Empty(mock.t, mock.Remaining())
}

// Call will call given method and set the result
func (mock *EthMock) Call(result interface{}, method string, args ...interface{}) error {
mock.mutex.Lock()
Expand Down
15 changes: 6 additions & 9 deletions core/internal/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func TestIntegration_HttpRequestWithHeaders(t *testing.T) {
eth.Context("ethTx.Perform()#1 at block 23456", func(eth *cltest.EthMock) {
eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
eth.Register("eth_sendRawTransaction", attempt1Hash) // Initial tx attempt sent
eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
eth.Register("eth_getTransactionReceipt", unconfirmedReceipt)
})
j := cltest.CreateHelloWorldJobViaWeb(t, app, mockServer.URL)
Expand Down Expand Up @@ -133,7 +132,6 @@ func TestIntegration_FeeBump(t *testing.T) {
eth.Context("ethTx.Perform()#1 at block 23456", func(eth *cltest.EthMock) {
eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
eth.Register("eth_sendRawTransaction", attempt1Hash) // Initial tx attempt sent
eth.Register("eth_blockNumber", utils.Uint64ToHex(sentAt))
eth.Register("eth_getTransactionReceipt", unconfirmedReceipt)
})
j := cltest.CreateHelloWorldJobViaWeb(t, app, mockServer.URL)
Expand Down Expand Up @@ -572,22 +570,21 @@ func TestIntegration_NonceManagement_firstRunWithExistingTxs(t *testing.T) {

eth := app.MockEthClient()
eth.Context("app.Start()", func(eth *cltest.EthMock) {
eth.Register("eth_getTransactionCount", `0x0100`) // activate account nonce
eth.Register("eth_getTransactionCount", `0x100`) // activate account nonce
})
require.NoError(t, app.StartAndConnect())

createCompletedJobRun := func(blockNumber uint64, expectedNonce uint64) {
hash := common.HexToHash("0xb7862c896a6ba2711bccc0410184e46d793ea83b3e05470f1d359ea276d16bb5")
confirmedBlockNumber := blockNumber - app.Store.Config.MinOutgoingConfirmations()

eth.Context("ethTx.Perform()", func(eth *cltest.EthMock) {
eth.Register("eth_blockNumber", utils.Uint64ToHex(blockNumber))
eth.Register("eth_sendRawTransaction", hash)

eth.Register("eth_getTransactionReceipt", models.TxReceipt{
Hash: hash,
BlockNumber: cltest.Int(blockNumber),
BlockNumber: cltest.Int(confirmedBlockNumber),
})
confirmedBlockNumber := blockNumber + app.Store.Config.MinOutgoingConfirmations()
eth.Register("eth_blockNumber", utils.Uint64ToHex(confirmedBlockNumber))
})

jr := cltest.CreateJobRunViaWeb(t, app, j, `{"result":"0x11"}`)
Expand All @@ -599,8 +596,8 @@ func TestIntegration_NonceManagement_firstRunWithExistingTxs(t *testing.T) {
assert.Equal(t, expectedNonce, tx.Nonce)
}

createCompletedJobRun(100, uint64(0x0100))
createCompletedJobRun(200, uint64(0x0101))
createCompletedJobRun(100, uint64(0x100))
createCompletedJobRun(200, uint64(0x101))
}

func TestIntegration_CreateServiceAgreement(t *testing.T) {
Expand Down
Loading