Skip to content

Commit

Permalink
Merge PR #663: Properly handle timeouts and packet sequence numbers o…
Browse files Browse the repository at this point in the history
…n `ORDERED` channels

* bump gaia version to v7.0.0

* add function for returning an order from a string

* add tests for `OrderFromString()` and `StringFromOrder()`

* add case for timing out packets on ordered channels

* WIP: test interchain accounts support

* only attempt to relay packets when there are open channels

* increase wait time for packets to be relayed in timeout test

* use block time instead of consensus state time when checking timestamp timeout

* correctly set the timestamp timeout in `SendTransferMsg()`

* remove msg for relaying a single packet

* handle packet sequences correctly for `ORDERED` channels

* refactor how we handle packet sequence numbers on `ORDERED` channels

* remove code for finishing the channel handshake on `ORDERED` channels for now.

* condense make command for running gaia integration tests

* invert if statement to improve readability

* remove redundant comment

* replace switch statements with successive if statements
  • Loading branch information
jtieri authored May 24, 2022
1 parent 060558d commit e0a2858
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 242 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ test-integration:
@go test -mod=readonly -v -timeout 20m ./_test/

test-gaia:
@go test -mod=readonly -v -run TestGaiaToGaiaRelaying ./_test/
@go test -mod=readonly -v -run TestRelayAllChannelsOnConnection ./_test/
@go test -mod=readonly -v -run 'TestGaiaToGaiaRelaying|TestGaiaToGaiaRelaying|TestUnorderedChannelBlockHeightTimeout|TestUnorderedChannelTimestampTimeout' ./_test

test-akash:
@go test -mod=readonly -v -run TestAkashToGaiaRelaying ./_test/
Expand Down
2 changes: 1 addition & 1 deletion _test/relayer_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func TestUnorderedChannelTimestampTimeout(t *testing.T) {
filter := relayer.ChannelFilter{}
_ = relayer.StartRelayer(ctx, log, src, dst, filter, 2*cmd.MB, 5)

time.Sleep(time.Second * 5)
time.Sleep(time.Second * 10)

// check balance on src against expected
srcGot, err := src.ChainProvider.QueryBalance(ctx, src.ChainProvider.Key())
Expand Down
51 changes: 0 additions & 51 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -34,7 +33,6 @@ Most of these commands take a [path] argument. Make sure:
linkCmd(a),
linkThenStartCmd(a),
relayMsgsCmd(a),
relayMsgCmd(a),
relayAcksCmd(a),
xfersend(a),
lineBreakCommand(),
Expand Down Expand Up @@ -697,55 +695,6 @@ $ %s tx link-then-start demo-path --timeout 5s`, appName, appName)),
return overrideFlag(a.Viper, channelParameterFlags(a.Viper, clientParameterFlags(a.Viper, strategyFlag(a.Viper, retryFlag(a.Viper, timeoutFlag(a.Viper, cmd))))))
}

func relayMsgCmd(a *appState) *cobra.Command {
cmd := &cobra.Command{
Use: "relay-packet path_name src_channel_id seq_num",
Aliases: []string{"relay-pkt"},
Short: "relay a non-relayed packet with a specific sequence number, in both directions",
Args: withUsage(cobra.ExactArgs(3)),
Example: strings.TrimSpace(fmt.Sprintf(`
$ %s transact relay-packet demo-path channel-1 1
$ %s tx relay-pkt demo-path channel-1 1`,
appName, appName,
)),
RunE: func(cmd *cobra.Command, args []string) error {
c, src, dst, err := a.Config.ChainsFromPath(args[0])
if err != nil {
return err
}

if err = ensureKeysExist(c); err != nil {
return err
}

maxTxSize, maxMsgLength, err := GetStartOptions(cmd)
if err != nil {
return err
}

seqNum, err := strconv.Atoi(args[2])
if err != nil {
return err
}

channelID := args[1]
channel, err := relayer.QueryChannel(cmd.Context(), c[src], channelID)
if err != nil {
return err
}

sp, err := relayer.UnrelayedSequences(cmd.Context(), c[src], c[dst], channel)
if err != nil {
return err
}

return relayer.RelayPacket(cmd.Context(), a.Log, c[src], c[dst], sp, maxTxSize, maxMsgLength, uint64(seqNum), channel)
},
}

return strategyFlag(a.Viper, cmd)
}

func relayMsgsCmd(a *appState) *cobra.Command {
cmd := &cobra.Command{
Use: "relay-packets path_name src_channel_id",
Expand Down
227 changes: 67 additions & 160 deletions relayer/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,16 @@ func UnrelayedSequences(ctx context.Context, src, dst *Chain, srcChannel *chanty
return nil, err
}

var (
srcUnreceivedPackets, dstUnreceivedPackets []uint64
)

eg, egCtx = errgroup.WithContext(ctx) // Re-set eg and egCtx after previous Wait.
eg.Go(func() error {
// Query all packets sent by src that have been received by dst
// Query all packets sent by src that have not been received by dst.
return retry.Do(func() error {
var err error
rs.Src, err = dst.ChainProvider.QueryUnreceivedPackets(egCtx, uint64(dsth), srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcPacketSeq)
srcUnreceivedPackets, err = dst.ChainProvider.QueryUnreceivedPackets(egCtx, uint64(dsth), srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcPacketSeq)
return err
}, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
dst.log.Info(
Expand All @@ -119,10 +123,10 @@ func UnrelayedSequences(ctx context.Context, src, dst *Chain, srcChannel *chanty
})

eg.Go(func() error {
// Query all packets sent by dst that have been received by src
// Query all packets sent by dst that have not been received by src.
return retry.Do(func() error {
var err error
rs.Dst, err = src.ChainProvider.QueryUnreceivedPackets(egCtx, uint64(srch), srcChannel.ChannelId, srcChannel.PortId, dstPacketSeq)
dstUnreceivedPackets, err = src.ChainProvider.QueryUnreceivedPackets(egCtx, uint64(srch), srcChannel.ChannelId, srcChannel.PortId, dstPacketSeq)
return err
}, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
src.log.Info(
Expand All @@ -140,6 +144,43 @@ func UnrelayedSequences(ctx context.Context, src, dst *Chain, srcChannel *chanty
return nil, err
}

// If this is an UNORDERED channel we can return at this point.
if srcChannel.Ordering != chantypes.ORDERED {
rs.Src = srcUnreceivedPackets
rs.Dst = dstUnreceivedPackets
return rs, nil
}

// For ordered channels we want to only relay the packet whose sequence number is equal to
// the expected next packet receive sequence from the counterparty.
if len(srcUnreceivedPackets) > 0 {
nextSeqResp, err := dst.ChainProvider.QueryNextSeqRecv(ctx, dsth, srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId)
if err != nil {
return nil, err
}

for _, seq := range srcUnreceivedPackets {
if seq == nextSeqResp.NextSequenceReceive {
rs.Src = append(rs.Src, seq)
break
}
}
}

if len(dstUnreceivedPackets) > 0 {
nextSeqResp, err := src.ChainProvider.QueryNextSeqRecv(ctx, srch, srcChannel.ChannelId, srcChannel.PortId)
if err != nil {
return nil, err
}

for _, seq := range dstUnreceivedPackets {
if seq == nextSeqResp.NextSequenceReceive {
rs.Dst = append(rs.Dst, seq)
break
}
}
}

return rs, nil
}

Expand Down Expand Up @@ -404,12 +445,14 @@ func RelayPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *Rel
eg, egCtx := errgroup.WithContext(ctx)
// add messages for sequences on src
eg.Go(func() error {
return AddMessagesForSequences(egCtx, sp.Src, src, dst, srch, dsth, &msgs.Src, &msgs.Dst, srcChannel.ChannelId, srcChannel.PortId, srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId)
return AddMessagesForSequences(egCtx, sp.Src, src, dst, srch, dsth, &msgs.Src, &msgs.Dst,
srcChannel.ChannelId, srcChannel.PortId, srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcChannel.Ordering)
})

// add messages for sequences on dst
eg.Go(func() error {
return AddMessagesForSequences(egCtx, sp.Dst, dst, src, dsth, srch, &msgs.Dst, &msgs.Src, srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcChannel.ChannelId, srcChannel.PortId)
return AddMessagesForSequences(egCtx, sp.Dst, dst, src, dsth, srch, &msgs.Dst, &msgs.Src,
srcChannel.Counterparty.ChannelId, srcChannel.Counterparty.PortId, srcChannel.ChannelId, srcChannel.PortId, srcChannel.Ordering)
})

if err = eg.Wait(); err != nil {
Expand Down Expand Up @@ -471,15 +514,25 @@ func RelayPackets(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *Rel

// AddMessagesForSequences constructs RecvMsgs and TimeoutMsgs from sequence numbers on a src chain
// and adds them to the appropriate queue of msgs for both src and dst
func AddMessagesForSequences(ctx context.Context, sequences []uint64, src, dst *Chain, srch, dsth int64, srcMsgs, dstMsgs *[]provider.RelayerMessage, srcChanID, srcPortID, dstChanID, dstPortID string) error {
func AddMessagesForSequences(
ctx context.Context,
sequences []uint64,
src, dst *Chain,
srch, dsth int64,
srcMsgs, dstMsgs *[]provider.RelayerMessage,
srcChanID, srcPortID, dstChanID, dstPortID string,
order chantypes.Order,
) error {
for _, seq := range sequences {
var (
recvMsg, timeoutMsg provider.RelayerMessage
err error
recvMsg, timeoutMsg, err := src.ChainProvider.RelayPacketFromSequence(
ctx,
src.ChainProvider, dst.ChainProvider,
uint64(srch), uint64(dsth),
seq,
dstChanID, dstPortID, dst.ClientID(),
srcChanID, srcPortID, src.ClientID(),
order,
)

recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(ctx, src.ChainProvider, dst.ChainProvider,
uint64(srch), uint64(dsth), seq, dstChanID, dstPortID, dst.ClientID(), srcChanID, srcPortID, src.ClientID())
if err != nil {
src.log.Info(
"Failed to relay packet from sequence",
Expand All @@ -489,6 +542,7 @@ func AddMessagesForSequences(ctx context.Context, sequences []uint64, src, dst *
zap.String("dst_chain_id", dst.ChainID()),
zap.String("dst_channel_id", dstChanID),
zap.String("dst_port_id", dstPortID),
zap.String("channel_order", order.String()),
zap.Error(err),
)
return err
Expand Down Expand Up @@ -556,150 +610,3 @@ func PrependUpdateClientMsg(ctx context.Context, msgs *[]provider.RelayerMessage

return nil
}

// RelayPacket creates transactions to relay packets from src to dst and from dst to src
func RelayPacket(ctx context.Context, log *zap.Logger, src, dst *Chain, sp *RelaySequences, maxTxSize, maxMsgLength, seqNum uint64, srcChannel *chantypes.IdentifiedChannel) error {
// set the maximum relay transaction constraints
msgs := &RelayMsgs{
Src: []provider.RelayerMessage{},
Dst: []provider.RelayerMessage{},
MaxTxSize: maxTxSize,
MaxMsgLength: maxMsgLength,
}

srch, dsth, err := QueryLatestHeights(ctx, src, dst)
if err != nil {
return err
}

srcChanID := srcChannel.ChannelId
srcPortID := srcChannel.PortId
dstChanID := srcChannel.Counterparty.ChannelId
dstPortID := srcChannel.Counterparty.PortId

// add messages for sequences on src
for _, seq := range sp.Src {
if seq == seqNum {
// Query src for the sequence number to get type of packet
var recvMsg, timeoutMsg provider.RelayerMessage
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = src.ChainProvider.RelayPacketFromSequence(ctx, src.ChainProvider, dst.ChainProvider, uint64(srch), uint64(dsth), seq, dstChanID, dstPortID, dst.ClientID(), srcChanID, srcPortID, src.ClientID())
if err != nil {
log.Warn(
"Failed to relay packet from seq on src",
zap.String("src_chain_id", src.ChainID()),
zap.String("dst_chain_id", dst.ChainID()),
zap.Error(err),
)
}
return err
}, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, dsth, _ = QueryLatestHeights(ctx, src, dst)
})); err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
if recvMsg != nil {
msgs.Dst = append(msgs.Dst, recvMsg)
}

if timeoutMsg != nil {
msgs.Src = append(msgs.Src, timeoutMsg)
}
}
}

// add messages for sequences on dst
for _, seq := range sp.Dst {
if seq == seqNum {
// Query dst for the sequence number to get type of packet
var recvMsg, timeoutMsg provider.RelayerMessage
if err = retry.Do(func() error {
recvMsg, timeoutMsg, err = dst.ChainProvider.RelayPacketFromSequence(ctx, dst.ChainProvider, src.ChainProvider, uint64(dsth), uint64(srch), seq, srcChanID, srcPortID, src.ClientID(), dstChanID, dstPortID, dst.ClientID())
if err != nil {
log.Warn("Failed to relay packet from seq on dst", zap.Error(err))
}
return nil
}, retry.Context(ctx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
srch, dsth, _ = QueryLatestHeights(ctx, src, dst)
})); err != nil {
return err
}

// depending on the type of message to be relayed, we need to
// send to different chains
if recvMsg != nil {
msgs.Src = append(msgs.Src, recvMsg)
}

if timeoutMsg != nil {
msgs.Dst = append(msgs.Dst, timeoutMsg)
}
}
}

if !msgs.Ready() {
log.Info(
"No packets to relay",
zap.String("src_chain_id", src.ChainID()),
zap.String("src_port_id", srcPortID),
zap.String("dst_chain_id", dst.ChainID()),
zap.String("dst_port_id", dstPortID),
)
return nil
}

// Prepend non-empty msg lists with UpdateClient
if len(msgs.Dst) != 0 {
srcHeader, err := src.ChainProvider.GetIBCUpdateHeader(ctx, srch, dst.ChainProvider, dst.ClientID())
if err != nil {
return err
}
updateMsg, err := dst.ChainProvider.UpdateClient(dst.ClientID(), srcHeader)
if err != nil {
return err
}

msgs.Dst = append([]provider.RelayerMessage{updateMsg}, msgs.Dst...)
}

if len(msgs.Src) != 0 {
dstHeader, err := dst.ChainProvider.GetIBCUpdateHeader(ctx, dsth, src.ChainProvider, src.ClientID())
if err != nil {
return err
}
updateMsg, err := src.ChainProvider.UpdateClient(src.ClientID(), dstHeader)
if err != nil {
return err
}

msgs.Src = append([]provider.RelayerMessage{updateMsg}, msgs.Src...)
}

// send messages to their respective chains
result := msgs.Send(ctx, log, AsRelayMsgSender(src), AsRelayMsgSender(dst))
if err := result.Error(); err != nil {
if result.PartiallySent() {
log.Info(
"Partial success when relaying packet",
zap.String("src_chain_id", src.ChainID()),
zap.String("src_port_id", srcPortID),
zap.String("dst_chain_id", dst.ChainID()),
zap.String("dst_port_id", dstPortID),
zap.Error(err),
)
}
return err
}

if result.SuccessfulSrcBatches > 0 {
src.logPacketsRelayed(dst, result.SuccessfulSrcBatches, srcChannel)
}
if result.SuccessfulDstBatches > 0 {
dst.logPacketsRelayed(src, result.SuccessfulDstBatches, srcChannel)
}

return nil
}
12 changes: 12 additions & 0 deletions relayer/pathEnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ func OrderFromString(order string) chantypes.Order {
}
}

// StringFromOrder returns the string representation of a channel order.
func StringFromOrder(order chantypes.Order) string {
switch order {
case chantypes.UNORDERED:
return "unordered"
case chantypes.ORDERED:
return "ordered"
default:
return ""
}
}

var marshalledChains = map[PathEnd]*Chain{}

// MarshalChain is PathEnd
Expand Down
Loading

0 comments on commit e0a2858

Please sign in to comment.