Skip to content

Commit

Permalink
Merge branch 'main' into HEAD
Browse files Browse the repository at this point in the history
* main:
  [SeiDB] Fix concurrent map access (#411)
  No longer disable dynamic dep generation during ACL dependency generation (#404)
  fix(baseapp): Ensure Panic Recovery in Prepare & Process Handlers (#401)
  Revert removing events for cachekv (#396)
  Add migration handler for disabling seqno (#394)
  • Loading branch information
yzang2019 committed Jan 31, 2024
2 parents f659904 + 219175b commit c70410a
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 36 deletions.
61 changes: 48 additions & 13 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/legacytm"
"github.com/cosmos/cosmos-sdk/utils"
)

// InitChain implements the ABCI interface. It runs the initialization logic
Expand Down Expand Up @@ -929,7 +930,7 @@ func splitPath(requestPath string) (path []string) {
}

// ABCI++
func (app *BaseApp) PrepareProposal(ctx context.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
func (app *BaseApp) PrepareProposal(ctx context.Context, req *abci.RequestPrepareProposal) (resp *abci.ResponsePrepareProposal, err error) {
defer telemetry.MeasureSince(time.Now(), "abci", "prepare_proposal")

header := tmproto.Header{
Expand Down Expand Up @@ -963,21 +964,40 @@ func (app *BaseApp) PrepareProposal(ctx context.Context, req *abci.RequestPrepar

app.preparePrepareProposalState()

defer func() {
if err := recover(); err != nil {
app.logger.Error(
"panic recovered in PrepareProposal",
"height", req.Height,
"time", req.Time,
"panic", err,
)

resp = &abci.ResponsePrepareProposal{
TxRecords: utils.Map(req.Txs, func(tx []byte) *abci.TxRecord {
return &abci.TxRecord{Action: abci.TxRecord_UNMODIFIED, Tx: tx}
}),
}
}
}()

if app.prepareProposalHandler != nil {
res, err := app.prepareProposalHandler(app.prepareProposalState.ctx, req)
resp, err = app.prepareProposalHandler(app.prepareProposalState.ctx, req)
if err != nil {
return nil, err
}

if cp := app.GetConsensusParams(app.prepareProposalState.ctx); cp != nil {
res.ConsensusParamUpdates = cp
resp.ConsensusParamUpdates = cp
}
return res, nil
} else {
return nil, errors.New("no prepare proposal handler")

return resp, nil
}

return nil, errors.New("no prepare proposal handler")
}

func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) {
func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
defer telemetry.MeasureSince(time.Now(), "abci", "process_proposal")

header := tmproto.Header{
Expand Down Expand Up @@ -1018,21 +1038,36 @@ func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProces
}

// NOTE: header hash is not set in NewContext, so we manually set it here

app.prepareProcessProposalState(gasMeter, req.Hash)

defer func() {
if err := recover(); err != nil {
app.logger.Error(
"panic recovered in ProcessProposal",
"height", req.Height,
"time", req.Time,
"hash", fmt.Sprintf("%X", req.Hash),
"panic", err,
)

resp = &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}
}
}()

if app.processProposalHandler != nil {
res, err := app.processProposalHandler(app.processProposalState.ctx, req)
resp, err = app.processProposalHandler(app.processProposalState.ctx, req)
if err != nil {
return nil, err
}

if cp := app.GetConsensusParams(app.processProposalState.ctx); cp != nil {
res.ConsensusParamUpdates = cp
resp.ConsensusParamUpdates = cp
}
return res, nil
} else {
return nil, errors.New("no process proposal handler")

return resp, nil
}

return nil, errors.New("no process proposal handler")
}

func (app *BaseApp) FinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
Expand Down
23 changes: 15 additions & 8 deletions store/cachekv/mergeiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
//
// TODO: Optimize by memoizing.
type cacheMergeIterator struct {
parent types.Iterator
cache types.Iterator
ascending bool
storeKey sdktypes.StoreKey
parent types.Iterator
cache types.Iterator
ascending bool
storeKey sdktypes.StoreKey
eventManager *sdktypes.EventManager
}

var _ types.Iterator = (*cacheMergeIterator)(nil)
Expand All @@ -28,12 +29,14 @@ func NewCacheMergeIterator(
parent, cache types.Iterator,
ascending bool,
storeKey sdktypes.StoreKey,
eventManager *sdktypes.EventManager,
) *cacheMergeIterator {
iter := &cacheMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
storeKey: storeKey,
parent: parent,
cache: cache,
ascending: ascending,
storeKey: storeKey,
eventManager: eventManager,
}

return iter
Expand Down Expand Up @@ -135,12 +138,14 @@ func (iter *cacheMergeIterator) Value() []byte {
// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
value := iter.cache.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, iter.cache.Key(), value)
return value
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
value := iter.parent.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, iter.parent.Key(), value)
return value
}

Expand All @@ -151,9 +156,11 @@ func (iter *cacheMergeIterator) Value() []byte {
switch cmp {
case -1: // parent < cache
value := iter.parent.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, keyP, value)
return value
case 0, 1: // parent >= cache
value := iter.cache.Value()
iter.eventManager.EmitResourceAccessReadEvent("iterator", iter.storeKey, keyC, value)
return value
default:
panic("invalid comparison result")
Expand Down
28 changes: 22 additions & 6 deletions store/cachekv/mergeiterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
)

func TestMangerIterator(t *testing.T) {
// initiate mock kvstore
mem := dbadapter.Store{DB: dbm.NewMemDB()}
eventManager := sdktypes.NewEventManager()
kvstore := cachekv.NewStore(mem, types.NewKVStoreKey("CacheKvTest"), types.DefaultCacheSizeLimit)
value := randSlice(defaultValueSizeBz)
startKey := randSlice(32)
Expand All @@ -27,13 +29,27 @@ func TestMangerIterator(t *testing.T) {
cache := kvstore.Iterator(nil, nil)
for ; cache.Valid(); cache.Next() {
}
iter := cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"))
iter := cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"), eventManager)

// get the next value and it should not be nil
nextValue := iter.Value()
require.NotNil(t, nextValue)
// get the next value
iter.Value()

// assert the resource access is still emitted correctly when the cache store is unavailable
require.Equal(t, "access_type", string(eventManager.Events()[0].Attributes[0].Key))
require.Equal(t, "read", string(eventManager.Events()[0].Attributes[0].Value))
require.Equal(t, "store_key", string(eventManager.Events()[0].Attributes[1].Key))
require.Equal(t, "CacheKvTest", string(eventManager.Events()[0].Attributes[1].Value))

// assert event emission when cache is available
cache = kvstore.Iterator(keys[1], keys[2])
iter = cachekv.NewCacheMergeIterator(parent, cache, true, types.NewKVStoreKey("CacheKvTest"), eventManager)

// get the next value
nextValue = iter.Value()
require.NotNil(t, nextValue)
iter.Value()

// assert the resource access is still emitted correctly when the cache store is available
require.Equal(t, "access_type", string(eventManager.Events()[0].Attributes[0].Key))
require.Equal(t, "read", string(eventManager.Events()[0].Attributes[0].Value))
require.Equal(t, "store_key", string(eventManager.Events()[0].Attributes[1].Key))
require.Equal(t, "CacheKvTest", string(eventManager.Events()[0].Attributes[1].Value))
}
7 changes: 6 additions & 1 deletion store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func (store *Store) GetEvents() []abci.Event {

// Implements Store
func (store *Store) ResetEvents() {
store.mtx.Lock()
defer store.mtx.Unlock()
store.eventManager = sdktypes.NewEventManager()
}

Expand All @@ -75,6 +77,7 @@ func (store *Store) getFromCache(key []byte) []byte {
// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
types.AssertValidKey(key)
store.eventManager.EmitResourceAccessReadEvent("get", store.storeKey, key, value)
return store.getFromCache(key)
}

Expand All @@ -83,11 +86,13 @@ func (store *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)
store.setCacheValue(key, value, false, true)
store.eventManager.EmitResourceAccessWriteEvent("set", store.storeKey, key, value)
}

// Has implements types.KVStore.
func (store *Store) Has(key []byte) bool {
value := store.Get(key)
store.eventManager.EmitResourceAccessReadEvent("has", store.storeKey, key, value)
return value != nil
}

Expand Down Expand Up @@ -189,7 +194,7 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
}()
store.dirtyItems(start, end)
cache = newMemIterator(start, end, store.sortedCache, store.deleted, ascending, store.eventManager, store.storeKey)
return NewCacheMergeIterator(parent, cache, ascending, store.storeKey)
return NewCacheMergeIterator(parent, cache, ascending, store.storeKey, store.eventManager)
}

func (store *Store) VersionExists(version int64) bool {
Expand Down
2 changes: 2 additions & 0 deletions storev2/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor
if version <= 0 || (rs.lastCommitInfo != nil && version == rs.lastCommitInfo.Version) {
return rs.CacheMultiStore(), nil
}
rs.mtx.RLock()
defer rs.mtx.RUnlock()
stores := make(map[types.StoreKey]types.CacheWrapper)
// add the transient/mem stores registered in current app.
for k, store := range rs.ckvStores {
Expand Down
5 changes: 0 additions & 5 deletions x/accesscontrol/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,6 @@ func (k Keeper) GetMessageDependencies(ctx sdk.Context, msg sdk.Msg) []acltypes.
ctx.Logger().Error(errorMessage)
}
}
if dependencyMapping.DynamicEnabled {
// there was an issue with dynamic generation, so lets disable it
// this will not error, the validation check was done in previous calls already
_ = k.SetDependencyMappingDynamicFlag(ctx, messageKey, false)
}
return dependencyMapping.AccessOps
}

Expand Down
7 changes: 4 additions & 3 deletions x/accesscontrol/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func TestInvalidGetMessageDependencies(t *testing.T) {
delete(app.AccessControlKeeper.MessageDependencyGeneratorMapper, undelegateKey)
accessOps := app.AccessControlKeeper.GetMessageDependencies(ctx, &stakingUndelegate)
require.Equal(t, types.SynchronousMessageDependencyMapping("").AccessOps, accessOps)
require.False(t, app.AccessControlKeeper.GetResourceDependencyMapping(ctx, undelegateKey).DynamicEnabled)
// no longer gets disabled such that there arent writes in the dependency generation path
require.True(t, app.AccessControlKeeper.GetResourceDependencyMapping(ctx, undelegateKey).DynamicEnabled)
}

func TestWasmDependencyMapping(t *testing.T) {
Expand Down Expand Up @@ -2433,14 +2434,14 @@ func (suite *KeeperTestSuite) TestMessageDependencies() {
req.Equal(delegateStaticMapping.AccessOps, accessOps)
// verify dynamic got disabled
dependencyMapping = app.AccessControlKeeper.GetResourceDependencyMapping(ctx, delegateKey)
req.Equal(false, dependencyMapping.DynamicEnabled)
req.Equal(true, dependencyMapping.DynamicEnabled)

// lets also try with undelegate, but this time there is no dynamic generator, so we disable it as well
accessOps = app.AccessControlKeeper.GetMessageDependencies(ctx, &stakingUndelegate)
req.Equal(undelegateStaticMapping.AccessOps, accessOps)
// verify dynamic got disabled
dependencyMapping = app.AccessControlKeeper.GetResourceDependencyMapping(ctx, undelegateKey)
req.Equal(false, dependencyMapping.DynamicEnabled)
req.Equal(true, dependencyMapping.DynamicEnabled)
}

func (suite *KeeperTestSuite) TestImportContractReferences() {
Expand Down

0 comments on commit c70410a

Please sign in to comment.