Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move events to message server layer instead of core level #5264

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions modules/core/04-channel/keeper/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ func emitChannelClosedEvent(ctx sdk.Context, packet exported.PacketI, channel ty
})
}

// emitChannelUpgradeInitEvent emits a channel upgrade init event
func emitChannelUpgradeInitEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeInitEvent emits a channel upgrade init event
func EmitChannelUpgradeInitEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeInit,
Expand All @@ -292,8 +292,8 @@ func emitChannelUpgradeInitEvent(ctx sdk.Context, portID string, channelID strin
})
}

// emitChannelUpgradeTryEvent emits a channel upgrade try event
func emitChannelUpgradeTryEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeTryEvent emits a channel upgrade try event
func EmitChannelUpgradeTryEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeTry,
Expand All @@ -313,8 +313,8 @@ func emitChannelUpgradeTryEvent(ctx sdk.Context, portID string, channelID string
})
}

// emitChannelUpgradeAckEvent emits a channel upgrade ack event
func emitChannelUpgradeAckEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeAckEvent emits a channel upgrade ack event
func EmitChannelUpgradeAckEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeAck,
Expand All @@ -334,8 +334,8 @@ func emitChannelUpgradeAckEvent(ctx sdk.Context, portID string, channelID string
})
}

// emitChannelUpgradeConfirmEvent emits a channel upgrade confirm event
func emitChannelUpgradeConfirmEvent(ctx sdk.Context, portID, channelID string, currentChannel types.Channel) {
// EmitChannelUpgradeConfirmEvent emits a channel upgrade confirm event
func EmitChannelUpgradeConfirmEvent(ctx sdk.Context, portID, channelID string, currentChannel types.Channel) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeConfirm,
Expand All @@ -353,8 +353,8 @@ func emitChannelUpgradeConfirmEvent(ctx sdk.Context, portID, channelID string, c
})
}

// emitChannelUpgradeOpenEvent emits a channel upgrade open event
func emitChannelUpgradeOpenEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel) {
// EmitChannelUpgradeOpenEvent emits a channel upgrade open event
func EmitChannelUpgradeOpenEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeOpen,
Expand All @@ -375,8 +375,8 @@ func emitChannelUpgradeOpenEvent(ctx sdk.Context, portID string, channelID strin
})
}

// emitChannelUpgradeTimeoutEvent emits an upgrade timeout event.
func emitChannelUpgradeTimeoutEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeTimeoutEvent emits an upgrade timeout event.
func EmitChannelUpgradeTimeoutEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeTimeout,
Expand All @@ -397,8 +397,8 @@ func emitChannelUpgradeTimeoutEvent(ctx sdk.Context, portID string, channelID st
})
}

// emitErrorReceiptEvent emits an error receipt event
func emitErrorReceiptEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, err error) {
// EmitErrorReceiptEvent emits an error receipt event
func EmitErrorReceiptEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, err error) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeError,
Expand All @@ -416,8 +416,8 @@ func emitErrorReceiptEvent(ctx sdk.Context, portID string, channelID string, cur
})
}

// emitChannelUpgradeCancelEvent emits an upgraded cancelled event.
func emitChannelUpgradeCancelEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
// EmitChannelUpgradeCancelEvent emits an upgraded cancelled event.
func EmitChannelUpgradeCancelEvent(ctx sdk.Context, portID string, channelID string, currentChannel types.Channel, upgrade types.Upgrade) {
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(
types.EventTypeChannelUpgradeCancel,
Expand Down
67 changes: 28 additions & 39 deletions modules/core/04-channel/keeper/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (k Keeper) ChanUpgradeInit(

// WriteUpgradeInitChannel writes a channel which has successfully passed the UpgradeInit handshake step.
// An event is emitted for the handshake step.
func (k Keeper) WriteUpgradeInitChannel(ctx sdk.Context, portID, channelID string, upgrade types.Upgrade) types.Channel {
func (k Keeper) WriteUpgradeInitChannel(ctx sdk.Context, portID, channelID string, upgrade types.Upgrade, upgradeVersion string) (types.Channel, types.Upgrade) {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-init")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -56,13 +56,14 @@ func (k Keeper) WriteUpgradeInitChannel(ctx sdk.Context, portID, channelID strin

channel.UpgradeSequence++

upgrade.Fields.Version = upgradeVersion

k.SetChannel(ctx, portID, channelID, channel)
k.SetUpgrade(ctx, portID, channelID, upgrade)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "state", channel.State, "upgrade-sequence", fmt.Sprintf("%d", channel.UpgradeSequence))

emitChannelUpgradeInitEvent(ctx, portID, channelID, channel, upgrade)
return channel
return channel, upgrade
}

// ChanUpgradeTry is called by a module to accept the first step of a channel upgrade handshake initiated by
Expand All @@ -77,23 +78,23 @@ func (k Keeper) ChanUpgradeTry(
proofCounterpartyChannel,
proofCounterpartyUpgrade []byte,
proofHeight clienttypes.Height,
) (types.Upgrade, error) {
) (types.Channel, types.Upgrade, error) {
channel, found := k.GetChannel(ctx, portID, channelID)
if !found {
return types.Upgrade{}, errorsmod.Wrapf(types.ErrChannelNotFound, "port ID (%s) channel ID (%s)", portID, channelID)
return types.Channel{}, types.Upgrade{}, errorsmod.Wrapf(types.ErrChannelNotFound, "port ID (%s) channel ID (%s)", portID, channelID)
}

if !channel.IsOpen() {
return types.Upgrade{}, errorsmod.Wrapf(types.ErrInvalidChannelState, "expected %s, got %s", types.OPEN, channel.State)
return types.Channel{}, types.Upgrade{}, errorsmod.Wrapf(types.ErrInvalidChannelState, "expected %s, got %s", types.OPEN, channel.State)
}

connection, found := k.connectionKeeper.GetConnection(ctx, channel.ConnectionHops[0])
if !found {
return types.Upgrade{}, errorsmod.Wrap(connectiontypes.ErrConnectionNotFound, channel.ConnectionHops[0])
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(connectiontypes.ErrConnectionNotFound, channel.ConnectionHops[0])
}

if connection.GetState() != int32(connectiontypes.OPEN) {
return types.Upgrade{}, errorsmod.Wrapf(
return types.Channel{}, types.Upgrade{}, errorsmod.Wrapf(
connectiontypes.ErrInvalidConnectionState, "connection state is not OPEN (got %s)", connectiontypes.State(connection.GetState()).String(),
)
}
Expand Down Expand Up @@ -127,14 +128,14 @@ func (k Keeper) ChanUpgradeTry(
// NOTE: OnChanUpgradeInit will not be executed by the application
upgrade, err = k.ChanUpgradeInit(ctx, portID, channelID, proposedUpgradeFields)
if err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed to initialize upgrade")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed to initialize upgrade")
}

channel = k.WriteUpgradeInitChannel(ctx, portID, channelID, upgrade)
channel, upgrade = k.WriteUpgradeInitChannel(ctx, portID, channelID, upgrade, upgrade.Fields.Version)
}

if err := k.checkForUpgradeCompatibility(ctx, proposedUpgradeFields, counterpartyUpgradeFields); err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed upgrade compatibility check")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed upgrade compatibility check")
}

// construct expected counterparty channel from information in state
Expand All @@ -158,11 +159,11 @@ func (k Keeper) ChanUpgradeTry(
channel.Counterparty.ChannelId,
counterpartyChannel,
); err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty channel state")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty channel state")
}

if counterpartyUpgradeSequence < channel.UpgradeSequence {
return upgrade, types.NewUpgradeError(channel.UpgradeSequence-1, errorsmod.Wrapf(
return channel, upgrade, types.NewUpgradeError(channel.UpgradeSequence-1, errorsmod.Wrapf(
types.ErrInvalidUpgradeSequence, "counterparty upgrade sequence < current upgrade sequence (%d < %d)", counterpartyUpgradeSequence, channel.UpgradeSequence,
))
}
Expand All @@ -176,14 +177,14 @@ func (k Keeper) ChanUpgradeTry(
channel.Counterparty.ChannelId,
types.NewUpgrade(counterpartyUpgradeFields, types.Timeout{}),
); err != nil {
return types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty upgrade")
return types.Channel{}, types.Upgrade{}, errorsmod.Wrap(err, "failed to verify counterparty upgrade")
}

if err := k.startFlushing(ctx, portID, channelID, &upgrade); err != nil {
return types.Upgrade{}, err
return types.Channel{}, types.Upgrade{}, err
}

return upgrade, nil
return channel, upgrade, nil
}

// WriteUpgradeTryChannel writes the channel end and upgrade to state after successfully passing the UpgradeTry handshake step.
Expand All @@ -200,7 +201,6 @@ func (k Keeper) WriteUpgradeTryChannel(ctx sdk.Context, portID, channelID string
k.SetUpgrade(ctx, portID, channelID, upgrade)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "previous-state", types.OPEN, "new-state", channel.State)
emitChannelUpgradeTryEvent(ctx, portID, channelID, channel, upgrade)

return channel, upgrade
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func (k Keeper) ChanUpgradeAck(
// WriteUpgradeAckChannel writes a channel which has successfully passed the UpgradeAck handshake step as well as
// setting the upgrade for that channel.
// An event is emitted for the handshake step.
func (k Keeper) WriteUpgradeAckChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) {
func (k Keeper) WriteUpgradeAckChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) (types.Channel, types.Upgrade) {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-ack")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -340,7 +340,7 @@ func (k Keeper) WriteUpgradeAckChannel(ctx sdk.Context, portID, channelID string
k.SetUpgrade(ctx, portID, channelID, upgrade)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "state", channel.State.String())
emitChannelUpgradeAckEvent(ctx, portID, channelID, channel, upgrade)
return channel, upgrade
}

// ChanUpgradeConfirm is called on the chain which is on FLUSHING after chanUpgradeAck is called on the counterparty.
Expand Down Expand Up @@ -421,7 +421,7 @@ func (k Keeper) ChanUpgradeConfirm(
// If the channel has no in-flight packets, its state is updated to indicate that flushing has completed. Otherwise, the counterparty upgrade is set
// and the channel state is left unchanged.
// An event is emitted for the handshake step.
func (k Keeper) WriteUpgradeConfirmChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) {
func (k Keeper) WriteUpgradeConfirmChannel(ctx sdk.Context, portID, channelID string, counterpartyUpgrade types.Upgrade) types.Channel {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-confirm")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -440,8 +440,7 @@ func (k Keeper) WriteUpgradeConfirmChannel(ctx sdk.Context, portID, channelID st
// this gets read when timing out and acknowledging packets.
k.SetCounterpartyUpgrade(ctx, portID, channelID, counterpartyUpgrade)
}

emitChannelUpgradeConfirmEvent(ctx, portID, channelID, channel)
return channel
}

// ChanUpgradeOpen is called by a module to complete the channel upgrade handshake and move the channel back to an OPEN state.
Expand Down Expand Up @@ -530,7 +529,7 @@ func (k Keeper) ChanUpgradeOpen(
// WriteUpgradeOpenChannel writes the agreed upon upgrade fields to the channel, and sets the channel state back to OPEN. This can be called in one of two cases:
// - In the UpgradeConfirm step of the handshake if both sides have already flushed all in-flight packets.
// - In the UpgradeOpen step of the handshake.
func (k Keeper) WriteUpgradeOpenChannel(ctx sdk.Context, portID, channelID string) {
func (k Keeper) WriteUpgradeOpenChannel(ctx sdk.Context, portID, channelID string) types.Channel {
channel, found := k.GetChannel(ctx, portID, channelID)
if !found {
panic(fmt.Errorf("could not find existing channel when updating channel state, channelID: %s, portID: %s", channelID, portID))
Expand All @@ -554,7 +553,7 @@ func (k Keeper) WriteUpgradeOpenChannel(ctx sdk.Context, portID, channelID strin
k.deleteUpgradeInfo(ctx, portID, channelID)

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "previous-state", previousState.String(), "new-state", types.OPEN.String())
emitChannelUpgradeOpenEvent(ctx, portID, channelID, channel)
return channel
}

// ChanUpgradeCancel is called by a module to cancel a channel upgrade that is in progress.
Expand Down Expand Up @@ -636,7 +635,7 @@ func (k Keeper) WriteUpgradeCancelChannel(ctx sdk.Context, portID, channelID str
channel = k.restoreChannel(ctx, portID, channelID, errorReceipt.Sequence, channel, types.NewUpgradeError(errorReceipt.Sequence, types.ErrInvalidUpgrade))

k.Logger(ctx).Info("channel state updated", "port-id", portID, "channel-id", channelID, "previous-state", previousState, "new-state", types.OPEN.String())
emitChannelUpgradeCancelEvent(ctx, portID, channelID, channel, upgrade)
EmitChannelUpgradeCancelEvent(ctx, portID, channelID, channel, upgrade)
}

// ChanUpgradeTimeout times out an outstanding upgrade.
Expand Down Expand Up @@ -738,7 +737,7 @@ func (k Keeper) ChanUpgradeTimeout(
func (k Keeper) WriteUpgradeTimeoutChannel(
ctx sdk.Context,
portID, channelID string,
) {
) (types.Channel, types.Upgrade) {
defer telemetry.IncrCounter(1, "ibc", "channel", "upgrade-timeout")

channel, found := k.GetChannel(ctx, portID, channelID)
Expand All @@ -754,7 +753,8 @@ func (k Keeper) WriteUpgradeTimeoutChannel(
channel = k.restoreChannel(ctx, portID, channelID, channel.UpgradeSequence, channel, types.NewUpgradeError(channel.UpgradeSequence, types.ErrUpgradeTimeout))

k.Logger(ctx).Info("channel state restored", "port-id", portID, "channel-id", channelID)
emitChannelUpgradeTimeoutEvent(ctx, portID, channelID, channel, upgrade)

return channel, upgrade
}

// startFlushing will set the upgrade last packet send and continue blocking the upgrade from continuing until all
Expand Down Expand Up @@ -940,18 +940,7 @@ func (k Keeper) restoreChannel(ctx sdk.Context, portID, channelID string, upgrad
// delete state associated with upgrade which is no longer required.
k.deleteUpgradeInfo(ctx, portID, channelID)

k.WriteErrorReceipt(ctx, portID, channelID, err)
k.SetUpgradeErrorReceipt(ctx, portID, channelID, err.GetErrorReceipt())

return channel
}

// WriteErrorReceipt will write an error receipt from the provided UpgradeError.
func (k Keeper) WriteErrorReceipt(ctx sdk.Context, portID, channelID string, upgradeError *types.UpgradeError) {
channel, found := k.GetChannel(ctx, portID, channelID)
if !found {
panic(errorsmod.Wrapf(types.ErrChannelNotFound, "port ID (%s) channel ID (%s)", portID, channelID))
}

k.SetUpgradeErrorReceipt(ctx, portID, channelID, upgradeError.GetErrorReceipt())
emitErrorReceiptEvent(ctx, portID, channelID, channel, upgradeError)
}
Loading
Loading