Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: introduce mutex for state and lastCommitInfo to avoid race conditions #22692

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i

### Bug Fixes


* (baseapp) [#22692](https://github.com/cosmos/cosmos-sdk/pull/22692) Add mutex locks for `state` and make `lastCommitInfo` atomic to prevent race conditions between `Commit` and `CreateQueryContext`.
* (query) [23002](https://github.com/cosmos/cosmos-sdk/pull/23002) Fix collection filtered pagination.


### API Breaking Changes

* (x/params) [#22995](https://github.com/cosmos/cosmos-sdk/pull/22995) Remove `x/params`. Migrate to the new params system introduced in `v0.47` as demonstrated [here](https://github.com/cosmos/cosmos-sdk/blob/main/UPGRADING.md#xparams).
Expand Down
93 changes: 50 additions & 43 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
// initialize states with a correct header
app.setState(execModeFinalize, initHeader)
app.setState(execModeCheck, initHeader)
finalizeState := app.getState(execModeFinalize)

// Store the consensus params in the BaseApp's param store. Note, this must be
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
err := app.StoreConsensusParams(finalizeState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
Expand All @@ -86,13 +87,14 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
checkState := app.getState(execModeCheck)
checkState.SetContext(checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
finalizeState.SetContext(finalizeState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Expand All @@ -105,9 +107,9 @@ func (app *BaseApp) InitChain(req *abci.InitChainRequest) (*abci.InitChainRespon
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))

res, err := app.initChainer(app.finalizeBlockState.Context(), req)
res, err := app.initChainer(finalizeState.Context(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -437,7 +439,8 @@ func (app *BaseApp) PrepareProposal(req *abci.PrepareProposalRequest) (resp *abc
return nil, errors.New("PrepareProposal called with invalid height")
}

app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height).
prepareProposalState := app.getState(execModePrepareProposal)
prepareProposalState.SetContext(app.getContextForProposal(prepareProposalState.Context(), req.Height).
WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithProposer(req.ProposerAddress).
Expand All @@ -454,9 +457,9 @@ func (app *BaseApp) PrepareProposal(req *abci.PrepareProposalRequest) (resp *abc
Time: req.Time,
}))

app.prepareProposalState.SetContext(app.prepareProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context())))
prepareProposalState.SetContext(prepareProposalState.Context().
WithConsensusParams(app.GetConsensusParams(prepareProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(prepareProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -471,7 +474,7 @@ func (app *BaseApp) PrepareProposal(req *abci.PrepareProposalRequest) (resp *abc
}
}()

resp, err = app.prepareProposal(app.prepareProposalState.Context(), req)
resp, err = app.prepareProposal(prepareProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err)
return &abci.PrepareProposalResponse{Txs: req.Txs}, nil
Expand Down Expand Up @@ -529,7 +532,8 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc
app.setState(execModeFinalize, header)
}

app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height).
processProposalState := app.getState(execModeProcessProposal)
processProposalState.SetContext(app.getContextForProposal(processProposalState.Context(), req.Height).
WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithHeaderHash(req.Hash).
Expand All @@ -548,9 +552,9 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc
Time: req.Time,
}))

app.processProposalState.SetContext(app.processProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context())))
processProposalState.SetContext(processProposalState.Context().
WithConsensusParams(app.GetConsensusParams(processProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(processProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -565,7 +569,7 @@ func (app *BaseApp) ProcessProposal(req *abci.ProcessProposalRequest) (resp *abc
}
}()

resp, err = app.processProposal(app.processProposalState.Context(), req)
resp, err = app.processProposal(processProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abci.ProcessProposalResponse{Status: abci.PROCESS_PROPOSAL_STATUS_REJECT}, nil
Expand Down Expand Up @@ -605,7 +609,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.ExtendVoteRequest) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -686,7 +690,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -775,12 +779,14 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// finalizeBlockState should be set on InitChain or ProcessProposal. If it is
// nil, it means we are replaying this block and we need to set the state here
// given that during block replay ProcessProposal is not executed by CometBFT.
if app.finalizeBlockState == nil {
finalizeState := app.getState(execModeFinalize)
if finalizeState == nil {
app.setState(execModeFinalize, header)
finalizeState = app.getState(execModeFinalize)
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
}

// Context is now updated with Header information.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
finalizeState.SetContext(finalizeState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
Expand All @@ -790,7 +796,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithConsensusParams(app.GetConsensusParams(finalizeState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(corecomet.Info{
Expand All @@ -801,11 +807,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}))

// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
gasMeter := app.getBlockGasMeter(finalizeState.Context())
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter))

if app.checkState != nil {
app.checkState.SetContext(app.checkState.Context().
if checkState := app.getState(execModeCheck); checkState != nil {
checkState.SetContext(checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash))
}
Expand Down Expand Up @@ -833,8 +839,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
gasMeter = app.getBlockGasMeter(finalizeState.Context())
finalizeState.SetContext(finalizeState.Context().WithBlockGasMeter(gasMeter))

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
Expand Down Expand Up @@ -863,11 +869,11 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
txResults = append(txResults, response)
}

if app.finalizeBlockState.ms.TracingEnabled() {
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
if finalizeState.ms.TracingEnabled() {
finalizeState.ms = finalizeState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
}

endBlock, err := app.endBlock(app.finalizeBlockState.Context())
endBlock, err := app.endBlock(finalizeState.Context())
if err != nil {
return nil, err
}
Expand All @@ -881,7 +887,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
cp := app.GetConsensusParams(finalizeState.Context())

return &abci.FinalizeBlockResponse{
Events: events,
Expand All @@ -905,7 +911,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
defer func() {
// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if streamErr := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); streamErr != nil {
if streamErr := streamingListener.ListenFinalizeBlock(app.getState(execModeFinalize).Context(), *req, *res); streamErr != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
if app.streamingManager.StopNodeOnErr {
// if StopNodeOnErr is set, we should return the streamErr in order to stop the node
Expand All @@ -931,7 +937,8 @@ func (app *BaseApp) FinalizeBlock(req *abci.FinalizeBlockRequest) (res *abci.Fin
}

// if it was aborted, we need to reset the state
app.finalizeBlockState = nil
app.clearState(execModeFinalize)

app.optimisticExec.Reset()
}

Expand Down Expand Up @@ -970,11 +977,12 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
header := app.finalizeBlockState.Context().BlockHeader()
finalizeState := app.getState(execModeFinalize)
header := finalizeState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.Context())
app.precommiter(finalizeState.Context())
}

rms, ok := app.cms.(*rootmulti.Store)
Expand All @@ -990,7 +998,7 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.Context()
ctx := finalizeState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()

Expand Down Expand Up @@ -1018,11 +1026,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
// NOTE: This is safe because CometBFT holds a lock on the mempool for
// Commit. Use the header from this latest block.
app.setState(execModeCheck, header)

app.finalizeBlockState = nil
app.clearState(execModeFinalize)

if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.Context())
app.prepareCheckStater(app.getState(execModeCheck).Context())
}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
Expand All @@ -1040,7 +1047,7 @@ func (app *BaseApp) workingHash() []byte {
// Write the FinalizeBlock state into branched storage and commit the MultiStore.
// The write to the FinalizeBlock state writes all state transitions to the root
// MultiStore (app.cms) so when Commit() is called it persists those values.
app.finalizeBlockState.ms.Write()
app.getState(execModeFinalize).ms.Write()

// Get the hash of all writes in order to return the apphash to the comet in finalizeBlock.
commitHash := app.cms.WorkingHash()
Expand Down Expand Up @@ -1187,7 +1194,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.QueryResponse {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.Context().CacheContext()
ctx, _ = app.getState(execModeFinalize).Context().CacheContext()

// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{})
Expand Down Expand Up @@ -1288,8 +1295,8 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check
var header *cmtproto.Header
isLatest := height == 0
for _, state := range []*state{
app.checkState,
app.finalizeBlockState,
app.getState(execModeCheck),
app.getState(execModeFinalize),
beer-1 marked this conversation as resolved.
Show resolved Hide resolved
} {
if state != nil {
// branch the commit multi-store for safety
Expand Down Expand Up @@ -1403,7 +1410,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
cp := app.GetConsensusParams(app.getState(execModeFinalize).Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}
Expand Down
49 changes: 49 additions & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -2779,3 +2780,51 @@ func TestABCI_Proposal_FailReCheckTx(t *testing.T) {
require.NotEmpty(t, res.TxResults[0].Events)
require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res))
}

func TestABCI_Race_Commit_Query(t *testing.T) {
suite := NewBaseAppSuite(t, baseapp.SetChainID("test-chain-id"))
app := suite.baseApp

_, err := app.InitChain(&abci.InitChainRequest{
ChainId: "test-chain-id",
ConsensusParams: &cmtproto.ConsensusParams{Block: &cmtproto.BlockParams{MaxGas: 5000000}},
InitialHeight: 1,
})
require.NoError(t, err)
_, err = app.Commit()
require.NoError(t, err)

counter := atomic.Uint64{}
counter.Store(0)

ctx, cancel := context.WithCancel(context.Background())
queryCreator := func() {
for {
select {
case <-ctx.Done():
return
default:
_, err := app.CreateQueryContextWithCheckHeader(0, false, false)
require.NoError(t, err)

counter.Add(1)
}
}
}

for i := 0; i < 100; i++ {
go queryCreator()
}
beer-1 marked this conversation as resolved.
Show resolved Hide resolved

for i := 0; i < 1000; i++ {
_, err = app.FinalizeBlock(&abci.FinalizeBlockRequest{Height: app.LastBlockHeight() + 1})
require.NoError(t, err)

_, err = app.Commit()
require.NoError(t, err)
}

cancel()

require.Equal(t, int64(1001), app.GetContextForCheckTx(nil).BlockHeight())
}
Loading
Loading