Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

epoching: API: epoch_msgs/{epoch_num} -> all events during this epoch #108

Merged
merged 14 commits into from
Sep 4, 2022
10 changes: 6 additions & 4 deletions proto/babylon/epoching/v1/epoching.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ message QueuedMessage {
bytes tx_id = 1;
// msg_id is the original message ID, i.e., hash of the marshaled message
bytes msg_id = 2;
// block_height is the height when this msg is submitted to Babylon
uint64 block_height = 3;
// msg is the actual message that is sent by a user and is queued by the epoching module
oneof msg {
cosmos.staking.v1beta1.MsgCreateValidator msg_create_validator = 3;
cosmos.staking.v1beta1.MsgDelegate msg_delegate = 4;
cosmos.staking.v1beta1.MsgUndelegate msg_undelegate = 5;
cosmos.staking.v1beta1.MsgBeginRedelegate msg_begin_redelegate = 6;
cosmos.staking.v1beta1.MsgCreateValidator msg_create_validator = 4;
cosmos.staking.v1beta1.MsgDelegate msg_delegate = 5;
cosmos.staking.v1beta1.MsgUndelegate msg_undelegate = 6;
cosmos.staking.v1beta1.MsgBeginRedelegate msg_begin_redelegate = 7;
// TODO: after we bump to Cosmos SDK v0.46, add MsgCancelUnbondingDelegation
}
}
9 changes: 5 additions & 4 deletions proto/babylon/epoching/v1/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ message EventEndEpoch {
message EventHandleQueuedMsg {
string original_event_type = 1;
uint64 epoch_number = 2;
bytes tx_id = 3;
bytes msg_id = 4;
repeated bytes original_attributes = 5 [
uint64 height = 3;
bytes tx_id = 4;
bytes msg_id = 5;
repeated bytes original_attributes = 6 [
(gogoproto.customtype) = "github.com/tendermint/tendermint/abci/types.EventAttribute"
];
string error = 6;
string error = 7;
}

message EventSlashThreshold {
Expand Down
5 changes: 4 additions & 1 deletion proto/babylon/epoching/v1/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ service Query {

// EpochMsgs queries the messages of a given epoch
rpc EpochMsgs(QueryEpochMsgsRequest) returns (QueryEpochMsgsResponse) {
option (google.api.http).get = "/babylon/epoching/v1/epoch_msgs";
option (google.api.http).get = "/babylon/epoching/v1/epoch_msgs/{epoch_num}";
}
}

Expand All @@ -49,6 +49,9 @@ message QueryCurrentEpochResponse {

// QueryEpochMsgsRequest is request type for the Query/EpochMsgs RPC method
message QueryEpochMsgsRequest {
// epoch_num is the number of epoch of the requested msg queue
uint64 epoch_num = 1;

// pagination defines whether to have the pagination in the response
cosmos.base.query.v1beta1.PageRequest pagination = 2;
}
Expand Down
13 changes: 7 additions & 6 deletions x/epoching/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ func BeginBlocker(ctx sdk.Context, k keeper.Keeper, req abci.RequestBeginBlock)
epoch := k.GetEpoch(ctx)
if epoch.IsFirstBlockOfNextEpoch(ctx) {
// increase epoch number
IncEpoch := k.IncEpoch(ctx)
incEpoch := k.IncEpoch(ctx)
// init the msg queue of this new epoch
k.InitMsgQueue(ctx)
// init the slashed voting power of this new epoch
k.InitSlashedVotingPower(ctx)
// store the current validator set
k.InitValidatorSet(ctx)
// trigger AfterEpochBegins hook
k.AfterEpochBegins(ctx, IncEpoch.EpochNumber)
k.AfterEpochBegins(ctx, incEpoch.EpochNumber)
// emit BeginEpoch event
err := ctx.EventManager().EmitTypedEvent(
&types.EventBeginEpoch{
EpochNumber: IncEpoch.EpochNumber,
EpochNumber: incEpoch.EpochNumber,
},
)
if err != nil {
Expand All @@ -59,7 +61,7 @@ func EndBlocker(ctx sdk.Context, k keeper.Keeper) []abci.ValidatorUpdate {
epoch := k.GetEpoch(ctx)
if epoch.IsLastBlock(ctx) {
// get all msgs in the msg queue
queuedMsgs := k.GetEpochMsgs(ctx)
queuedMsgs := k.GetCurrentEpochMsgs(ctx)
// forward each msg in the msg queue to the right keeper
for _, msg := range queuedMsgs {
res, err := k.HandleQueuedMsg(ctx, msg)
Expand All @@ -72,6 +74,7 @@ func EndBlocker(ctx sdk.Context, k keeper.Keeper) []abci.ValidatorUpdate {
err := ctx.EventManager().EmitTypedEvent(
&types.EventHandleQueuedMsg{
EpochNumber: epoch.EpochNumber,
Height: msg.BlockHeight,
TxId: msg.TxId,
MsgId: msg.MsgId,
Error: err.Error(),
Expand Down Expand Up @@ -102,8 +105,6 @@ func EndBlocker(ctx sdk.Context, k keeper.Keeper) []abci.ValidatorUpdate {

// update validator set
validatorSetUpdate = k.ApplyAndReturnValidatorSetUpdates(ctx)
// clear the current msg queue
k.ClearEpochMsgs(ctx)
// trigger AfterEpochEnds hook
k.AfterEpochEnds(ctx, epoch.EpochNumber)
// emit EndEpoch event
Expand Down
4 changes: 2 additions & 2 deletions x/epoching/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func InitGenesis(ctx sdk.Context, k keeper.Keeper, genState types.GenesisState)
k.SetParams(ctx, genState.Params)
// init epoch number
k.InitEpoch(ctx)
// init msg queue length
k.InitQueueLength(ctx)
// init msg queue
k.InitMsgQueue(ctx)
// init validator set
k.InitValidatorSet(ctx)
// init slashed voting power
Expand Down
119 changes: 69 additions & 50 deletions x/epoching/keeper/epoch_msg_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,69 +9,76 @@ import (
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

// InitQueueLength initialises the msg queue length to 0
func (k Keeper) InitQueueLength(ctx sdk.Context) {
store := ctx.KVStore(k.storeKey)
// InitMsgQueue initialises the msg queue length of the current epoch to 0
func (k Keeper) InitMsgQueue(ctx sdk.Context) {
store := k.msgQueueLengthStore(ctx)

epochNumber := k.GetEpoch(ctx).EpochNumber
epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)
queueLenBytes := sdk.Uint64ToBigEndian(0)
store.Set(types.QueueLengthKey, queueLenBytes)
store.Set(epochNumberBytes, queueLenBytes)
}

// GetQueueLength fetches the number of queued messages
func (k Keeper) GetQueueLength(ctx sdk.Context) uint64 {
store := ctx.KVStore(k.storeKey)
// GetQueueLength fetches the number of queued messages of a given epoch
func (k Keeper) GetQueueLength(ctx sdk.Context, epochNumber uint64) uint64 {
store := k.msgQueueLengthStore(ctx)
epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)

// get queue len in bytes from DB
bz := store.Get(types.QueueLengthKey)
bz := store.Get(epochNumberBytes)
if bz == nil {
panic(types.ErrUnknownQueueLen)
return 0 // BBN has not reached this epoch yet
}
// unmarshal
return sdk.BigEndianToUint64(bz)
}

// setQueueLength sets the msg queue length
func (k Keeper) setQueueLength(ctx sdk.Context, queueLen uint64) {
store := ctx.KVStore(k.storeKey)

queueLenBytes := sdk.Uint64ToBigEndian(queueLen)
store.Set(types.QueueLengthKey, queueLenBytes)
// GetQueueLength fetches the number of queued messages of the current epoch
func (k Keeper) GetCurrentQueueLength(ctx sdk.Context) uint64 {
epochNumber := k.GetEpoch(ctx).EpochNumber
return k.GetQueueLength(ctx, epochNumber)
}

// incQueueLength adds the queue length by 1
func (k Keeper) incQueueLength(ctx sdk.Context) {
queueLen := k.GetQueueLength(ctx)
// incCurrentQueueLength adds the queue length of the current epoch by 1
func (k Keeper) incCurrentQueueLength(ctx sdk.Context) {
store := k.msgQueueLengthStore(ctx)

epochNumber := k.GetEpoch(ctx).EpochNumber
epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)

queueLen := k.GetQueueLength(ctx, epochNumber)
incrementedQueueLen := queueLen + 1
k.setQueueLength(ctx, incrementedQueueLen)
incrementedQueueLenBytes := sdk.Uint64ToBigEndian(incrementedQueueLen)

store.Set(epochNumberBytes, incrementedQueueLenBytes)
}

// EnqueueMsg enqueues a message to the queue of the current epoch
func (k Keeper) EnqueueMsg(ctx sdk.Context, msg types.QueuedMessage) {
// prefix: QueuedMsgKey
store := ctx.KVStore(k.storeKey)
queuedMsgStore := prefix.NewStore(store, types.QueuedMsgKey)
epochNumber := k.GetEpoch(ctx).EpochNumber
store := k.msgQueueStore(ctx, epochNumber)

// key: queueLenBytes
queueLen := k.GetQueueLength(ctx)
// key: index, in this case = queueLenBytes
queueLen := k.GetCurrentQueueLength(ctx)
queueLenBytes := sdk.Uint64ToBigEndian(queueLen)
// value: msgBytes
msgBytes, err := k.cdc.Marshal(&msg)
if err != nil {
panic(sdkerrors.Wrap(types.ErrMarshal, err.Error()))
}
queuedMsgStore.Set(queueLenBytes, msgBytes)
store.Set(queueLenBytes, msgBytes)

// increment queue length
k.incQueueLength(ctx)
k.incCurrentQueueLength(ctx)
}

// GetEpochMsgs returns the set of messages queued in the current epoch
func (k Keeper) GetEpochMsgs(ctx sdk.Context) []*types.QueuedMessage {
// GetEpochMsgs returns the set of messages queued in a given epoch
func (k Keeper) GetEpochMsgs(ctx sdk.Context, epochNumber uint64) []*types.QueuedMessage {
queuedMsgs := []*types.QueuedMessage{}
store := ctx.KVStore(k.storeKey)
store := k.msgQueueStore(ctx, epochNumber)

// add each queued msg to queuedMsgs
iterator := sdk.KVStorePrefixIterator(store, types.QueuedMsgKey)
iterator := sdk.KVStorePrefixIterator(store, nil)
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
queuedMsgBytes := iterator.Value()
Expand All @@ -85,20 +92,10 @@ func (k Keeper) GetEpochMsgs(ctx sdk.Context) []*types.QueuedMessage {
return queuedMsgs
}

// ClearEpochMsgs removes all messages in the queue
func (k Keeper) ClearEpochMsgs(ctx sdk.Context) {
store := ctx.KVStore(k.storeKey)

// remove all epoch msgs
iterator := sdk.KVStorePrefixIterator(store, types.QueuedMsgKey)
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
key := iterator.Key()
store.Delete(key)
}

// set queue len to zero
k.setQueueLength(ctx, 0)
// GetCurrentEpochMsgs returns the set of messages queued in the current epoch
func (k Keeper) GetCurrentEpochMsgs(ctx sdk.Context) []*types.QueuedMessage {
epochNumber := k.GetEpoch(ctx).EpochNumber
return k.GetEpochMsgs(ctx, epochNumber)
}

// HandleQueuedMsg unwraps a QueuedMessage and forwards it to the staking module
Expand All @@ -123,20 +120,21 @@ func (k Keeper) HandleQueuedMsg(ctx sdk.Context, msg *types.QueuedMessage) (*sdk

// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore is a branch of a branch.
handlerCtx, msCache := cacheTxContext(ctx, msg.TxId, msg.MsgId)
handlerCtx, msCache := cacheTxContext(ctx, msg.TxId, msg.MsgId, msg.BlockHeight)

// handle the unwrapped message
result, err := handler(handlerCtx, unwrappedMsgWithType)

if err == nil {
msCache.Write()
if err != nil {
return result, err
}

return result, err
msCache.Write()

return result, nil
}

// based on a function with the same name in `baseapp.go``
func cacheTxContext(ctx sdk.Context, txid []byte, msgid []byte) (sdk.Context, sdk.CacheMultiStore) {
func cacheTxContext(ctx sdk.Context, txid []byte, msgid []byte, height uint64) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
Expand All @@ -146,10 +144,31 @@ func cacheTxContext(ctx sdk.Context, txid []byte, msgid []byte) (sdk.Context, sd
map[string]interface{}{
"txHash": fmt.Sprintf("%X", txid),
"msgHash": fmt.Sprintf("%X", msgid),
"height": fmt.Sprintf("%d", height),
},
),
).(sdk.CacheMultiStore)
}

return ctx.WithMultiStore(msCache), msCache
}

// msgQueueStore returns the queue of msgs of a given epoch
// prefix: MsgQueueKey || epochNumber
// key: index
// value: msg
func (k Keeper) msgQueueStore(ctx sdk.Context, epochNumber uint64) prefix.Store {
store := ctx.KVStore(k.storeKey)
msgQueueStore := prefix.NewStore(store, types.MsgQueueKey)
epochNumberBytes := sdk.Uint64ToBigEndian(epochNumber)
return prefix.NewStore(msgQueueStore, epochNumberBytes)
}

// msgQueueLengthStore returns the length of the msg queue of a given epoch
// prefix: QueueLengthKey
// key: epochNumber
// value: queue length
func (k Keeper) msgQueueLengthStore(ctx sdk.Context) prefix.Store {
store := ctx.KVStore(k.storeKey)
return prefix.NewStore(store, types.QueueLengthKey)
}
Loading