Skip to content

Commit

Permalink
feat: impl checkTxAsyncReactor() (#168)
Browse files Browse the repository at this point in the history
* feat: impl `checkTxAsyncReactor()`

* chore: revise comment

* chore: divide `cb` into `prepareCb` and `checkTxCb`

* chore: revise `mock/mempool.CheckTxAsync()` to do nothing

* fix: check whether `prepareCb` is `nil`
  • Loading branch information
jinsan-line authored Jan 25, 2021
1 parent 65d890b commit bc7ff22
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 32 deletions.
13 changes: 7 additions & 6 deletions consensus/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,24 @@ func TestMempoolRmBadTx(t *testing.T) {
resEndRecheckTx := app.EndRecheckTx(abci.RequestEndRecheckTx{})
assert.Equal(t, code.CodeTypeOK, resEndRecheckTx.Code)

emptyMempoolCh := make(chan struct{})
checkTxErrorCh := make(chan error)
checkTxRespCh := make(chan struct{})
emptyMempoolCh := make(chan struct{})
go func() {
// Try to send the tx through the mempool.
// CheckTx should not err, but the app should return a bad abci code
// and the tx should get removed from the pool
err := assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(r *abci.Response) {
assertMempool(cs.txNotifier).CheckTxAsync(txBytes, mempl.TxInfo{}, func(err error) {
checkTxErrorCh <- err
}, func(r *abci.Response) {
if r.GetCheckTx().Code != code.CodeTypeBadNonce {
t.Errorf("expected checktx to return bad nonce, got %v", r)
return
}
checkTxRespCh <- struct{}{}
})
if err != nil {
t.Errorf("error after CheckTx: %v", err)
return
}

<-checkTxErrorCh

// check for the tx
for {
Expand Down
4 changes: 2 additions & 2 deletions mempool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func BenchmarkReapWithCheckTxAsync(b *testing.B) {
for i := 0; i < size; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil)
mempool.CheckTxAsync(tx, TxInfo{}, nil, nil)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -66,7 +66,7 @@ func BenchmarkCheckTxAsync(b *testing.B) {
for i := 0; i < b.N; i++ {
tx := make([]byte, 8)
binary.BigEndian.PutUint64(tx, uint64(i))
mempool.CheckTxAsync(tx, TxInfo{}, nil)
mempool.CheckTxAsync(tx, TxInfo{}, nil, nil)
}
}

Expand Down
51 changes: 36 additions & 15 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ type CListMempool struct {
updateMtx sync.RWMutex
preCheck PreCheckFunc

chReqCheckTx chan *requestCheckTxAsync

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
proxyAppConn proxy.AppConnMempool
Expand All @@ -64,6 +66,13 @@ type CListMempool struct {
metrics *Metrics
}

type requestCheckTxAsync struct {
tx types.Tx
txInfo TxInfo
prepareCb func(error)
checkTxCb func(*abci.Response)
}

var _ Mempool = &CListMempool{}

// CListMempoolOption sets an optional parameter on the mempool.
Expand All @@ -81,6 +90,7 @@ func NewCListMempool(
proxyAppConn: proxyAppConn,
txs: clist.New(),
height: height,
chReqCheckTx: make(chan *requestCheckTxAsync, config.Size),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
}
Expand All @@ -93,6 +103,7 @@ func NewCListMempool(
for _, option := range options {
option(mempool)
}
go mempool.checkTxAsyncReactor()
return mempool
}

Expand Down Expand Up @@ -225,39 +236,49 @@ func (mem *CListMempool) CheckTxSync(tx types.Tx, txInfo TxInfo) (res *abci.Resp
return res, err
}

// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
// It gets called from another goroutine.
// CONTRACT: Either cb will get called, or err returned.
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, cb func(*abci.Response)) (err error) {
func (mem *CListMempool) CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) {
mem.chReqCheckTx <- &requestCheckTxAsync{tx: tx, txInfo: txInfo, prepareCb: prepareCb, checkTxCb: checkTxCb}
}

func (mem *CListMempool) checkTxAsyncReactor() {
for req := range mem.chReqCheckTx {
mem.checkTxAsync(req.tx, req.txInfo, req.prepareCb, req.checkTxCb)
}
}

// It blocks if we're waiting on Update() or Reap().
func (mem *CListMempool) checkTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response)) {
mem.updateMtx.RLock()
// use defer to unlock mutex because application (*local client*) might panic
defer func() {
if err != nil {
mem.updateMtx.RUnlock()
return
}

if r := recover(); r != nil {
mem.updateMtx.RUnlock()
panic(r)
}
}()

if err = mem.prepareCheckTx(tx, txInfo); err != nil {
return err
err := mem.prepareCheckTx(tx, txInfo)
if prepareCb != nil {
prepareCb(err)
}
if err != nil {
mem.updateMtx.RUnlock()
return
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(func(res *abci.Response) {
mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, cb)
mem.updateMtx.RUnlock()
mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, res, func(response *abci.Response) {
if checkTxCb != nil {
checkTxCb(response)
}
mem.updateMtx.RUnlock()
})
})

return err
}

// CONTRACT: `caller` should held `mem.updateMtx.RLock()`
Expand Down
2 changes: 1 addition & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Mempool interface {
// CheckTx executes a new transaction against the application to determine
// its validity and whether it should be added to the mempool.
CheckTxSync(tx types.Tx, txInfo TxInfo) (*abci.Response, error)
CheckTxAsync(tx types.Tx, txInfo TxInfo, callback func(*abci.Response)) error
CheckTxAsync(tx types.Tx, txInfo TxInfo, prepareCb func(error), checkTxCb func(*abci.Response))

// ReapMaxBytesMaxGas reaps transactions from the mempool up to maxBytes
// bytes total with the condition that the total gasWanted must be less than
Expand Down
9 changes: 5 additions & 4 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if src != nil {
txInfo.SenderP2PID = src.ID()
}
err := memR.mempool.CheckTxAsync(msg.Tx, txInfo, nil)
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
}
memR.mempool.CheckTxAsync(msg.Tx, txInfo, func(err error) {
if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err)
}
}, nil)
// broadcasting happens from go routines per peer
default:
memR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand Down
3 changes: 1 addition & 2 deletions mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func (Mempool) Size() int { return 0 }
func (Mempool) CheckTxSync(_ types.Tx, _ mempl.TxInfo) (*abci.Response, error) {
return nil, nil
}
func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(*abci.Response)) error {
return nil
func (Mempool) CheckTxAsync(_ types.Tx, _ mempl.TxInfo, _ func(error), _ func(*abci.Response)) {
}
func (Mempool) ReapMaxBytesMaxGas(_, _ int64) types.Txs { return types.Txs{} }
func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} }
Expand Down
8 changes: 6 additions & 2 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, nil)

chErr := make(chan error)
env.Mempool.CheckTxAsync(tx, mempl.TxInfo{}, func(err error) {
chErr <- err
}, nil)
err := <-chErr
if err != nil {
return nil, err
}

return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
}

Expand Down

0 comments on commit bc7ff22

Please sign in to comment.