Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deadlock when querying group members (backport #12342) #12381

Merged
merged 2 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions x/group/internal/orm/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R
return sdkerrors.Wrap(errors.ErrORMInvalidArgument, "empty index key")
}

it := store.Iterator(PrefixRange(secondaryIndexKeyBytes))
defer it.Close()
if it.Valid() {
return errors.ErrORMUniqueConstraint
if err := checkUniqueIndexKey(store, secondaryIndexKeyBytes); err != nil {
return err
}

indexKey, err := buildKeyFromParts([]interface{}{secondaryIndexKey, []byte(rowID)})
Expand All @@ -144,6 +142,16 @@ func uniqueKeysAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID R
return nil
}

// checkUniqueIndexKey checks that the given secondary index key is unique
func checkUniqueIndexKey(store sdk.KVStore, secondaryIndexKeyBytes []byte) error {
it := store.Iterator(PrefixRange(secondaryIndexKeyBytes))
defer it.Close()
if it.Valid() {
return errors.ErrORMUniqueConstraint
}
return nil
}

// multiKeyAddFunc allows multiple entries for a key
func multiKeyAddFunc(store sdk.KVStore, secondaryIndexKey interface{}, rowID RowID) error {
secondaryIndexKeyBytes, err := keyPartBytes(secondaryIndexKey, false)
Expand Down
24 changes: 16 additions & 8 deletions x/group/internal/orm/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ func (a table) Has(store sdk.KVStore, key RowID) bool {
return false
}
pStore := prefix.NewStore(store, a.prefix[:])
it := pStore.Iterator(PrefixRange(key))
defer it.Close()
return it.Valid()
return pStore.Has(key)
}

// GetOne load the object persisted for the given RowID into the dest parameter.
Expand Down Expand Up @@ -252,11 +250,9 @@ func (a table) Export(store sdk.KVStore, dest ModelSlicePtr) (uint64, error) {
// data should be a slice of structs that implement PrimaryKeyed.
func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error {
// Clear all data
pStore := prefix.NewStore(store, a.prefix[:])
it := pStore.Iterator(nil, nil)
defer it.Close()
for ; it.Valid(); it.Next() {
if err := a.Delete(store, it.Key()); err != nil {
keys := a.keys(store)
for _, key := range keys {
if err := a.Delete(store, key); err != nil {
return err
}
}
Expand All @@ -282,6 +278,18 @@ func (a table) Import(store sdk.KVStore, data interface{}, _ uint64) error {
return nil
}

func (a table) keys(store sdk.KVStore) [][]byte {
pStore := prefix.NewStore(store, a.prefix[:])
it := pStore.Iterator(nil, nil)
defer it.Close()

var keys [][]byte
for ; it.Valid(); it.Next() {
keys = append(keys, it.Key())
}
return keys
}

// typeSafeIterator is initialized with a type safe RowGetter only.
type typeSafeIterator struct {
store sdk.KVStore
Expand Down
8 changes: 4 additions & 4 deletions x/group/internal/orm/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,12 @@ func NewTypeSafeRowGetter(prefixKey [2]byte, model reflect.Type, cdc codec.Codec
}

pStore := prefix.NewStore(store, prefixKey[:])
it := pStore.Iterator(PrefixRange(rowID))
defer it.Close()
if !it.Valid() {
bz := pStore.Get(rowID)
if len(bz) == 0 {
return sdkerrors.ErrNotFound
}
return cdc.Unmarshal(it.Value(), dest)

return cdc.Unmarshal(bz, dest)
}
}

Expand Down
129 changes: 75 additions & 54 deletions x/group/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,12 @@ func (k Keeper) GetGroupSequence(ctx sdk.Context) uint64 {
return k.groupTable.Sequence().CurVal(ctx.KVStore(k.key))
}

// iterateProposalsByVPEnd iterates over all proposals whose voting_period_end is after the `endTime` time argument.
func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb func(proposal group.Proposal) (bool, error)) error {
// proposalsByVPEnd returns all proposals whose voting_period_end is after the `endTime` time argument.
func (k Keeper) proposalsByVPEnd(ctx sdk.Context, endTime time.Time) (proposals []group.Proposal, err error) {
timeBytes := sdk.FormatTimeBytes(endTime)
it, err := k.proposalsByVotingPeriodEnd.PrefixScan(ctx.KVStore(k.key), nil, timeBytes)
if err != nil {
return err
return proposals, err
}
defer it.Close()

Expand All @@ -248,19 +248,12 @@ func (k Keeper) iterateProposalsByVPEnd(ctx sdk.Context, endTime time.Time, cb f
break
}
if err != nil {
return err
}

stop, err := cb(proposal)
if err != nil {
return err
}
if stop {
break
return proposals, err
}
proposals = append(proposals, proposal)
}

return nil
return proposals, nil
}

// pruneProposal deletes a proposal from state.
Expand All @@ -279,76 +272,101 @@ func (k Keeper) pruneProposal(ctx sdk.Context, proposalID uint64) error {
// abortProposals iterates through all proposals by group policy index
// and marks submitted proposals as aborted.
func (k Keeper) abortProposals(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) error {
proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes())
proposals, err := k.proposalsByGroupPolicy(ctx, groupPolicyAddr)
if err != nil {
return err
}

for _, proposalInfo := range proposals {
// Mark all proposals still in the voting phase as aborted.
if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED {
proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED

if err := k.proposalTable.Update(ctx.KVStore(k.key), proposalInfo.Id, &proposalInfo); err != nil {
return err
}
}
}
return nil
}

// proposalsByGroupPolicy returns all proposals for a given group policy.
func (k Keeper) proposalsByGroupPolicy(ctx sdk.Context, groupPolicyAddr sdk.AccAddress) ([]group.Proposal, error) {
proposalIt, err := k.proposalByGroupPolicyIndex.Get(ctx.KVStore(k.key), groupPolicyAddr.Bytes())
if err != nil {
return nil, err
}
defer proposalIt.Close()

var proposals []group.Proposal
for {
var proposalInfo group.Proposal
_, err = proposalIt.LoadNext(&proposalInfo)
if errors.ErrORMIteratorDone.Is(err) {
break
}
if err != nil {
return err
return proposals, err
}

// Mark all proposals still in the voting phase as aborted.
if proposalInfo.Status == group.PROPOSAL_STATUS_SUBMITTED {
proposalInfo.Status = group.PROPOSAL_STATUS_ABORTED

if err := k.proposalTable.Update(ctx.KVStore(k.key), proposalInfo.Id, &proposalInfo); err != nil {
return err
}
}
proposals = append(proposals, proposalInfo)
}
return nil
return proposals, nil
}

// pruneVotes prunes all votes for a proposal from state.
func (k Keeper) pruneVotes(ctx sdk.Context, proposalID uint64) error {
store := ctx.KVStore(k.key)
it, err := k.voteByProposalIndex.Get(store, proposalID)
votes, err := k.votesByProposal(ctx, proposalID)
if err != nil {
return err
}

for _, v := range votes {
err = k.voteTable.Delete(ctx.KVStore(k.key), &v)
if err != nil {
return err
}
}

return nil
}

// votesByProposal returns all votes for a given proposal.
func (k Keeper) votesByProposal(ctx sdk.Context, proposalID uint64) ([]group.Vote, error) {
it, err := k.voteByProposalIndex.Get(ctx.KVStore(k.key), proposalID)
if err != nil {
return nil, err
}
defer it.Close()

var votes []group.Vote
for {
var vote group.Vote
_, err = it.LoadNext(&vote)
if errors.ErrORMIteratorDone.Is(err) {
break
}
if err != nil {
return err
}

err = k.voteTable.Delete(store, &vote)
if err != nil {
return err
return votes, err
}
votes = append(votes, vote)
}

return nil
return votes, nil
}

// PruneProposals prunes all proposals that are expired, i.e. whose
// `voting_period + max_execution_period` is greater than the current block
// time.
func (k Keeper) PruneProposals(ctx sdk.Context) error {
err := k.iterateProposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod), func(proposal group.Proposal) (bool, error) {
proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime().Add(-k.config.MaxExecutionPeriod))
if err != nil {
return nil
}
for _, proposal := range proposals {
err := k.pruneProposal(ctx, proposal.Id)
if err != nil {
return true, err
return err
}

return false, nil
})
if err != nil {
return err
}

return nil
Expand All @@ -358,36 +376,39 @@ func (k Keeper) PruneProposals(ctx sdk.Context) error {
// has ended, tallies their votes, prunes them, and updates the proposal's
// `FinalTallyResult` field.
func (k Keeper) TallyProposalsAtVPEnd(ctx sdk.Context) error {
return k.iterateProposalsByVPEnd(ctx, ctx.BlockTime(), func(proposal group.Proposal) (bool, error) {
proposals, err := k.proposalsByVPEnd(ctx, ctx.BlockTime())
if err != nil {
return nil
}
for _, proposal := range proposals {
policyInfo, err := k.getGroupPolicyInfo(ctx, proposal.GroupPolicyAddress)
if err != nil {
return true, sdkerrors.Wrap(err, "group policy")
return sdkerrors.Wrap(err, "group policy")
}

electorate, err := k.getGroupInfo(ctx, policyInfo.GroupId)
if err != nil {
return true, sdkerrors.Wrap(err, "group")
return sdkerrors.Wrap(err, "group")
}

proposalId := proposal.Id
if proposal.Status == group.PROPOSAL_STATUS_ABORTED || proposal.Status == group.PROPOSAL_STATUS_WITHDRAWN {
if err := k.pruneProposal(ctx, proposalId); err != nil {
return true, err
proposalID := proposal.Id
if err := k.pruneProposal(ctx, proposalID); err != nil {
Fixed Show fixed Hide fixed
return err
}
if err := k.pruneVotes(ctx, proposalId); err != nil {
return true, err
if err := k.pruneVotes(ctx, proposalID); err != nil {
return err
}
} else {
err = k.doTallyAndUpdate(ctx, &proposal, electorate, policyInfo)
if err != nil {
return true, sdkerrors.Wrap(err, "doTallyAndUpdate")
return sdkerrors.Wrap(err, "doTallyAndUpdate")
}

if err := k.proposalTable.Update(ctx.KVStore(k.key), proposal.Id, &proposal); err != nil {
return true, sdkerrors.Wrap(err, "proposal update")
return sdkerrors.Wrap(err, "proposal update")
}
}

return false, nil
})
}
return nil
}
34 changes: 34 additions & 0 deletions x/group/keeper/keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,40 @@ func TestKeeperTestSuite(t *testing.T) {
suite.Run(t, new(TestSuite))
}

// Testing a deadlock issue when querying group members
// https://github.com/cosmos/cosmos-sdk/issues/12111
func (s *TestSuite) TestCreateGroupWithLotsOfMembers() {
for i := 50; i < 70; i++ {
membersResp := s.createGroupAndGetMembers(i)
s.Require().Equal(len(membersResp), i)
}
}

func (s *TestSuite) createGroupAndGetMembers(numMembers int) []*group.GroupMember {
addressPool := simapp.AddTestAddrsIncremental(s.app, s.sdkCtx, numMembers, sdk.NewInt(30000000))
members := make([]group.MemberRequest, numMembers)
for i := 0; i < len(members); i++ {
members[i] = group.MemberRequest{
Address: addressPool[i].String(),
Weight: "1",
}
}

g, err := s.keeper.CreateGroup(s.ctx, &group.MsgCreateGroup{
Admin: members[0].Address,
Members: members,
})
s.Require().NoErrorf(err, "failed to create group with %d members", len(members))
s.T().Logf("group %d created with %d members", g.GroupId, len(members))

groupMemberResp, err := s.keeper.GroupMembers(s.ctx, &group.QueryGroupMembersRequest{GroupId: g.GroupId})
s.Require().NoError(err)

s.T().Logf("got %d members from group %d", len(groupMemberResp.Members), g.GroupId)

return groupMemberResp.Members
}

func (s *TestSuite) TestCreateGroup() {
addrs := s.addrs
addr1 := addrs[0]
Expand Down