Skip to content

Commit

Permalink
Support pending transaction in mempool (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen authored and stevenlanders committed Jan 30, 2024
1 parent cefd40f commit 434a593
Show file tree
Hide file tree
Showing 18 changed files with 174 additions and 51 deletions.
5 changes: 3 additions & 2 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (cli *grpcClient) Info(ctx context.Context, params *types.RequestInfo) (*ty
return cli.client.Info(ctx, types.ToRequestInfo(params).GetInfo(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
func (cli *grpcClient) CheckTx(ctx context.Context, params *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
resCheckTx, err := cli.client.CheckTx(ctx, types.ToRequestCheckTx(params).GetCheckTx(), grpc.WaitForReady(true))
return &types.ResponseCheckTxV2{ResponseCheckTx: resCheckTx}, err
}

func (cli *grpcClient) Query(ctx context.Context, params *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
8 changes: 4 additions & 4 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,12 @@ func (cli *socketClient) Info(ctx context.Context, req *types.RequestInfo) (*typ
return res.GetInfo(), nil
}

func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
func (cli *socketClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
res, err := cli.doRequest(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return res.GetCheckTx(), nil
return &types.ResponseCheckTxV2{ResponseCheckTx: res.GetCheckTx()}, nil
}

func (cli *socketClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (app *Application) FinalizeBlock(_ context.Context, req *types.RequestFinal
return &types.ResponseFinalizeBlock{TxResults: respTxs, ValidatorUpdates: app.ValUpdates, AppHash: appHash}, nil
}

func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}, nil
func (*Application) CheckTx(_ context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTxV2, error) {
return &types.ResponseCheckTxV2{ResponseCheckTx: &types.ResponseCheckTx{Code: code.CodeTypeOK, GasWanted: 1}}, nil
}

func (app *Application) Commit(_ context.Context) (*types.ResponseCommit, error) {
Expand Down
8 changes: 8 additions & 0 deletions abci/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,11 @@ func (app *gRPCApplication) Flush(_ context.Context, req *types.RequestFlush) (*
func (app *gRPCApplication) Commit(ctx context.Context, req *types.RequestCommit) (*types.ResponseCommit, error) {
return app.Application.Commit(ctx)
}

func (app *gRPCApplication) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
resV2, err := app.Application.CheckTx(ctx, req)
if err != nil {
return &types.ResponseCheckTx{}, err
}
return resV2.ResponseCheckTx, nil
}
6 changes: 3 additions & 3 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type Application interface {
Query(context.Context, *RequestQuery) (*ResponseQuery, error) // Query for state

// Mempool Connection
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTx, error) // Validate a tx for the mempool
CheckTx(context.Context, *RequestCheckTx) (*ResponseCheckTxV2, error) // Validate a tx for the mempool

// Consensus Connection
InitChain(context.Context, *RequestInitChain) (*ResponseInitChain, error) // Initialize blockchain w validators/other info from TendermintCore
Expand Down Expand Up @@ -51,8 +51,8 @@ func (BaseApplication) Info(_ context.Context, req *RequestInfo) (*ResponseInfo,
return &ResponseInfo{}, nil
}

func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
return &ResponseCheckTx{Code: CodeTypeOK}, nil
func (BaseApplication) CheckTx(_ context.Context, req *RequestCheckTx) (*ResponseCheckTxV2, error) {
return &ResponseCheckTxV2{ResponseCheckTx: &ResponseCheckTx{Code: CodeTypeOK}}, nil
}

func (BaseApplication) Commit(_ context.Context) (*ResponseCommit, error) {
Expand Down
4 changes: 2 additions & 2 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func ToResponseInfo(res *ResponseInfo) *Response {
}
}

func ToResponseCheckTx(res *ResponseCheckTx) *Response {
func ToResponseCheckTx(res *ResponseCheckTxV2) *Response {
return &Response{
Value: &Response_CheckTx{res},
Value: &Response_CheckTx{res.ResponseCheckTx},
}
}

Expand Down
8 changes: 4 additions & 4 deletions abci/types/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,21 @@ func MarshalTxResults(r []*ExecTxResult) ([][]byte, error) {
}
return s, nil
}

type PendingTxCheckerResponse int

const (
Accepted PendingTxCheckerResponse = iota
Rejected
Pending
)

type PendingTxChecker func() PendingTxCheckerResponse

// V2 response type contains non-protobuf fields, so non-local ABCI clients will not be able
// to utilize the new fields in V2 type (but still be backwards-compatible)
type ResponseCheckTxV2 struct {
*ResponseCheckTx
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
}
8 changes: 4 additions & 4 deletions internal/consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,18 +308,18 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques
return res, nil
}

func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
app.mu.Lock()
defer app.mu.Unlock()

txValue := txAsUint64(req.Tx)
if txValue != uint64(app.mempoolTxCount) {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Code: code.CodeTypeBadNonce,
}, nil
}}, nil
}
app.mempoolTxCount++
return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{Code: code.CodeTypeOK}}, nil
}

func txAsUint64(tx []byte) uint64 {
Expand Down
45 changes: 37 additions & 8 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type TxMempool struct {
// index. i.e. older transactions are first.
timestampIndex *WrappedTxList

// pendingTxs stores transactions that are not valid yet but might become valid
// if its checker returns Accepted
pendingTxs *PendingTxs

// A read/write lock is used to safe guard updates, insertions and deletions
// from the mempool. A read-lock is implicitly acquired when executing CheckTx,
// however, a caller must explicitly grab a write-lock via Lock when updating
Expand Down Expand Up @@ -120,6 +124,7 @@ func NewTxMempool(
timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp)
}),
pendingTxs: NewPendingTxs(),
failedCheckTxCounts: map[types.NodeID]uint64{},
peerManager: peerManager,
}
Expand Down Expand Up @@ -286,17 +291,25 @@ func (txmp *TxMempool) CheckTx(
height: txmp.height,
}

// only add new transaction if checkTx passes
if err == nil {
err = txmp.addNewTransaction(wtx, res, txInfo)
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)

if err != nil {
return err
if err != nil {
return err
}
} else {
// otherwise add to pending txs store
if res.Checker == nil {
return errors.New("no checker available for pending transaction")
}
txmp.pendingTxs.Insert(wtx, res, txInfo)
}
}

if cb != nil {
cb(res)
cb(res.ResponseCheckTx)
}

return nil
Expand Down Expand Up @@ -470,6 +483,7 @@ func (txmp *TxMempool) Update(
}
}

txmp.handlePendingTransactions()
txmp.purgeExpiredTxs(blockHeight)

// If there any uncommitted transactions left in the mempool, we either
Expand Down Expand Up @@ -633,7 +647,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
//
// This method is NOT executed for the initial CheckTx on a new transaction;
// that case is handled by addNewTransaction instead.
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTx) {
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckTxV2) {
if txmp.recheckCursor == nil {
return
}
Expand Down Expand Up @@ -676,10 +690,11 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT
if !txmp.txStore.IsTxRemoved(wtx.hash) {
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(tx, res)
err = txmp.postCheck(tx, res.ResponseCheckTx)
}

if res.Code == abci.CodeTypeOK && err == nil {
// we will treat a transaction that turns pending in a recheck as invalid and evict it
if res.Code == abci.CodeTypeOK && err == nil && !res.IsPendingTransaction {
wtx.priority = res.Priority
} else {
txmp.logger.Debug(
Expand Down Expand Up @@ -904,3 +919,17 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
jsonData, _ := json.Marshal(logs)
return string(jsonData)
}

func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
if !txmp.config.KeepInvalidTxsInCache {
for _, tx := range rejected {
txmp.cache.Remove(tx.tx.tx)
}
}
}
16 changes: 8 additions & 8 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type testTx struct {
priority int64
}

func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTxV2, error) {
var (
priority int64
sender string
Expand All @@ -48,29 +48,29 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
if len(parts) == 3 {
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 100,
GasWanted: 1,
}, nil
}}, nil
}

priority = v
sender = string(parts[0])
} else {
return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 101,
GasWanted: 1,
}, nil
}}, nil
}

return &abci.ResponseCheckTx{
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Sender: sender,
Code: code.CodeTypeOK,
GasWanted: 1,
}, nil
}}, nil
}

func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
Expand Down Expand Up @@ -727,4 +727,4 @@ func TestAppendCheckTxErr(t *testing.T) {
require.NoError(t, err)
require.Equal(t, len(data), 1)
require.Equal(t, data[0]["log"], "sample error msg")
}
}
Loading

0 comments on commit 434a593

Please sign in to comment.