Skip to content

Commit

Permalink
Move events to message server layer instead of core level (#5264)
Browse files Browse the repository at this point in the history
* wip: moving events up to the message server layer

* wip: move events out to message server layer

* chore: commenting out event tests

* Fix merge issue.

---------

Co-authored-by: DimitrisJim <d.f.hilliard@gmail.com>
Co-authored-by: Charly <charly@interchain.io>
  • Loading branch information
3 people authored Dec 4, 2023
1 parent f23dfef commit 25c8247
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 188 deletions.
32 changes: 16 additions & 16 deletions modules/core/04-channel/keeper/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ func emitChannelClosedEvent(ctx sdk.Context, packet exported.PacketI, channel ty
})
}

// emitChannelUpgradeInitEvent emits a channel upgrade init event
func emitChannelUpgradeInitEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeInitEvent emits a channel upgrade init event
func EmitChannelUpgradeInitEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeInit,
Expand All @@ -292,8 +292,8 @@ func emitChannelUpgradeInitEvent(ctx sdk.Context, portID string, channelID strin
})
}

// emitChannelUpgradeTryEvent emits a channel upgrade try event
func emitChannelUpgradeTryEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeTryEvent emits a channel upgrade try event
func EmitChannelUpgradeTryEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeTry,
Expand All @@ -313,8 +313,8 @@ func emitChannelUpgradeTryEvent(ctx sdk.Context, portID string, channelID string
})
}

// emitChannelUpgradeAckEvent emits a channel upgrade ack event
func emitChannelUpgradeAckEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeAckEvent emits a channel upgrade ack event
func EmitChannelUpgradeAckEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeAck,
Expand All @@ -334,8 +334,8 @@ func emitChannelUpgradeAckEvent(ctx sdk.Context, portID string, channelID string
})
}

// emitChannelUpgradeConfirmEvent emits a channel upgrade confirm event
func emitChannelUpgradeConfirmEvent(ctx sdk.Context, portID, channelID string, currentChannel types.Channel) {
// EmitChannelUpgradeConfirmEvent emits a channel upgrade confirm event
func EmitChannelUpgradeConfirmEvent(ctx sdk.Context, portID, channelID string, currentChannel types.Channel) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeConfirm,
Expand All @@ -353,8 +353,8 @@ func emitChannelUpgradeConfirmEvent(ctx sdk.Context, portID, channelID string, c
})
}

// emitChannelUpgradeOpenEvent emits a channel upgrade open event
func emitChannelUpgradeOpenEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel) {
// EmitChannelUpgradeOpenEvent emits a channel upgrade open event
func EmitChannelUpgradeOpenEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeOpen,
Expand All @@ -375,8 +375,8 @@ func emitChannelUpgradeOpenEvent(ctx sdk.Context, portID string, channelID strin
})
}

// emitChannelUpgradeTimeoutEvent emits an upgrade timeout event.
func emitChannelUpgradeTimeoutEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeTimeoutEvent emits an upgrade timeout event.
func EmitChannelUpgradeTimeoutEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeTimeout,
Expand All @@ -397,8 +397,8 @@ func emitChannelUpgradeTimeoutEvent(ctx sdk.Context, portID string, channelID st
})
}

// emitErrorReceiptEvent emits an error receipt event
func emitErrorReceiptEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, err error) {
// EmitErrorReceiptEvent emits an error receipt event
func EmitErrorReceiptEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, err error) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeError,
Expand All @@ -416,8 +416,8 @@ func emitErrorReceiptEvent(ctx sdk.Context, portID string, channelID string, cur
})
}

// emitChannelUpgradeCancelEvent emits an upgraded cancelled event.
func emitChannelUpgradeCancelEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeCancelEvent emits an upgraded cancelled event.
func EmitChannelUpgradeCancelEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeCancel,
Expand Down
67 changes: 28 additions & 39 deletions modules/core/04-channel/keeper/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (k Keeper) ChanUpgradeInit(

// WriteUpgradeInitChannel writes a channel which has successfully passed the UpgradeInit handshake step.
// An event is emitted for the handshake step.
func (k Keeper) WriteUpgradeInitChannel(ctx sdk.Context, portID, channelID string, upgrade types.Upgrade) types.Channel {
func (k Keeper) WriteUpgradeInitChannel(ctx sdk.Context, portID, channelID string, upgrade types.Upgrade, upgradeVersion string) (types.Channel, types.Upgrade) {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-init")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -56,13 +56,14 @@ func (k Keeper) WriteUpgradeInitChannel(ctx sdk.Context, portID, channelID strin

channel.UpgradeSequence++

upgrade.Fields.Version = upgradeVersion

k.SetChannel(ctx, portID, channelID, channel)
k.SetUpgrade(ctx, portID, channelID, upgrade)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "state", channel.State, "upgrade-sequence", fmt.Sprintf("%d", channel.UpgradeSequence))

emitChannelUpgradeInitEvent(ctx, portID, channelID, channel, upgrade)
return channel
return channel, upgrade
}

// ChanUpgradeTry is called by a module to accept the first step of a channel upgrade handshake initiated by
Expand All @@ -77,23 +78,23 @@ func (k Keeper) ChanUpgradeTry(
proofCounterpartyChannel,
proofCounterpartyUpgrade []byte,
proofHeight clienttypes.Height,
) (types.Upgrade, error) {
) (types.Channel, types.Upgrade, error) {
channel, found := k.GetChannel(ctx, portID, channelID)
if !found {
return types.Upgrade{}, errorsmod.Wrapf(types.ErrChannelNotFound, "port ID (%s) channel ID (%s)", portID, channelID)
return types.Channel{}, types.Upgrade{}, errorsmod.Wrapf(types.ErrChannelNotFound, "port ID (%s) channel ID (%s)", portID, channelID)
}

if !channel.IsOpen() {
return types.Upgrade{}, errorsmod.Wrapf(types.ErrInvalidChannelState, "expected %s, got %s", types.OPEN, channel.State)
return types.Channel{}, types.Upgrade{}, errorsmod.Wrapf(types.ErrInvalidChannelState, "expected %s, got %s", types.OPEN, channel.State)
}

connection, found := k.connectionKeeper.GetConnection(ctx, channel.ConnectionHops[0])
if !found {
return types.Upgrade{}, errorsmod.Wrap(connectiontypes.ErrConnectionNotFound, channel.ConnectionHops[0])
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(connectiontypes.ErrConnectionNotFound, channel.ConnectionHops[0])
}

if connection.GetState() != int32(connectiontypes.OPEN) {
return types.Upgrade{}, errorsmod.Wrapf(
return types.Channel{}, types.Upgrade{}, errorsmod.Wrapf(
connectiontypes.ErrInvalidConnectionState, "connection state is not OPEN (got %s)", connectiontypes.State(connection.GetState()).String(),
)
}
Expand Down Expand Up @@ -127,14 +128,14 @@ func (k Keeper) ChanUpgradeTry(
// NOTE: OnChanUpgradeInit will not be executed by the application
upgrade, err = k.ChanUpgradeInit(ctx, portID, channelID, proposedUpgradeFields)
if err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed to initialize upgrade")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed to initialize upgrade")
}

channel = k.WriteUpgradeInitChannel(ctx, portID, channelID, upgrade)
channel, upgrade = k.WriteUpgradeInitChannel(ctx, portID, channelID, upgrade, upgrade.Fields.Version)
}

if err := k.checkForUpgradeCompatibility(ctx, proposedUpgradeFields, counterpartyUpgradeFields); err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed upgrade compatibility check")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed upgrade compatibility check")
}

// construct expected counterparty channel from information in state
Expand All @@ -158,11 +159,11 @@ func (k Keeper) ChanUpgradeTry(
channel.Counterparty.ChannelId,
counterpartyChannel,
); err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty channel state")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty channel state")
}

if counterpartyUpgradeSequence < channel.UpgradeSequence {
return upgrade, types.NewUpgradeError(channel.UpgradeSequence-1, errorsmod.Wrapf(
return channel, upgrade, types.NewUpgradeError(channel.UpgradeSequence-1, errorsmod.Wrapf(
types.ErrInvalidUpgradeSequence, "counterparty upgrade sequence < current upgrade sequence (%d < %d)", counterpartyUpgradeSequence, channel.UpgradeSequence,
))
}
Expand All @@ -176,14 +177,14 @@ func (k Keeper) ChanUpgradeTry(
channel.Counterparty.ChannelId,
types.NewUpgrade(counterpartyUpgradeFields, types.Timeout{}),
); err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty upgrade")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty upgrade")
}

if err := k.startFlushing(ctx, portID, channelID, &upgrade); err != nil {
return types.Upgrade{}, err
return types.Channel{}, types.Upgrade{}, err
}

return upgrade, nil
return channel, upgrade, nil
}

// WriteUpgradeTryChannel writes the channel end and upgrade to state after successfully passing the UpgradeTry handshake step.
Expand All @@ -200,7 +201,6 @@ func (k Keeper) WriteUpgradeTryChannel(ctx sdk.Context, portID, channelID string
k.SetUpgrade(ctx, portID, channelID, upgrade)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "previous-state", types.OPEN, "new-state", channel.State)
emitChannelUpgradeTryEvent(ctx, portID, channelID, channel, upgrade)

return channel, upgrade
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func (k Keeper) ChanUpgradeAck(
// WriteUpgradeAckChannel writes a channel which has successfully passed the UpgradeAck handshake step as well as
// setting the upgrade for that channel.
// An event is emitted for the handshake step.
func (k Keeper) WriteUpgradeAckChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) {
func (k Keeper) WriteUpgradeAckChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) (types.Channel, types.Upgrade) {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-ack")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -340,7 +340,7 @@ func (k Keeper) WriteUpgradeAckChannel(ctx sdk.Context, portID, channelID string
k.SetUpgrade(ctx, portID, channelID, upgrade)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "state", channel.State.String())
emitChannelUpgradeAckEvent(ctx, portID, channelID, channel, upgrade)
return channel, upgrade
}

// ChanUpgradeConfirm is called on the chain which is on FLUSHING after chanUpgradeAck is called on the counterparty.
Expand Down Expand Up @@ -421,7 +421,7 @@ func (k Keeper) ChanUpgradeConfirm(
// If the channel has no in-flight packets, its state is updated to indicate that flushing has completed. Otherwise, the counterparty upgrade is set
// and the channel state is left unchanged.
// An event is emitted for the handshake step.
func (k Keeper) WriteUpgradeConfirmChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) {
func (k Keeper) WriteUpgradeConfirmChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) types.Channel {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-confirm")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -440,8 +440,7 @@ func (k Keeper) WriteUpgradeConfirmChannel(ctx sdk.Context, portID, channelID st
// this gets read when timing out and acknowledging packets.
k.SetCounterpartyUpgrade(ctx, portID, channelID, counterpartyUpgrade)
}

emitChannelUpgradeConfirmEvent(ctx, portID, channelID, channel)
return channel
}

// ChanUpgradeOpen is called by a module to complete the channel upgrade handshake and move the channel back to an OPEN state.
Expand Down Expand Up @@ -530,7 +529,7 @@ func (k Keeper) ChanUpgradeOpen(
// WriteUpgradeOpenChannel writes the agreed upon upgrade fields to the channel, and sets the channel state back to OPEN. This can be called in one of two cases:
// - In the UpgradeConfirm step of the handshake if both sides have already flushed all in-flight packets.
// - In the UpgradeOpen step of the handshake.
func (k Keeper) WriteUpgradeOpenChannel(ctx sdk.Context, portID, channelID string) {
func (k Keeper) WriteUpgradeOpenChannel(ctx sdk.Context, portID, channelID string) types.Channel {
channel, found := k.GetChannel(ctx, portID, channelID)
if !found {
panic(fmt.Errorf("could not find existing channel when updating channel state, channelID: %s, portID: %s", channelID, portID))
Expand All @@ -554,7 +553,7 @@ func (k Keeper) WriteUpgradeOpenChannel(ctx sdk.Context, portID, channelID strin
k.deleteUpgradeInfo(ctx, portID, channelID)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "previous-state", previousState.String(), "new-state", types.OPEN.String())
emitChannelUpgradeOpenEvent(ctx, portID, channelID, channel)
return channel
}

// ChanUpgradeCancel is called by a module to cancel a channel upgrade that is in progress.
Expand Down Expand Up @@ -636,7 +635,7 @@ func (k Keeper) WriteUpgradeCancelChannel(ctx sdk.Context, portID, channelID str
channel = k.restoreChannel(ctx, portID, channelID, errorReceipt.Sequence, channel, types.NewUpgradeError(errorReceipt.Sequence, types.ErrInvalidUpgrade))

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "previous-state", previousState, "new-state", types.OPEN.String())
emitChannelUpgradeCancelEvent(ctx, portID, channelID, channel, upgrade)
EmitChannelUpgradeCancelEvent(ctx, portID, channelID, channel, upgrade)
}

// ChanUpgradeTimeout times out an outstanding upgrade.
Expand Down Expand Up @@ -738,7 +737,7 @@ func (k Keeper) ChanUpgradeTimeout(
func (k Keeper) WriteUpgradeTimeoutChannel(
ctx sdk.Context,
portID, channelID string,
) {
) (types.Channel, types.Upgrade) {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-timeout")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -754,7 +753,8 @@ func (k Keeper) WriteUpgradeTimeoutChannel(
channel = k.restoreChannel(ctx, portID, channelID, channel.UpgradeSequence, channel, types.NewUpgradeError(channel.UpgradeSequence, types.ErrUpgradeTimeout))

k.Logger(ctx).Info("channel state restored", "port-id", portID, "channel-id", channelID)
emitChannelUpgradeTimeoutEvent(ctx, portID, channelID, channel, upgrade)

return channel, upgrade
}

// startFlushing will set the upgrade last packet send and continue blocking the upgrade from continuing until all
Expand Down Expand Up @@ -940,18 +940,7 @@ func (k Keeper) restoreChannel(ctx sdk.Context, portID, channelID string, upgrad
// delete state associated with upgrade which is no longer required.
k.deleteUpgradeInfo(ctx, portID, channelID)

k.WriteErrorReceipt(ctx, portID, channelID, err)
k.SetUpgradeErrorReceipt(ctx, portID, channelID, err.GetErrorReceipt())

return channel
}

// WriteErrorReceipt will write an error receipt from the provided UpgradeError.
func (k Keeper) WriteErrorReceipt(ctx sdk.Context, portID, channelID string, upgradeError *types.UpgradeError) {
channel, found := k.GetChannel(ctx, portID, channelID)
if !found {
panic(errorsmod.Wrapf(types.ErrChannelNotFound, "port ID (%s) channel ID (%s)", portID, channelID))
}

k.SetUpgradeErrorReceipt(ctx, portID, channelID, upgradeError.GetErrorReceipt())
emitErrorReceiptEvent(ctx, portID, channelID, channel, upgradeError)
}
Loading

0 comments on commit 25c8247

Please sign in to comment.