diff --git a/baseapp/abci.go b/baseapp/abci.go index 254350274c..2feabf2104 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -161,15 +161,20 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc // internal CheckTx state if the AnteHandler passes. Otherwise, the ResponseCheckTx // will contain releveant error information. Regardless of tx execution outcome, // the ResponseCheckTx will contain relevant gas execution context. -func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { - tx, err := app.txDecoder(req.Tx) +func (app *BaseApp) CheckTxSync(req abci.RequestCheckTx) abci.ResponseCheckTx { + if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck { + panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) + } + + tx, err := app.preCheckTx(req.Tx) if err != nil { return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace) } - if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck { - panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) - } + waits, signals := app.checkAccountWGs.Register(tx) + + app.checkAccountWGs.Wait(waits) + defer app.checkAccountWGs.Done(signals) gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck) if err != nil { @@ -182,6 +187,22 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { } } +func (app *BaseApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxCallback) { + if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck { + panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type)) + } + + reqCheckTx := &RequestCheckTxAsync{ + txBytes: req.Tx, + recheck: req.Type == abci.CheckTxType_Recheck, + callback: callback, + prepare: waitGroup1(), + } + app.chCheckTx <- reqCheckTx + + go app.prepareCheckTx(reqCheckTx) +} + // BeginRecheckTx implements the ABCI interface and set the check state based on the given header func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx { // NOTE: This is safe because Tendermint holds a lock on the mempool for Rechecking. diff --git a/baseapp/accountlock.go b/baseapp/accountlock.go deleted file mode 100644 index b2ec65b87e..0000000000 --- a/baseapp/accountlock.go +++ /dev/null @@ -1,88 +0,0 @@ -package baseapp - -import ( - "encoding/binary" - "sort" - "sync" - - sdk "github.com/cosmos/cosmos-sdk/types" -) - -// NOTE should 1 <= sampleBytes <= 4. If modify it, you should revise `getAddressKey()` as well -const sampleBytes = 2 - -type AccountLock struct { - accMtx [1 << (sampleBytes * 8)]sync.Mutex -} - -func (al *AccountLock) Lock(ctx sdk.Context, tx sdk.Tx) []uint32 { - if !ctx.IsCheckTx() || ctx.IsReCheckTx() { - return nil - } - - signers := getSigners(tx) - accKeys := getUniqSortedAddressKey(signers) - - for _, key := range accKeys { - al.accMtx[key].Lock() - } - - return accKeys -} - -func (al *AccountLock) Unlock(accKeys []uint32) { - // NOTE reverse order - for i, length := 0, len(accKeys); i < length; i++ { - key := accKeys[length-1-i] - al.accMtx[key].Unlock() - } -} - -func getSigners(tx sdk.Tx) []sdk.AccAddress { - seen := map[string]bool{} - var signers []sdk.AccAddress - for _, msg := range tx.GetMsgs() { - for _, addr := range msg.GetSigners() { - if !seen[addr.String()] { - signers = append(signers, addr) - seen[addr.String()] = true - } - } - } - return signers -} - -func getUniqSortedAddressKey(addrs []sdk.AccAddress) []uint32 { - accKeys := make([]uint32, 0, len(addrs)) - for _, addr := range addrs { - accKeys = append(accKeys, getAddressKey(addr)) - } - - accKeys = uniq(accKeys) - sort.Sort(uint32Slice(accKeys)) - - return accKeys -} - -func getAddressKey(addr sdk.AccAddress) uint32 { - return uint32(binary.BigEndian.Uint16(addr)) -} - -func uniq(u []uint32) []uint32 { - seen := map[uint32]bool{} - var ret []uint32 - for _, v := range u { - if !seen[v] { - ret = append(ret, v) - seen[v] = true - } - } - return ret -} - -// Uint32Slice attaches the methods of Interface to []uint32, sorting in increasing order. -type uint32Slice []uint32 - -func (p uint32Slice) Len() int { return len(p) } -func (p uint32Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p uint32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/baseapp/accountwgs.go b/baseapp/accountwgs.go new file mode 100644 index 0000000000..e9d3f74777 --- /dev/null +++ b/baseapp/accountwgs.go @@ -0,0 +1,85 @@ +package baseapp + +import ( + "sync" + + sdk "github.com/cosmos/cosmos-sdk/types" +) + +type AccountWGs struct { + mtx sync.Mutex + wgs map[string]*sync.WaitGroup +} + +func NewAccountWGs() *AccountWGs { + return &AccountWGs{ + wgs: make(map[string]*sync.WaitGroup), + } +} + +func (aw *AccountWGs) Register(tx sdk.Tx) (waits []*sync.WaitGroup, signals []*AccountWG) { + signers := getUniqSigners(tx) + + aw.mtx.Lock() + defer aw.mtx.Unlock() + for _, signer := range signers { + if wg := aw.wgs[signer]; wg != nil { + waits = append(waits, wg) + } + sig := waitGroup1() + aw.wgs[signer] = sig + signals = append(signals, NewAccountWG(signer, sig)) + } + + return waits, signals +} + +func (aw *AccountWGs) Wait(waits []*sync.WaitGroup) { + for _, wait := range waits { + wait.Wait() + } +} + +func (aw *AccountWGs) Done(signals []*AccountWG) { + aw.mtx.Lock() + defer aw.mtx.Unlock() + + for _, signal := range signals { + signal.wg.Done() + if aw.wgs[signal.acc] == signal.wg { + delete(aw.wgs, signal.acc) + } + } +} + +func getUniqSigners(tx sdk.Tx) []string { + seen := map[string]bool{} + var signers []string + for _, msg := range tx.GetMsgs() { + for _, addr := range msg.GetSigners() { + if !seen[addr.String()] { + signers = append(signers, string(addr)) + seen[addr.String()] = true + } + } + } + return signers +} + +type AccountWG struct { + acc string + wg *sync.WaitGroup +} + +func NewAccountWG(acc string, wg *sync.WaitGroup) *AccountWG { + return &AccountWG{ + acc: acc, + wg: wg, + } +} + +func waitGroup1() (wg *sync.WaitGroup) { + wg = &sync.WaitGroup{} + wg.Add(1) + return wg +} diff --git a/baseapp/accountlock_test.go b/baseapp/accountwgs_test.go similarity index 57% rename from baseapp/accountlock_test.go rename to baseapp/accountwgs_test.go index cb23944bef..7eecc39ee8 100644 --- a/baseapp/accountlock_test.go +++ b/baseapp/accountwgs_test.go @@ -1,76 +1,67 @@ package baseapp import ( - "reflect" - "sort" - "sync" "testing" "github.com/stretchr/testify/require" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto/secp256k1" sdk "github.com/cosmos/cosmos-sdk/types" ) -func TestAccountLock(t *testing.T) { +func TestConvertByteSliceToString(t *testing.T) { + b := []byte{65, 66, 67, 0, 65, 66, 67} + s := string(b) + require.Equal(t, len(b), len(s)) + require.Equal(t, uint8(0), s[3]) +} + +func TestRegister(t *testing.T) { app := setupBaseApp(t) - ctx := app.NewContext(true, abci.Header{}) privs := newTestPrivKeys(3) tx := newTestTx(privs) - accKeys := app.accountLock.Lock(ctx, tx) - - for _, accKey := range accKeys { - require.True(t, isMutexLock(&app.accountLock.accMtx[accKey])) - } + waits, signals := app.checkAccountWGs.Register(tx) - app.accountLock.Unlock(accKeys) + require.Equal(t, 0, len(waits)) + require.Equal(t, 3, len(signals)) - for _, accKey := range accKeys { - require.False(t, isMutexLock(&app.accountLock.accMtx[accKey])) + for _, signal := range signals { + require.Equal(t, app.checkAccountWGs.wgs[signal.acc], signal.wg) } } -func TestUnlockDoNothingWithNil(t *testing.T) { +func TestDontPanicWithNil(t *testing.T) { app := setupBaseApp(t) - require.NotPanics(t, func() { app.accountLock.Unlock(nil) }) -} - -func TestGetSigner(t *testing.T) { - privs := newTestPrivKeys(3) - tx := newTestTx(privs) - signers := getSigners(tx) - require.Equal(t, getAddrs(privs), signers) + require.NotPanics(t, func() { app.checkAccountWGs.Wait(nil) }) + require.NotPanics(t, func() { app.checkAccountWGs.Done(nil) }) } -func TestGetUniqSortedAddressKey(t *testing.T) { +func TestGetUniqSigners(t *testing.T) { privs := newTestPrivKeys(3) addrs := getAddrs(privs) addrs = append(addrs, addrs[1], addrs[0]) require.Equal(t, 5, len(addrs)) - accKeys := getUniqSortedAddressKey(addrs) + tx := newTestTx(privs) + signers := getUniqSigners(tx) // length should be reduced because `duplicated` is removed - require.Less(t, len(accKeys), len(addrs)) + require.Less(t, len(signers), len(addrs)) // check uniqueness - for i, iv := range accKeys { - for j, jv := range accKeys { + for i, iv := range signers { + for j, jv := range signers { if i != j { require.True(t, iv != jv) } } } - - // should be sorted - require.True(t, sort.IsSorted(uint32Slice(accKeys))) } type AccountLockTestTx struct { @@ -111,9 +102,3 @@ func newTestTx(privs []crypto.PrivKey) sdk.Tx { } return AccountLockTestTx{Msgs: msgs} } - -// Hack (too slow) -func isMutexLock(mtx *sync.Mutex) bool { - state := reflect.ValueOf(mtx).Elem().FieldByName("state") - return state.Int() == 1 -} diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 71a7052963..e33d3848f9 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -76,7 +76,9 @@ type BaseApp struct { // nolint: maligned deliverState *state // for DeliverTx checkStateMtx sync.RWMutex - accountLock AccountLock + + checkAccountWGs *AccountWGs + chCheckTx chan *RequestCheckTxAsync // an inter-block write-through cache provided to the context during deliverState interBlockCache sdk.MultiStorePersistentCache @@ -118,17 +120,19 @@ func NewBaseApp( ) *BaseApp { app := &BaseApp{ - logger: logger, - name: name, - db: db, - cms: store.NewCommitMultiStore(db), - storeLoader: DefaultStoreLoader, - router: NewRouter(), - queryRouter: NewQueryRouter(), - txDecoder: txDecoder, - fauxMerkleMode: false, - trace: false, - metrics: NopMetrics(), + logger: logger, + name: name, + db: db, + cms: store.NewCommitMultiStore(db), + storeLoader: DefaultStoreLoader, + router: NewRouter(), + queryRouter: NewQueryRouter(), + txDecoder: txDecoder, + fauxMerkleMode: false, + checkAccountWGs: NewAccountWGs(), + chCheckTx: make(chan *RequestCheckTxAsync, 10000), // TODO config channel buffer size. It might be good to set it tendermint mempool.size + trace: false, + metrics: NopMetrics(), } for _, option := range options { option(app) @@ -138,6 +142,8 @@ func NewBaseApp( app.cms.SetInterBlockCache(app.interBlockCache) } + app.startReactors() + return app } @@ -517,25 +523,36 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context return ctx.WithMultiStore(msCache), msCache } +// stateless checkTx +func (app *BaseApp) preCheckTx(txBytes []byte) (tx sdk.Tx, err error) { + defer func() { + if r := recover(); r != nil { + err = app.recoverToError(r) + } + }() + + tx, err = app.txDecoder(txBytes) + if err != nil { + return tx, err + } + + msgs := tx.GetMsgs() + err = validateBasicTxMsgs(msgs) + + return tx, err +} + func (app *BaseApp) checkTx(txBytes []byte, tx sdk.Tx, recheck bool) (gInfo sdk.GasInfo, err error) { ctx := app.getCheckContextForTx(txBytes, recheck) gasCtx := &ctx defer func() { if r := recover(); r != nil { - err = app.recoverToError(r, ctx.GasMeter()) + err = app.recoverToErrorWithGas(r, ctx.GasMeter()) } gInfo = sdk.GasInfo{GasWanted: gasCtx.GasMeter().Limit(), GasUsed: gasCtx.GasMeter().GasConsumed()} }() - msgs := tx.GetMsgs() - if err = validateBasicTxMsgs(msgs); err != nil { - return gInfo, err - } - - accKeys := app.accountLock.Lock(ctx, tx) - defer app.accountLock.Unlock(accKeys) - var anteCtx sdk.Context anteCtx, err = app.anteTx(ctx, txBytes, tx, false) if !anteCtx.IsZero() { @@ -593,7 +610,7 @@ func (app *BaseApp) runTx(txBytes []byte, tx sdk.Tx, simulate bool) (gInfo sdk.G defer func() { if r := recover(); r != nil { - err = app.recoverToError(r, ctx.GasMeter()) + err = app.recoverToErrorWithGas(r, ctx.GasMeter()) result = nil } @@ -699,7 +716,7 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg) (*sdk.Result, error }, nil } -func (app *BaseApp) recoverToError(r interface{}, gasMeter sdk.GasMeter) error { +func (app *BaseApp) recoverToErrorWithGas(r interface{}, gasMeter sdk.GasMeter) error { switch rType := r.(type) { // TODO: Use ErrOutOfGas instead of ErrorOutOfGas which would allow us // to keep the stracktrace. @@ -712,11 +729,15 @@ func (app *BaseApp) recoverToError(r interface{}, gasMeter sdk.GasMeter) error { ) default: - app.logger.Error("recoverToError", "r", r, "stack", string(debug.Stack())) - return sdkerrors.Wrap( - sdkerrors.ErrPanic, fmt.Sprintf( - "recovered: %v\nstack:\n%v", r, string(debug.Stack()), - ), - ) + return app.recoverToError(r) } } + +func (app *BaseApp) recoverToError(r interface{}) error { + app.logger.Error("recoverToError", "r", r, "stack", string(debug.Stack())) + return sdkerrors.Wrap( + sdkerrors.ErrPanic, fmt.Sprintf( + "recovered: %v\nstack:\n%v", r, string(debug.Stack()), + ), + ) +} diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index 3551898457..e653ad150b 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -771,7 +771,7 @@ func TestCheckTx(t *testing.T) { tx := newTxCounter(i, 0) txBytes, err := codec.MarshalBinaryLengthPrefixed(tx) require.NoError(t, err) - r := app.CheckTx(abci.RequestCheckTx{Tx: txBytes}) + r := app.CheckTxSync(abci.RequestCheckTx{Tx: txBytes}) assert.True(t, r.IsOK(), fmt.Sprintf("%v", r)) } diff --git a/baseapp/reactor.go b/baseapp/reactor.go new file mode 100644 index 0000000000..5e54661438 --- /dev/null +++ b/baseapp/reactor.go @@ -0,0 +1,59 @@ +package baseapp + +import ( + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" +) + +func (app *BaseApp) startReactors() { + go app.checkTxAsyncReactor() +} + +type RequestCheckTxAsync struct { + txBytes []byte + recheck bool + callback abci.CheckTxCallback + prepare *sync.WaitGroup + tx sdk.Tx + err error +} + +func (app *BaseApp) checkTxAsyncReactor() { + for req := range app.chCheckTx { + req.prepare.Wait() + if req.err != nil { + req.callback(sdkerrors.ResponseCheckTx(req.err, 0, 0, app.trace)) + continue + } + + waits, signals := app.checkAccountWGs.Register(req.tx) + + go app.checkTxAsync(req, waits, signals) + } +} + +func (app *BaseApp) prepareCheckTx(req *RequestCheckTxAsync) { + defer req.prepare.Done() + req.tx, req.err = app.preCheckTx(req.txBytes) +} + +func (app *BaseApp) checkTxAsync(req *RequestCheckTxAsync, waits []*sync.WaitGroup, signals []*AccountWG) { + app.checkAccountWGs.Wait(waits) + defer app.checkAccountWGs.Done(signals) + + gInfo, err := app.checkTx(req.txBytes, req.tx, req.recheck) + + if err != nil { + req.callback(sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)) + return + } + + req.callback(abci.ResponseCheckTx{ + GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints? + GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints? + }) +} diff --git a/go.mod b/go.mod index 6d7f013700..df8ef25491 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,6 @@ require ( replace ( github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 - github.com/tendermint/iavl => github.com/line/iavl v0.14.4-0.20201217063301-6b67687bfae9 - github.com/tendermint/tendermint => github.com/line/tendermint v0.33.10-0.20210113083242-8316c03342d5 + github.com/tendermint/iavl => github.com/line/iavl v0.14.4-0.20210122121727-7c33089230bd + github.com/tendermint/tendermint => github.com/line/tendermint v0.33.10-0.20210125064725-d1c50067c78a ) diff --git a/go.sum b/go.sum index 4ff962515c..68306fe99e 100644 --- a/go.sum +++ b/go.sum @@ -263,10 +263,10 @@ github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOS github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= -github.com/line/iavl v0.14.4-0.20201217063301-6b67687bfae9 h1:n6YHVdTld8D0dAogBUcXTaqhk1ZMTAR9Phy4xdMVWDY= -github.com/line/iavl v0.14.4-0.20201217063301-6b67687bfae9/go.mod h1:eG6hI8RbMxL1nR+nJBykXD//gKjUpKCAT2tvi9V93sA= -github.com/line/tendermint v0.33.10-0.20210113083242-8316c03342d5 h1:Fo4wCTrWWAmjqebHzOQ9IGSltnrV5Ea/OfCN2My2PM4= -github.com/line/tendermint v0.33.10-0.20210113083242-8316c03342d5/go.mod h1:0yUs9eIuuDq07nQql9BmI30FtYGcEC60Tu5JzB5IezM= +github.com/line/iavl v0.14.4-0.20210122121727-7c33089230bd h1:IdJCRyraxeVrsrOTopF47yzZtRrWyCeAZh9iUiIHwqw= +github.com/line/iavl v0.14.4-0.20210122121727-7c33089230bd/go.mod h1:eG6hI8RbMxL1nR+nJBykXD//gKjUpKCAT2tvi9V93sA= +github.com/line/tendermint v0.33.10-0.20210125064725-d1c50067c78a h1:X8RHWLo+gqS7Dx9wenEl7oO1Zcy5W+RWhVyrhI7iL9M= +github.com/line/tendermint v0.33.10-0.20210125064725-d1c50067c78a/go.mod h1:0yUs9eIuuDq07nQql9BmI30FtYGcEC60Tu5JzB5IezM= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=