Skip to content

Commit

Permalink
fix: deadlock when querying group members (#12342)
Browse files Browse the repository at this point in the history
  • Loading branch information
blushi authored Jun 28, 2022
1 parent cc83d74 commit 18306a1
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 67 deletions.
16 changes: 12 additions & 4 deletions x/group/internal/orm/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R
return sdkerrors.Wrap(errors.ErrORMInvalidArgument, "empty index key")
}

it := store.Iterator(PrefixRange(secondaryIndexKeyBytes))
defer it.Close()
if it.Valid() {
return errors.ErrORMUniqueConstraint
if err := checkUniqueIndexKey(store, secondaryIndexKeyBytes); err != nil {
return err
}

indexKey, err := buildKeyFromParts([]interface{}{secondaryIndexKey, []byte(rowID)})
Expand All @@ -144,6 +142,16 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R
return nil
}

// checkUniqueIndexKey checks that the given secondary index key is unique
func checkUniqueIndexKey(store sdk.KVStore, secondaryIndexKeyBytes []byte) error {
it := store.Iterator(PrefixRange(secondaryIndexKeyBytes))
defer it.Close()
if it.Valid() {
return errors.ErrORMUniqueConstraint
}
return nil
}

// multiKeyAddFunc allows multiple entries for a key
func multiKeyAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID RowID) error {
secondaryIndexKeyBytes, err := keyPartBytes(secondaryIndexKey, false)
Expand Down
24 changes: 16 additions & 8 deletions x/group/internal/orm/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ func (a table) Has(store sdk.KVStore, key RowID) bool {
return false
}
pStore := prefix.NewStore(store, a.prefix[:])
it := pStore.Iterator(PrefixRange(key))
defer it.Close()
return it.Valid()
return pStore.Has(key)
}

// GetOne load the object persisted for the given RowID into the dest parameter.
Expand Down Expand Up @@ -252,11 +250,9 @@ func (a table) Export(store sdk.KVStore, dest ModelSlicePtr) (uint64, error) {
// data should be a slice of structs that implement PrimaryKeyed.
func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error {
// Clear all data
pStore := prefix.NewStore(store, a.prefix[:])
it := pStore.Iterator(nil, nil)
defer it.Close()
for ; it.Valid(); it.Next() {
if err := a.Delete(store, it.Key()); err != nil {
keys := a.keys(store)
for _, key := range keys {
if err := a.Delete(store, key); err != nil {
return err
}
}
Expand All @@ -282,6 +278,18 @@ func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error {
return nil
}

func (a table) keys(store sdk.KVStore) [][]byte {
pStore := prefix.NewStore(store, a.prefix[:])
it := pStore.Iterator(nil, nil)
defer it.Close()

var keys [][]byte
for ; it.Valid(); it.Next() {
keys = append(keys, it.Key())
}
return keys
}

// typeSafeIterator is initialized with a type safe RowGetter only.
type typeSafeIterator struct {
store sdk.KVStore
Expand Down
8 changes: 4 additions & 4 deletions x/group/internal/orm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ func NewTypeSafeRowGetter(prefixKey [2]byte, model reflect.Type, cdc codec.Codec
}

pStore := prefix.NewStore(store, prefixKey[:])
it := pStore.Iterator(PrefixRange(rowID))
defer it.Close()
if !it.Valid() {
bz := pStore.Get(rowID)
if len(bz) == 0 {
return sdkerrors.ErrNotFound
}
return cdc.Unmarshal(it.Value(), dest)

return cdc.Unmarshal(bz, dest)
}
}

Expand Down
123 changes: 72 additions & 51 deletions x/group/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ func (k Keeper) GetGroupSequence(ctx sdk.Context) uint64 {
return k.groupTable.Sequence().CurVal(ctx.KVStore(k.key))
}

// iterateProposalsByVPEnd iterates over all proposals whose voting_period_end is after the `endTime` time argument.
func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb func(proposal group.Proposal) (bool, error)) error {
// proposalsByVPEnd returns all proposals whose voting_period_end is after the `endTime` time argument.
func (k Keeper) proposalsByVPEnd(ctx sdk.Context, endTime time.Time) (proposals []group.Proposal, err error) {
timeBytes := sdk.FormatTimeBytes(endTime)
it, err := k.proposalsByVotingPeriodEnd.PrefixScan(ctx.KVStore(k.key), nil, timeBytes)
if err != nil {
return err
return proposals, err
}
defer it.Close()

Expand All @@ -248,19 +248,12 @@ func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb f
break
}
if err != nil {
return err
}

stop, err := cb(proposal)
if err != nil {
return err
}
if stop {
break
return proposals, err
}
proposals = append(proposals, proposal)
}

return nil
return proposals, nil
}

// pruneProposal deletes a proposal from state.
Expand All @@ -279,76 +272,101 @@ func (k Keeper) pruneProposal(ctx sdk.Context, proposalID uint64) error {
// abortProposals iterates through all proposals by group policy index
// and marks submitted proposals as aborted.
func (k Keeper) abortProposals(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) error {
proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes())
proposals, err := k.proposalsByGroupPolicy(ctx, groupPolicyAddr)
if err != nil {
return err
}

for _, proposalInfo := range proposals {
// Mark all proposals still in the voting phase as aborted.
if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED {
proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED

if err := k.proposalTable.Update(ctx.KVStore(k.key), proposalInfo.Id, &proposalInfo); err != nil {
return err
}
}
}
return nil
}

// proposalsByGroupPolicy returns all proposals for a given group policy.
func (k Keeper) proposalsByGroupPolicy(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) ([]group.Proposal, error) {
proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes())
if err != nil {
return nil, err
}
defer proposalIt.Close()

var proposals []group.Proposal
for {
var proposalInfo group.Proposal
_, err = proposalIt.LoadNext(&proposalInfo)
if errors.ErrORMIteratorDone.Is(err) {
break
}
if err != nil {
return err
return proposals, err
}

// Mark all proposals still in the voting phase as aborted.
if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED {
proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED

if err := k.proposalTable.Update(ctx.KVStore(k.key), proposalInfo.Id, &proposalInfo); err != nil {
return err
}
}
proposals = append(proposals, proposalInfo)
}
return nil
return proposals, nil
}

// pruneVotes prunes all votes for a proposal from state.
func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error {
store := ctx.KVStore(k.key)
it, err := k.voteByProposalIndex.Get(store, proposalID)
votes, err := k.votesByProposal(ctx, proposalID)
if err != nil {
return err
}

for _, v := range votes {
err = k.voteTable.Delete(ctx.KVStore(k.key), &v)
if err != nil {
return err
}
}

return nil
}

// votesByProposal returns all votes for a given proposal.
func (k Keeper) votesByProposal(ctx sdk.Context, proposalID uint64) ([]group.Vote, error) {
it, err := k.voteByProposalIndex.Get(ctx.KVStore(k.key), proposalID)
if err != nil {
return nil, err
}
defer it.Close()

var votes []group.Vote
for {
var vote group.Vote
_, err = it.LoadNext(&vote)
if errors.ErrORMIteratorDone.Is(err) {
break
}
if err != nil {
return err
}

err = k.voteTable.Delete(store, &vote)
if err != nil {
return err
return votes, err
}
votes = append(votes, vote)
}

return nil
return votes, nil
}

// PruneProposals prunes all proposals that are expired, i.e. whose
// `voting_period + max_execution_period` is greater than the current block
// time.
func (k Keeper) PruneProposals(ctx sdk.Context) error {
err := k.iterateProposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod), func(proposal group.Proposal) (bool, error) {
proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod))
if err != nil {
return nil
}
for _, proposal := range proposals {
err := k.pruneProposal(ctx, proposal.Id)
if err != nil {
return true, err
return err
}

return false, nil
})
if err != nil {
return err
}

return nil
Expand All @@ -358,36 +376,39 @@ func (k Keeper) PruneProposals(ctx sdk.Context) error {
// has ended, tallies their votes, prunes them, and updates the proposal's
// `FinalTallyResult` field.
func (k Keeper) TallyProposalsAtVPEnd(ctx sdk.Context) error {
return k.iterateProposalsByVPEnd(ctx, ctx.BlockTime(), func(proposal group.Proposal) (bool, error) {
proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime())
if err != nil {
return nil
}
for _, proposal := range proposals {
policyInfo, err := k.getGroupPolicyInfo(ctx, proposal.GroupPolicyAddress)
if err != nil {
return true, sdkerrors.Wrap(err, "group policy")
return sdkerrors.Wrap(err, "group policy")
}

electorate, err := k.getGroupInfo(ctx, policyInfo.GroupId)
if err != nil {
return true, sdkerrors.Wrap(err, "group")
return sdkerrors.Wrap(err, "group")
}

proposalID := proposal.Id
if proposal.Status == group.PROPOSAL_STATUS_ABORTED || proposal.Status == group.PROPOSAL_STATUS_WITHDRAWN {
if err := k.pruneProposal(ctx, proposalID); err != nil {
return true, err
return err
}
if err := k.pruneVotes(ctx, proposalID); err != nil {
return true, err
return err
}
} else {
err = k.doTallyAndUpdate(ctx, &proposal, electorate, policyInfo)
if err != nil {
return true, sdkerrors.Wrap(err, "doTallyAndUpdate")
return sdkerrors.Wrap(err, "doTallyAndUpdate")
}

if err := k.proposalTable.Update(ctx.KVStore(k.key), proposal.Id, &proposal); err != nil {
return true, sdkerrors.Wrap(err, "proposal update")
return sdkerrors.Wrap(err, "proposal update")
}
}

return false, nil
})
}
return nil
}
32 changes: 32 additions & 0 deletions x/group/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,38 @@ func TestKeeperTestSuite(t *testing.T) {
suite.Run(t, new(TestSuite))
}

func (s *TestSuite) TestCreateGroupWithLotsOfMembers() {
for i := 50; i < 70; i++ {
membersResp := s.createGroupAndGetMembers(i)
s.Require().Equal(len(membersResp), i)
}
}

func (s *TestSuite) createGroupAndGetMembers(numMembers int) []*group.GroupMember {
addressPool := simtestutil.AddTestAddrsIncremental(s.bankKeeper, s.stakingKeeper, s.sdkCtx, numMembers, sdk.NewInt(30000000))
members := make([]group.MemberRequest, numMembers)
for i := 0; i < len(members); i++ {
members[i] = group.MemberRequest{
Address: addressPool[i].String(),
Weight: "1",
}
}

g, err := s.groupKeeper.CreateGroup(s.ctx, &group.MsgCreateGroup{
Admin: members[0].Address,
Members: members,
})
s.Require().NoErrorf(err, "failed to create group with %d members", len(members))
s.T().Logf("group %d created with %d members", g.GroupId, len(members))

groupMemberResp, err := s.groupKeeper.GroupMembers(s.ctx, &group.QueryGroupMembersRequest{GroupId: g.GroupId})
s.Require().NoError(err)

s.T().Logf("got %d members from group %d", len(groupMemberResp.Members), g.GroupId)

return groupMemberResp.Members
}

func (s *TestSuite) TestCreateGroup() {
addrs := s.addrs
addr1 := addrs[0]
Expand Down

0 comments on commit 18306a1

Please sign in to comment.