diff --git a/UPGRADING.md b/UPGRADING.md index bf8b2fa8b0d1..d1b769cd0596 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -236,6 +236,20 @@ If you are still using the legacy wiring, you must enable unordered transactions } ``` +* Create or update the App's `Preblocker()` method to call the unordered tx + manager's `OnNewBlock()` method. + + ```go + ... + app.SetPreblocker(app.PreBlocker) + ... + + func (app *SimApp) PreBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*sdk.ResponsePreBlock, error) { + app.UnorderedTxManager.OnNewBlock(ctx.BlockTime()) + return app.ModuleManager.PreBlock(ctx, req) + } + ``` + * Create or update the App's `Close()` method to close the unordered tx manager. Note, this is critical as it ensures the manager's state is written to file such that when the node restarts, it can recover the state to provide replay diff --git a/baseapp/abci_utils.go b/baseapp/abci_utils.go index f379415e7849..63c13fa8620b 100644 --- a/baseapp/abci_utils.go +++ b/baseapp/abci_utils.go @@ -292,36 +292,42 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock ) h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool { - signerData, err := h.signerExtAdapter.GetSigners(memTx) - if err != nil { - // propagate the error to the caller - resError = err - return false - } - - // If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before - // so we add them and continue given that we don't need to check the sequence. - shouldAdd := true + unorderedTx, ok := memTx.(sdk.TxWithUnordered) + isUnordered := ok && unorderedTx.GetUnordered() txSignersSeqs := make(map[string]uint64) - for _, signer := range signerData { - signerKey := string(signer.Signer) - seq, ok := selectedTxsSignersSeqs[signerKey] - if !ok { - txSignersSeqs[signerKey] = signer.Sequence - continue + + // if the tx is unordered, we don't need to check the sequence, we just add it + if !isUnordered { + signerData, err := h.signerExtAdapter.GetSigners(memTx) + if err != nil { + // propagate the error to the caller + resError = err + return false } - // If we have seen this signer before in this block, we must make - // sure that the current sequence is seq+1; otherwise is invalid - // and we skip it. - if seq+1 != signer.Sequence { - shouldAdd = false - break + // If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before + // so we add them and continue given that we don't need to check the sequence. + shouldAdd := true + for _, signer := range signerData { + signerKey := string(signer.Signer) + seq, ok := selectedTxsSignersSeqs[signerKey] + if !ok { + txSignersSeqs[signerKey] = signer.Sequence + continue + } + + // If we have seen this signer before in this block, we must make + // sure that the current sequence is seq+1; otherwise is invalid + // and we skip it. + if seq+1 != signer.Sequence { + shouldAdd = false + break + } + txSignersSeqs[signerKey] = signer.Sequence + } + if !shouldAdd { + return true } - txSignersSeqs[signerKey] = signer.Sequence - } - if !shouldAdd { - return true } // NOTE: Since transaction verification was already executed in CheckTx, @@ -338,18 +344,21 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan } txsLen := len(h.txSelector.SelectedTxs(ctx)) - for sender, seq := range txSignersSeqs { - // If txsLen != selectedTxsNums is true, it means that we've - // added a new tx to the selected txs, so we need to update - // the sequence of the sender. - if txsLen != selectedTxsNums { - selectedTxsSignersSeqs[sender] = seq - } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { - // The transaction hasn't been added but it passed the - // verification, so we know that the sequence is correct. - // So we set this sender's sequence to seq-1, in order - // to avoid unnecessary calls to PrepareProposalVerifyTx. - selectedTxsSignersSeqs[sender] = seq - 1 + // If the tx is unordered, we don't need to update the sender sequence. + if !isUnordered { + for sender, seq := range txSignersSeqs { + // If txsLen != selectedTxsNums is true, it means that we've + // added a new tx to the selected txs, so we need to update + // the sequence of the sender. + if txsLen != selectedTxsNums { + selectedTxsSignersSeqs[sender] = seq + } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { + // The transaction hasn't been added but it passed the + // verification, so we know that the sequence is correct. + // So we set this sender's sequence to seq-1, in order + // to avoid unnecessary calls to PrepareProposalVerifyTx. + selectedTxsSignersSeqs[sender] = seq - 1 + } } } selectedTxsNums = txsLen diff --git a/runtime/app.go b/runtime/app.go index 0c2d070bea5b..f9a74a29a208 100644 --- a/runtime/app.go +++ b/runtime/app.go @@ -173,6 +173,9 @@ func (a *App) Close() error { // PreBlocker application updates every pre block func (a *App) PreBlocker(ctx sdk.Context, _ *abci.FinalizeBlockRequest) error { + if a.UnorderedTxManager != nil { + a.UnorderedTxManager.OnNewBlock(ctx.BlockTime()) + } return a.ModuleManager.PreBlock(ctx) } diff --git a/simapp/app.go b/simapp/app.go index 91f0607cf763..a3380a2f8171 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -689,6 +689,7 @@ func (app *SimApp) Name() string { return app.BaseApp.Name() } // PreBlocker application updates every pre block func (app *SimApp) PreBlocker(ctx sdk.Context, _ *abci.FinalizeBlockRequest) error { + app.UnorderedTxManager.OnNewBlock(ctx.BlockTime()) return app.ModuleManager.PreBlock(ctx) } diff --git a/types/mempool/priority_nonce.go b/types/mempool/priority_nonce.go index a8002c61f142..320ffa337229 100644 --- a/types/mempool/priority_nonce.go +++ b/types/mempool/priority_nonce.go @@ -222,6 +222,16 @@ func (mp *PriorityNonceMempool[C]) Insert(ctx context.Context, tx sdk.Tx) error sender := sig.Signer.String() priority := mp.cfg.TxPriority.GetTxPriority(ctx, tx) nonce := sig.Sequence + + // if it's an unordered tx, we use the gas instead of the nonce + if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() { + gasLimit, err := unordered.GetGasLimit() + nonce = gasLimit + if err != nil { + return err + } + } + key := txMeta[C]{nonce: nonce, priority: priority, sender: sender} senderIndex, ok := mp.senderIndices[sender] @@ -458,6 +468,15 @@ func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error { sender := sig.Signer.String() nonce := sig.Sequence + // if it's an unordered tx, we use the gas instead of the nonce + if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() { + gasLimit, err := unordered.GetGasLimit() + nonce = gasLimit + if err != nil { + return err + } + } + scoreKey := txMeta[C]{nonce: nonce, sender: sender} score, ok := mp.scores[scoreKey] if !ok { diff --git a/types/mempool/sender_nonce.go b/types/mempool/sender_nonce.go index 09b0afab69ae..928dafc8d08c 100644 --- a/types/mempool/sender_nonce.go +++ b/types/mempool/sender_nonce.go @@ -145,6 +145,15 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error { snm.senders[sender] = senderTxs } + // if it's an unordered tx, we use the gas instead of the nonce + if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() { + gasLimit, err := unordered.GetGasLimit() + nonce = gasLimit + if err != nil { + return err + } + } + senderTxs.Set(nonce, tx) key := txKey{nonce: nonce, address: sender} @@ -227,6 +236,15 @@ func (snm *SenderNonceMempool) Remove(tx sdk.Tx) error { sender := sdk.AccAddress(sig.PubKey.Address()).String() nonce := sig.Sequence + // if it's an unordered tx, we use the gas instead of the nonce + if unordered, ok := tx.(sdk.TxWithUnordered); ok && unordered.GetUnordered() { + gasLimit, err := unordered.GetGasLimit() + nonce = gasLimit + if err != nil { + return err + } + } + senderTxs, found := snm.senders[sender] if !found { return ErrTxNotFound diff --git a/x/auth/ante/unorderedtx/manager.go b/x/auth/ante/unorderedtx/manager.go index 8b5a91ed2a01..2d103b709ec8 100644 --- a/x/auth/ante/unorderedtx/manager.go +++ b/x/auth/ante/unorderedtx/manager.go @@ -57,7 +57,9 @@ type Manager struct { func NewManager(dataDir string) *Manager { path := filepath.Join(dataDir, dirName) if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { - _ = os.Mkdir(path, os.ModePerm) + if err = os.MkdirAll(path, os.ModePerm); err != nil { + panic(fmt.Errorf("failed to create unordered txs directory: %w", err)) + } } m := &Manager{ @@ -157,18 +159,14 @@ func (m *Manager) OnNewBlock(blockTime time.Time) { m.blockCh <- blockTime } -func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) error) error { +func (m *Manager) exportSnapshot(_ uint64, snapshotWriter func([]byte) error) error { var buf bytes.Buffer w := bufio.NewWriter(&buf) keys := slices.SortedFunc(maps.Keys(m.txHashes), func(i, j TxHash) int { return bytes.Compare(i[:], j[:]) }) - timestamp := time.Unix(int64(height), 0) for _, txHash := range keys { timeoutTime := m.txHashes[txHash] - if timestamp.After(timeoutTime) { - // skip expired txs that have yet to be purged - continue - } + // right now we dont have access block time at this flow, so we would just include the expired txs // and let it be purge during purge loop chunk := unorderedTxToBytes(txHash, uint64(timeoutTime.Unix())) @@ -185,8 +183,8 @@ func (m *Manager) exportSnapshot(height uint64, snapshotWriter func([]byte) erro return snapshotWriter(buf.Bytes()) } -// flushToFile writes all unexpired unordered transactions along with their TTL -// to file, overwriting the existing file if it exists. +// flushToFile writes all unordered transactions (including expired if not pruned yet) +// along with their TTL to file, overwriting the existing file if it exists. func (m *Manager) flushToFile() error { f, err := os.Create(filepath.Join(m.dataDir, dirName, fileName)) if err != nil { @@ -251,6 +249,8 @@ func (m *Manager) purgeLoop() { } } +// batchReceive receives block time from the channel until the context is done +// or the channel is closed. func (m *Manager) batchReceive() (time.Time, bool) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/x/auth/ante/unorderedtx/snapshotter.go b/x/auth/ante/unorderedtx/snapshotter.go index 39142e956bb0..4c855c9e3167 100644 --- a/x/auth/ante/unorderedtx/snapshotter.go +++ b/x/auth/ante/unorderedtx/snapshotter.go @@ -81,8 +81,9 @@ func (s *Snapshotter) restore(height uint64, payloadReader snapshot.ExtensionPay timestamp := binary.BigEndian.Uint64(payload[i+txHashSize : i+chunkSize]) - // purge any expired txs - if timestamp != 0 && timestamp > height { + // add all txs, we don't care at this point if they are expired, + // we'll let the purge loop handle that + if timestamp != 0 { s.m.Add(txHash, time.Unix(int64(timestamp), 0)) } diff --git a/x/auth/ante/unorderedtx/snapshotter_test.go b/x/auth/ante/unorderedtx/snapshotter_test.go index a2e66bfd869f..fc5ce003d896 100644 --- a/x/auth/ante/unorderedtx/snapshotter_test.go +++ b/x/auth/ante/unorderedtx/snapshotter_test.go @@ -39,12 +39,20 @@ func TestSnapshotter(t *testing.T) { err = s.RestoreExtension(50, 2, pr) require.Error(t, err) - // restore with timestamp > timeout time which should result in no unordered txs synced + // restore with timestamp > timeout time which should result in all unordered txs synced, + // even the ones that have timed out. txm2 := unorderedtx.NewManager(dataDir) s2 := unorderedtx.NewSnapshotter(txm2) - err = s2.RestoreExtension(uint64(currentTime.Add(time.Second*200).Unix()), unorderedtx.SnapshotFormat, pr) + err = s2.RestoreExtension(1, unorderedtx.SnapshotFormat, pr) require.NoError(t, err) - require.Empty(t, txm2.Size()) + require.Equal(t, 100, txm2.Size()) + + // start the manager and wait a bit for the background purge loop to run + txm2.Start() + time.Sleep(time.Millisecond * 5) + txm2.OnNewBlock(currentTime.Add(time.Second * 200)) + time.Sleep(time.Second * 5) // the loop runs every 5 seconds, so we need to wait for that + require.Equal(t, 0, txm2.Size()) // restore with timestamp < timeout time which should result in all unordered txs synced txm3 := unorderedtx.NewManager(dataDir)