epoching: API: epoch_msgs/{epoch_num} -> all events during this epoch #108

Merged · 14 commits · Sep 4, 2022
10 changes: 6 additions & 4 deletions proto/babylon/epoching/v1/epoching.proto
@@ -18,12 +18,14 @@ message QueuedMessage {
   bytes tx_id = 1;
   // msg_id is the original message ID, i.e., hash of the marshaled message
   bytes msg_id = 2;
+  // block_height is the height when this msg is submitted to Babylon
+  uint64 block_height = 3;
   // msg is the actual message that is sent by a user and is queued by the epoching module
   oneof msg {
-    cosmos.staking.v1beta1.MsgCreateValidator msg_create_validator = 3;
-    cosmos.staking.v1beta1.MsgDelegate msg_delegate = 4;
-    cosmos.staking.v1beta1.MsgUndelegate msg_undelegate = 5;
-    cosmos.staking.v1beta1.MsgBeginRedelegate msg_begin_redelegate = 6;
+    cosmos.staking.v1beta1.MsgCreateValidator msg_create_validator = 4;
+    cosmos.staking.v1beta1.MsgDelegate msg_delegate = 5;
+    cosmos.staking.v1beta1.MsgUndelegate msg_undelegate = 6;
+    cosmos.staking.v1beta1.MsgBeginRedelegate msg_begin_redelegate = 7;
     // TODO: after we bump to Cosmos SDK v0.46, add MsgCancelUnbondingDelegation
   }
 }
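Worth noting: giving block_height field number 3 shifts each oneof member up by one (3-6 to 4-7). In protobuf terms that is a wire-incompatible change for any previously serialized QueuedMessage, which is presumably acceptable while the chain's storage format is still pre-release.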
5 changes: 4 additions & 1 deletion proto/babylon/epoching/v1/query.proto
@@ -23,7 +23,7 @@ service Query {

   // EpochMsgs queries the messages of a given epoch
   rpc EpochMsgs(QueryEpochMsgsRequest) returns (QueryEpochMsgsResponse) {
-    option (google.api.http).get = "/babylon/epoching/v1/epoch_msgs";
+    option (google.api.http).get = "/babylon/epoching/v1/epoch_msgs/{epoch_num}";
   }
 }

@@ -49,6 +49,9 @@ message QueryCurrentEpochResponse {

 // QueryEpochMsgsRequest is request type for the Query/EpochMsgs RPC method
 message QueryEpochMsgsRequest {
+  // epoch_num is the number of epoch of the requested msg queue
+  uint64 epoch_num = 1;
+
   // pagination defines whether to have the pagination in the response
   cosmos.base.query.v1beta1.PageRequest pagination = 2;
 }
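With the epoch_num path parameter in place, a node's REST gateway can serve a specific epoch's queue, e.g. GET /babylon/epoching/v1/epoch_msgs/2 for epoch 2. PageRequest fields (pagination.limit, pagination.key, and friends) are passed as query-string parameters, per standard gRPC-gateway behavior; host and port depend on the node's API configuration.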
14 changes: 11 additions & 3 deletions x/epoching/abci.go
@@ -26,6 +26,16 @@ func BeginBlocker(ctx sdk.Context, k keeper.Keeper, req abci.RequestBeginBlock)
   if epoch.IsFirstBlockOfNextEpoch(ctx) {
     // increase epoch number
     IncEpoch := k.IncEpoch(ctx)
+    // init epoch msg queue
+    k.InitMsgQueue(ctx)
+    // if epoch 1, then copy all queued msgs in epoch 0 to epoch 1
+    // TODO: more elegant way to do this? e.g., reject all msgs submitted in epoch 0?
+    if epoch.EpochNumber == uint64(0) {
+      epochMsgs := k.GetEpochMsgs(ctx, 0)
+      for _, msg := range epochMsgs {
+        k.EnqueueMsg(ctx, *msg)
+      }
+    }
     // init the slashed voting power of this new epoch
     k.InitSlashedVotingPower(ctx)
     // store the current validator set

Review thread on the epoch-0 copying above:

Contributor: As I understand, epoch 0 is the genesis block. There the genutil module executes the MsgCreateValidator methods directly; we have no delays. Why is this copying necessary?

Member (author):
> There the genutil module executes the MsgCreateValidator methods directly, we have no delays.

Yeah, MsgCreateValidator does not go through the msg queue, so it is not delayed.

> Why is this copying necessary?

Cosmos SDK does not execute BeginBlock and EndBlock for the genesis block. If someone submits a wrapped msg (e.g., MsgWrappedDelegate) at the genesis block (i.e., in epoch 0), it will be put in epoch 0's msg queue. However, since epoch 0 has no EndBlock invocation, the queued msg would never be handled. This corner case was caught by the fuzzing tests in epoch_msg_queue_test.go.

This PR works around the issue by copying all queued msgs in epoch 0 to epoch 1 upon the BeginBlock of block 1. Another fix I can think of is to reject all wrapped msgs submitted during epoch 0. Do you have any better alternatives?

Contributor: Thanks for the explanation. I just don't understand how anyone can send anything in the genesis block. The genesis block is created purely from the genesis.json file, isn't it?

Member (author): Yeah, it's weird that the system handles messages during block 0. So perhaps let's reject validator-related msgs during block 0?

Contributor: Maybe it was just an artifact of the simulation? Rejection sounds good to me.

@@ -59,7 +69,7 @@ func EndBlocker(ctx sdk.Context, k keeper.Keeper) []abci.ValidatorUpdate {
   epoch := k.GetEpoch(ctx)
   if epoch.IsLastBlock(ctx) {
     // get all msgs in the msg queue
-    queuedMsgs := k.GetEpochMsgs(ctx)
+    queuedMsgs := k.GetCurrentEpochMsgs(ctx)
     // forward each msg in the msg queue to the right keeper
     for _, msg := range queuedMsgs {
       res, err := k.HandleQueuedMsg(ctx, msg)
@@ -102,8 +112,6 @@ func EndBlocker(ctx sdk.Context, k keeper.Keeper) []abci.ValidatorUpdate {

   // update validator set
   validatorSetUpdate = k.ApplyAndReturnValidatorSetUpdates(ctx)
-  // clear the current msg queue
-  k.ClearEpochMsgs(ctx)
   // trigger AfterEpochEnds hook
   k.AfterEpochEnds(ctx, epoch.EpochNumber)
   // emit EndEpoch event
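For reference, the rejection alternative discussed in the thread above could look roughly like the sketch below. checkNotGenesisEpoch is a hypothetical helper, not part of this PR, that each wrapped-msg handler would call before enqueueing; the error choice is illustrative only.

import (
  sdk "github.com/cosmos/cosmos-sdk/types"
  sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"

  "github.com/babylonchain/babylon/x/epoching/keeper"
)

// checkNotGenesisEpoch is a hypothetical guard: epoch 0 gets no EndBlock,
// so any msg queued there would never be handled. Rejecting at submission
// time would avoid the copy-to-epoch-1 workaround in BeginBlocker.
func checkNotGenesisEpoch(ctx sdk.Context, k keeper.Keeper) error {
  if k.GetEpoch(ctx).EpochNumber == 0 {
    return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "wrapped msgs cannot be submitted during epoch 0")
  }
  return nil
}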
2 changes: 1 addition & 1 deletion x/epoching/genesis.go
@@ -14,7 +14,7 @@ func InitGenesis(ctx sdk.Context, k keeper.Keeper, genState types.GenesisState) {
   // init epoch number
   k.InitEpoch(ctx)
   // init msg queue length
-  k.InitQueueLength(ctx)
+  k.InitMsgQueue(ctx)
   // init validator set
   k.InitValidatorSet(ctx)
   // init slashed voting power
103 changes: 60 additions & 43 deletions x/epoching/keeper/epoch_msg_queue.go
@@ -9,69 +9,76 @@ import (
   sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
 )

-// InitQueueLength initialises the msg queue length to 0
-func (k Keeper) InitQueueLength(ctx sdk.Context) {
-  store := ctx.KVStore(k.storeKey)
+// InitQueueLength initialises the msg queue length of the current epoch to 0
+func (k Keeper) InitMsgQueue(ctx sdk.Context) {
+  store := k.msgQueueLengthStore(ctx)

+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)
   queueLenBytes := sdk.Uint64ToBigEndian(0)
-  store.Set(types.QueueLengthKey, queueLenBytes)
+  store.Set(epochNumberBytes, queueLenBytes)
 }

Contributor (suggested change on the doc comment above, to match the new function name):
-// InitQueueLength initialises the msg queue length of the current epoch to 0
+// InitMsgQueue initialises the msg queue length of the current epoch to 0

-// GetQueueLength fetches the number of queued messages
-func (k Keeper) GetQueueLength(ctx sdk.Context) uint64 {
-  store := ctx.KVStore(k.storeKey)
+// GetQueueLength fetches the number of queued messages of a given epoch
+func (k Keeper) GetQueueLength(ctx sdk.Context, epochNumber uint64) uint64 {
+  store := k.msgQueueLengthStore(ctx)
+  epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)

   // get queue len in bytes from DB
-  bz := store.Get(types.QueueLengthKey)
+  bz := store.Get(epochNumberBytes)
   if bz == nil {
     panic(types.ErrUnknownQueueLen)
   }
   // unmarshal
   return sdk.BigEndianToUint64(bz)
 }

-// setQueueLength sets the msg queue length
-func (k Keeper) setQueueLength(ctx sdk.Context, queueLen uint64) {
-  store := ctx.KVStore(k.storeKey)
-
-  queueLenBytes := sdk.Uint64ToBigEndian(queueLen)
-  store.Set(types.QueueLengthKey, queueLenBytes)
+// GetQueueLength fetches the number of queued messages of the current epoch
+func (k Keeper) GetCurrentQueueLength(ctx sdk.Context) uint64 {
+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  return k.GetQueueLength(ctx, epochNumber)
 }

-// incQueueLength adds the queue length by 1
-func (k Keeper) incQueueLength(ctx sdk.Context) {
-  queueLen := k.GetQueueLength(ctx)
+// incCurrentQueueLength adds the queue length of the current epoch by 1
+func (k Keeper) incCurrentQueueLength(ctx sdk.Context) {
+  store := k.msgQueueLengthStore(ctx)
+
+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)
+
+  queueLen := k.GetQueueLength(ctx, epochNumber)
   incrementedQueueLen := queueLen + 1
-  k.setQueueLength(ctx, incrementedQueueLen)
+  incrementedQueueLenBytes := sdk.Uint64ToBigEndian(incrementedQueueLen)
+
+  store.Set(epochNumberBytes, incrementedQueueLenBytes)
 }

 // EnqueueMsg enqueues a message to the queue of the current epoch
 func (k Keeper) EnqueueMsg(ctx sdk.Context, msg types.QueuedMessage) {
-  // prefix: QueuedMsgKey
-  store := ctx.KVStore(k.storeKey)
-  queuedMsgStore := prefix.NewStore(store, types.QueuedMsgKey)
+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  store := k.msgQueueStore(ctx, epochNumber)

-  // key: queueLenBytes
-  queueLen := k.GetQueueLength(ctx)
+  // key: index, in this case = queueLenBytes
+  queueLen := k.GetCurrentQueueLength(ctx)
   queueLenBytes := sdk.Uint64ToBigEndian(queueLen)
   // value: msgBytes
   msgBytes, err := k.cdc.Marshal(&msg)
   if err != nil {
     panic(sdkerrors.Wrap(types.ErrMarshal, err.Error()))
   }
-  queuedMsgStore.Set(queueLenBytes, msgBytes)
+  store.Set(queueLenBytes, msgBytes)

   // increment queue length
-  k.incQueueLength(ctx)
+  k.incCurrentQueueLength(ctx)
 }

-// GetEpochMsgs returns the set of messages queued in the current epoch
-func (k Keeper) GetEpochMsgs(ctx sdk.Context) []*types.QueuedMessage {
+// GetEpochMsgs returns the set of messages queued in a given epoch
+func (k Keeper) GetEpochMsgs(ctx sdk.Context, epochNumber uint64) []*types.QueuedMessage {
   queuedMsgs := []*types.QueuedMessage{}
-  store := ctx.KVStore(k.storeKey)
+  store := k.msgQueueStore(ctx, epochNumber)

   // add each queued msg to queuedMsgs
-  iterator := sdk.KVStorePrefixIterator(store, types.QueuedMsgKey)
+  iterator := store.Iterator(nil, nil)
   defer iterator.Close()
   for ; iterator.Valid(); iterator.Next() {
     queuedMsgBytes := iterator.Value()
@@ -85,20 +92,10 @@ func (k Keeper) GetEpochMsgs(ctx sdk.Context) []*types.QueuedMessage {
   return queuedMsgs
 }

-// ClearEpochMsgs removes all messages in the queue
-func (k Keeper) ClearEpochMsgs(ctx sdk.Context) {
-  store := ctx.KVStore(k.storeKey)
-
-  // remove all epoch msgs
-  iterator := sdk.KVStorePrefixIterator(store, types.QueuedMsgKey)
-  defer iterator.Close()
-  for ; iterator.Valid(); iterator.Next() {
-    key := iterator.Key()
-    store.Delete(key)
-  }
-
-  // set queue len to zero
-  k.setQueueLength(ctx, 0)
+// GetCurrentEpochMsgs returns the set of messages queued in the current epoch
+func (k Keeper) GetCurrentEpochMsgs(ctx sdk.Context) []*types.QueuedMessage {
+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  return k.GetEpochMsgs(ctx, epochNumber)
 }

 // HandleQueuedMsg unwraps a QueuedMessage and forwards it to the staking module
@@ -153,3 +150,23 @@ func cacheTxContext(ctx sdk.Context, txid []byte, msgid []byte) (sdk.Context, sdk.CacheMultiStore) {

   return ctx.WithMultiStore(msCache), msCache
 }
+
+// msgQueueStore returns the queue of msgs of a given epoch
+// prefix: MsgQueueKey || epochNumber
+// key: index
+// value: msg
+func (k Keeper) msgQueueStore(ctx sdk.Context, epochNumber uint64) prefix.Store {
+  store := ctx.KVStore(k.storeKey)
+  msgQueueStore := prefix.NewStore(store, types.MsgQueueKey)
+  epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)
+  return prefix.NewStore(msgQueueStore, epochNumberBytes)
+}
+
+// msgQueueLengthStore returns the length of the msg queue of a given epoch
+// prefix: QueueLengthKey
+// key: epochNumber
+// value: queue length
+func (k Keeper) msgQueueLengthStore(ctx sdk.Context) prefix.Store {
+  store := ctx.KVStore(k.storeKey)
+  return prefix.NewStore(store, types.QueueLengthKey)
+}
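These two prefix stores give every epoch its own isolated queue, which is why ClearEpochMsgs could be dropped: a finished epoch's queue is simply left in the store and stays queryable. A rough illustration of the raw keys the nested prefix stores produce, assuming MsgQueueKey and QueueLengthKey are []byte store prefixes as elsewhere in the module (illustration only, not part of the diff):

// epochMsgKey shows the full key of one queued msg:
// MsgQueueKey || bigEndian(epochNumber) || bigEndian(index) -> marshaled QueuedMessage
func epochMsgKey(epochNumber, index uint64) []byte {
  key := append([]byte{}, types.MsgQueueKey...)             // module-level prefix
  key = append(key, sdk.Uint64ToBigEndian(epochNumber)...)  // per-epoch prefix
  key = append(key, sdk.Uint64ToBigEndian(index)...)        // position in the queue
  return key
}

// The queue-length entry is keyed analogously:
// QueueLengthKey || bigEndian(epochNumber) -> bigEndian(length)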
35 changes: 6 additions & 29 deletions x/epoching/keeper/epoch_msg_queue_test.go
@@ -27,8 +27,8 @@ func FuzzEnqueueMsg(f *testing.F) {
   helper := testepoching.NewHelper(t)
   ctx, keeper := helper.Ctx, helper.EpochingKeeper
   // ensure that the epoch msg queue is correct at the genesis
-  require.Empty(t, keeper.GetEpochMsgs(ctx))
-  require.Equal(t, uint64(0), keeper.GetQueueLength(ctx))
+  require.Empty(t, keeper.GetCurrentEpochMsgs(ctx))
+  require.Equal(t, uint64(0), keeper.GetCurrentQueueLength(ctx))

   // Enqueue a random number of msgs
   numQueuedMsgs := rand.Uint64() % 100
@@ -41,17 +41,12 @@
   }

   // ensure that each msg in the queue is correct
-  epochMsgs := keeper.GetEpochMsgs(ctx)
+  epochMsgs := keeper.GetCurrentEpochMsgs(ctx)
   for i, msg := range epochMsgs {
     require.Equal(t, sdk.Uint64ToBigEndian(uint64(i)), msg.TxId)
     require.Equal(t, sdk.Uint64ToBigEndian(uint64(i)), msg.MsgId)
     require.Nil(t, msg.Msg)
   }
-
-  // after clearing the msg queue, ensure that the epoch msg queue is empty
-  keeper.ClearEpochMsgs(ctx)
-  require.Empty(t, keeper.GetEpochMsgs(ctx))
-  require.Equal(t, uint64(0), keeper.GetQueueLength(ctx))
 })
}

@@ -86,7 +81,7 @@ func FuzzHandleQueuedMsg_MsgWrappedDelegate(f *testing.F) {
   helper.WrappedDelegate(genAddr, val, coinWithOnePower.Amount)
 }
 // ensure the msgs are queued
-epochMsgs := keeper.GetEpochMsgs(ctx)
+epochMsgs := keeper.GetCurrentEpochMsgs(ctx)
 require.Equal(t, numNewDels, int64(len(epochMsgs)))

 // enter the 1st block and thus epoch 1
@@ -97,12 +92,6 @@
   ctx = helper.GenAndApplyEmptyBlock()
 }

-// ensure queued msgs have been handled
-queueLen := keeper.GetQueueLength(ctx)
-require.Equal(t, uint64(0), queueLen)
-epochMsgs = keeper.GetEpochMsgs(ctx)
-require.Equal(t, 0, len(epochMsgs))
-
 // ensure the voting power has been added w.r.t. the newly delegated tokens
 valPower2, err := helper.EpochingKeeper.GetValidatorVotingPower(ctx, 2, val)
 require.NoError(t, err)
@@ -145,7 +134,7 @@ func FuzzHandleQueuedMsg_MsgWrappedUndelegate(f *testing.F) {
   helper.WrappedUndelegate(genAddr, val, coinWithOnePower.Amount)
 }
 // ensure the msgs are queued
-epochMsgs := keeper.GetEpochMsgs(ctx)
+epochMsgs := keeper.GetCurrentEpochMsgs(ctx)
 require.Equal(t, numNewUndels, int64(len(epochMsgs)))

 // enter the 1st block and thus epoch 1
@@ -156,12 +145,6 @@
   ctx = helper.GenAndApplyEmptyBlock()
 }

-// ensure queued msgs have been handled
-queueLen := keeper.GetQueueLength(ctx)
-require.Equal(t, uint64(0), queueLen)
-epochMsgs = keeper.GetEpochMsgs(ctx)
-require.Equal(t, 0, len(epochMsgs))
-
 // ensure the voting power has been reduced w.r.t. the unbonding tokens
 valPower2, err := helper.EpochingKeeper.GetValidatorVotingPower(ctx, 2, val)
 require.NoError(t, err)
@@ -215,7 +198,7 @@ func FuzzHandleQueuedMsg_MsgWrappedBeginRedelegate(f *testing.F) {
   helper.WrappedBeginRedelegate(genAddr, val1, val2, coinWithOnePower.Amount)
 }
 // ensure the msgs are queued
-epochMsgs := keeper.GetEpochMsgs(ctx)
+epochMsgs := keeper.GetCurrentEpochMsgs(ctx)
 require.Equal(t, numNewRedels, int64(len(epochMsgs)))

 // enter the 1st block and thus epoch 1
@@ -226,12 +209,6 @@
   ctx = helper.GenAndApplyEmptyBlock()
 }

-// ensure queued msgs have been handled
-queueLen := keeper.GetQueueLength(ctx)
-require.Equal(t, uint64(0), queueLen)
-epochMsgs = keeper.GetEpochMsgs(ctx)
-require.Equal(t, 0, len(epochMsgs))
-
 // ensure the voting power has been redelegated from val1 to val2
 // Note that in Cosmos SDK, redelegation happens upon the next `EndBlock`, rather than waiting for 14 days.
 // This is because redelegation does not affect PoS security: upon redelegation requests, no token is leaving the system.
12 changes: 11 additions & 1 deletion x/epoching/keeper/epoch_val_set.go
@@ -31,6 +31,11 @@ func (k Keeper) GetValidatorSet(ctx sdk.Context, epochNumber uint64) types.ValidatorSet {
   return types.NewSortedValidatorSet(vals)
 }

+func (k Keeper) GetCurrentValidatorSet(ctx sdk.Context) types.ValidatorSet {
+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  return k.GetValidatorSet(ctx, epochNumber)
+}
+
 func (k Keeper) GetValidatorPubkey(ctx sdk.Context, valAddr sdk.ValAddress) (cryptotypes.PubKey, bool) {
   validator, found := k.stk.GetValidator(ctx, valAddr)
   if !found {
@@ -105,6 +110,11 @@ func (k Keeper) GetValidatorVotingPower(ctx sdk.Context, epochNumber uint64, valAddr sdk.ValAddress) (int64, error) {
   return power.Int64(), nil
 }

+func (k Keeper) GetCurrentValidatorVotingPower(ctx sdk.Context, valAddr sdk.ValAddress) (int64, error) {
+  epochNumber := k.GetEpoch(ctx).EpochNumber
+  return k.GetValidatorVotingPower(ctx, epochNumber, valAddr)
+}
+
 // GetTotalVotingPower returns the total voting power of a given epoch
 func (k Keeper) GetTotalVotingPower(ctx sdk.Context, epochNumber uint64) int64 {
   epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)
@@ -131,7 +141,7 @@ func (k Keeper) valSetStore(ctx sdk.Context, epochNumber uint64) prefix.Store {
   return prefix.NewStore(valSetStore, epochNumberBytes)
 }

-// votingPowerStore returns the total voting power of the validator set of a give nepoch
+// votingPowerStore returns the total voting power of the validator set of a given epoch
 // prefix: ValidatorSetKey
 // key: epochNumber
 // value: total voting power (in int64 as per Cosmos SDK)
9 changes: 6 additions & 3 deletions x/epoching/keeper/grpc_query.go
@@ -4,7 +4,6 @@ import (
   "context"

   "github.com/babylonchain/babylon/x/epoching/types"
-  "github.com/cosmos/cosmos-sdk/store/prefix"
   sdk "github.com/cosmos/cosmos-sdk/types"
   "github.com/cosmos/cosmos-sdk/types/query"
   "google.golang.org/grpc/codes"
@@ -41,9 +40,13 @@ func (k Keeper) CurrentEpoch(c context.Context, req *types.QueryCurrentEpochRequest) (*types.QueryCurrentEpochResponse, error) {
 // EpochMsgs handles the QueryEpochMsgsRequest query
 func (k Keeper) EpochMsgs(c context.Context, req *types.QueryEpochMsgsRequest) (*types.QueryEpochMsgsResponse, error) {
   ctx := sdk.UnwrapSDKContext(c)
+  epoch := k.GetEpoch(ctx)
+  if epoch.EpochNumber < req.EpochNum {
+    return nil, types.ErrUnknownEpochNumber
+  }

   var msgs []*types.QueuedMessage
-  store := ctx.KVStore(k.storeKey)
-  epochMsgsStore := prefix.NewStore(store, types.QueuedMsgKey)
+  epochMsgsStore := k.msgQueueStore(ctx, req.EpochNum)

   // handle pagination
   // TODO (non-urgent): the epoch might end between pagination requests, leading to inconsistent results by the time someone gets to the end. Possible fixes:
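A usage sketch for the new query, assuming an initialized types.QueryClient (client wiring elided) and that the response's message list field is named Msgs, which this diff does not show:

import (
  "context"

  "github.com/babylonchain/babylon/x/epoching/types"
  "github.com/cosmos/cosmos-sdk/types/query"
)

// fetchEpochMsgs fetches the first page of msgs queued in a given epoch.
func fetchEpochMsgs(queryClient types.QueryClient, epochNum uint64) ([]*types.QueuedMessage, error) {
  resp, err := queryClient.EpochMsgs(context.Background(), &types.QueryEpochMsgsRequest{
    EpochNum:   epochNum,
    Pagination: &query.PageRequest{Limit: 100},
  })
  if err != nil {
    // epochs beyond the current one are rejected with ErrUnknownEpochNumber (see the check above)
    return nil, err
  }
  return resp.Msgs, nil
}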