Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(unorderedtx): issues reported in audit (backport #21467) #21655

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ If you are still using the legacy wiring, you must enable unordered transactions
}
```

* Create or update the App's `Preblocker()` method to call the unordered tx
manager's `OnNewBlock()` method.

```go
...
app.SetPreblocker(app.PreBlocker)
...

func (app *SimApp) PreBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) {
app.UnorderedTxManager.OnNewBlock(ctx.BlockTime())
return app.ModuleManager.PreBlock(ctx, req)
}
```

* Create or update the App's `Close()` method to close the unordered tx manager.
Note, this is critical as it ensures the manager's state is written to file
such that when the node restarts, it can recover the state to provide replay
Expand Down
85 changes: 47 additions & 38 deletions baseapp/abci_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,36 +292,42 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
)
h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool {
signerData, err := h.signerExtAdapter.GetSigners(memTx)
if err != nil {
// propagate the error to the caller
resError = err
return false
}

// If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before
// so we add them and continue given that we don't need to check the sequence.
shouldAdd := true
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
isUnordered := ok && unorderedTx.GetUnordered()
txSignersSeqs := make(map[string]uint64)
for _, signer := range signerData {
signerKey := string(signer.Signer)
seq, ok := selectedTxsSignersSeqs[signerKey]
if !ok {
txSignersSeqs[signerKey] = signer.Sequence
continue

// if the tx is unordered, we don't need to check the sequence, we just add it
if !isUnordered {
signerData, err := h.signerExtAdapter.GetSigners(memTx)
if err != nil {
// propagate the error to the caller
resError = err
return false
}

// If we have seen this signer before in this block, we must make
// sure that the current sequence is seq+1; otherwise is invalid
// and we skip it.
if seq+1 != signer.Sequence {
shouldAdd = false
break
// If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before
// so we add them and continue given that we don't need to check the sequence.
shouldAdd := true
for _, signer := range signerData {
signerKey := string(signer.Signer)
seq, ok := selectedTxsSignersSeqs[signerKey]
if !ok {
txSignersSeqs[signerKey] = signer.Sequence
continue
}

// If we have seen this signer before in this block, we must make
// sure that the current sequence is seq+1; otherwise is invalid
// and we skip it.
if seq+1 != signer.Sequence {
shouldAdd = false
break
}
txSignersSeqs[signerKey] = signer.Sequence
}
if !shouldAdd {
return true
}
txSignersSeqs[signerKey] = signer.Sequence
}
if !shouldAdd {
return true
}

// NOTE: Since transaction verification was already executed in CheckTx,
Expand All @@ -338,18 +344,21 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
}

txsLen := len(h.txSelector.SelectedTxs(ctx))
for sender, seq := range txSignersSeqs {
// If txsLen != selectedTxsNums is true, it means that we've
// added a new tx to the selected txs, so we need to update
// the sequence of the sender.
if txsLen != selectedTxsNums {
selectedTxsSignersSeqs[sender] = seq
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
// The transaction hasn't been added but it passed the
// verification, so we know that the sequence is correct.
// So we set this sender's sequence to seq-1, in order
// to avoid unnecessary calls to PrepareProposalVerifyTx.
selectedTxsSignersSeqs[sender] = seq - 1
// If the tx is unordered, we don't need to update the sender sequence.
if !isUnordered {
for sender, seq := range txSignersSeqs {
// If txsLen != selectedTxsNums is true, it means that we've
// added a new tx to the selected txs, so we need to update
// the sequence of the sender.
if txsLen != selectedTxsNums {
selectedTxsSignersSeqs[sender] = seq
} else if _, ok := selectedTxsSignersSeqs[sender]; !ok {
// The transaction hasn't been added but it passed the
// verification, so we know that the sequence is correct.
// So we set this sender's sequence to seq-1, in order
// to avoid unnecessary calls to PrepareProposalVerifyTx.
selectedTxsSignersSeqs[sender] = seq - 1
}
}
}
selectedTxsNums = txsLen
Expand Down
3 changes: 3 additions & 0 deletions runtime/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ func (a *App) Close() error {

// PreBlocker application updates every pre block
func (a *App) PreBlocker(ctx sdk.Context, _ *abci.FinalizeBlockRequest) error {
if a.UnorderedTxManager != nil {
a.UnorderedTxManager.OnNewBlock(ctx.BlockTime())
}
return a.ModuleManager.PreBlock(ctx)
}

Expand Down
1 change: 1 addition & 0 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ func (app *SimApp) Name() string { return app.BaseApp.Name() }

// PreBlocker application updates every pre block
func (app *SimApp) PreBlocker(ctx sdk.Context, _ *abci.FinalizeBlockRequest) error {
app.UnorderedTxManager.OnNewBlock(ctx.BlockTime())
return app.ModuleManager.PreBlock(ctx)
}

Expand Down
19 changes: 19 additions & 0 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error
sender := sig.Signer.String()
priority := mp.cfg.TxPriority.GetTxPriority(ctx, tx)
nonce := sig.Sequence

// if it's an unordered tx, we use the gas instead of the nonce
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
gasLimit, err := unordered.GetGasLimit()
nonce = gasLimit
if err != nil {
return err
}
}

key := txMeta[C]{nonce: nonce, priority: priority, sender: sender}

senderIndex, ok := mp.senderIndices[sender]
Expand Down Expand Up @@ -458,6 +468,15 @@ func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error {
sender := sig.Signer.String()
nonce := sig.Sequence

// if it's an unordered tx, we use the gas instead of the nonce
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
gasLimit, err := unordered.GetGasLimit()
nonce = gasLimit
if err != nil {
return err
}
}

scoreKey := txMeta[C]{nonce: nonce, sender: sender}
score, ok := mp.scores[scoreKey]
if !ok {
Expand Down
18 changes: 18 additions & 0 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
snm.senders[sender] = senderTxs
}

// if it's an unordered tx, we use the gas instead of the nonce
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
gasLimit, err := unordered.GetGasLimit()
nonce = gasLimit
if err != nil {
return err
}
}

senderTxs.Set(nonce, tx)

key := txKey{nonce: nonce, address: sender}
Expand Down Expand Up @@ -227,6 +236,15 @@ func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error {
sender := sdk.AccAddress(sig.PubKey.Address()).String()
nonce := sig.Sequence

// if it's an unordered tx, we use the gas instead of the nonce
if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() {
gasLimit, err := unordered.GetGasLimit()
nonce = gasLimit
if err != nil {
return err
}
}

senderTxs, found := snm.senders[sender]
if !found {
return ErrTxNotFound
Expand Down
18 changes: 9 additions & 9 deletions x/auth/ante/unorderedtx/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ type Manager struct {
func NewManager(dataDir string) *Manager {
path := filepath.Join(dataDir, dirName)
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
_ = os.Mkdir(path, os.ModePerm)
if err = os.MkdirAll(path, os.ModePerm); err != nil {
panic(fmt.Errorf("failed to create unordered txs directory: %w", err))
}
}

m := &Manager{
Expand Down Expand Up @@ -157,18 +159,14 @@ func (m *Manager) OnNewBlock(blockTime time.Time) {
m.blockCh <- blockTime
}

func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) error) error {
func (m *Manager) exportSnapshot(_ uint64, snapshotWriter func([]byte) error) error {
var buf bytes.Buffer
w := bufio.NewWriter(&buf)

keys := slices.SortedFunc(maps.Keys(m.txHashes), func(i, j TxHash) int { return bytes.Compare(i[:], j[:]) })
timestamp := time.Unix(int64(height), 0)
for _, txHash := range keys {
timeoutTime := m.txHashes[txHash]
if timestamp.After(timeoutTime) {
// skip expired txs that have yet to be purged
continue
}

// right now we dont have access block time at this flow, so we would just include the expired txs
// and let it be purge during purge loop
chunk := unorderedTxToBytes(txHash, uint64(timeoutTime.Unix()))
Expand All @@ -185,8 +183,8 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro
return snapshotWriter(buf.Bytes())
}

// flushToFile writes all unexpired unordered transactions along with their TTL
// to file, overwriting the existing file if it exists.
// flushToFile writes all unordered transactions (including expired if not pruned yet)
// along with their TTL to file, overwriting the existing file if it exists.
func (m *Manager) flushToFile() error {
f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName))
if err != nil {
Expand Down Expand Up @@ -251,6 +249,8 @@ func (m *Manager) purgeLoop() {
}
}

// batchReceive receives block time from the channel until the context is done
// or the channel is closed.
func (m *Manager) batchReceive() (time.Time, bool) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
5 changes: 3 additions & 2 deletions x/auth/ante/unorderedtx/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPay

timestamp := binary.BigEndian.Uint64(payload[i+txHashSize : i+chunkSize])

// purge any expired txs
if timestamp != 0 && timestamp > height {
// add all txs, we don't care at this point if they are expired,
// we'll let the purge loop handle that
if timestamp != 0 {
s.m.Add(txHash, time.Unix(int64(timestamp), 0))
}

Expand Down
14 changes: 11 additions & 3 deletions x/auth/ante/unorderedtx/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,20 @@ func TestSnapshotter(t *testing.T) {
err = s.RestoreExtension(50, 2, pr)
require.Error(t, err)

// restore with timestamp > timeout time which should result in no unordered txs synced
// restore with timestamp > timeout time which should result in all unordered txs synced,
// even the ones that have timed out.
txm2 := unorderedtx.NewManager(dataDir)
s2 := unorderedtx.NewSnapshotter(txm2)
err = s2.RestoreExtension(uint64(currentTime.Add(time.Second*200).Unix()), unorderedtx.SnapshotFormat, pr)
err = s2.RestoreExtension(1, unorderedtx.SnapshotFormat, pr)
require.NoError(t, err)
require.Empty(t, txm2.Size())
require.Equal(t, 100, txm2.Size())

// start the manager and wait a bit for the background purge loop to run
txm2.Start()
time.Sleep(time.Millisecond * 5)
txm2.OnNewBlock(currentTime.Add(time.Second * 200))
time.Sleep(time.Second * 5) // the loop runs every 5 seconds, so we need to wait for that
require.Equal(t, 0, txm2.Size())

// restore with timestamp < timeout time which should result in all unordered txs synced
txm3 := unorderedtx.NewManager(dataDir)
Expand Down
Loading