
Refactor loop to event based processing #743

Closed · wants to merge 10 commits

Conversation

@agouin agouin commented May 13, 2022

Refactors the main query loop into a new event-based processing method built around per-chain processing loops for the involved chains.

For the Cosmos-specific implementation:
This drastically reduces query load on the RPC nodes. The query loop is now slimmed down to (per chain):

  • Query the latest chain height.
  • For each new block, make two queries to get all the necessary information: the block's transactions, and the tendermint light block (for the validator set).

No other queries are now necessary to determine whether packets need to be relayed or to construct the MsgUpdateClient. To construct packet relay messages (MsgRecvPacket, MsgAcknowledgement, MsgTimeout, MsgTimeoutOnClose), only one additional lightweight query per packet is needed to get the tendermint proof from the counterparty chain.
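As a rough sketch of that per-chain loop (the querier interface and helper names below are hypothetical stand-ins for the underlying RPC calls, not the actual functions in this PR):

package cosmos

import (
	"context"
	"time"
)

// blockData bundles the results of the two per-block queries.
type blockData struct {
	txs          [][]byte // raw block transactions, parsed for IBC messages
	validatorSet []byte   // from the tendermint light block, used for MsgUpdateClient
}

// rpcQuerier is a hypothetical wrapper around the RPC node; only these three
// queries are needed for the whole loop.
type rpcQuerier interface {
	queryLatestHeight(ctx context.Context) (int64, error)
	queryBlockTxs(ctx context.Context, height int64) ([][]byte, error)
	queryLightBlock(ctx context.Context, height int64) ([]byte, error)
}

// pollBlocks is the slimmed-down query loop: one latest-height query per
// tick, then exactly two queries for each new block.
func pollBlocks(ctx context.Context, q rpcQuerier, handle func(int64, blockData)) error {
	var latestQueried int64
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			latest, err := q.queryLatestHeight(ctx)
			if err != nil {
				continue // log and retry on the next tick
			}
			for h := latestQueried + 1; h <= latest; h++ {
				txs, err := q.queryBlockTxs(ctx, h)
				if err != nil {
					break // retry this height on the next tick
				}
				lightBlock, err := q.queryLightBlock(ctx, h)
				if err != nil {
					break
				}
				handle(h, blockData{txs: txs, validatorSet: lightBlock})
				latestQueried = h
			}
		}
	}
}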

High level constructs:

ChainProcessor (interface)

One will be run per chain involved in the desired paths. A specific implementation should be provided for each chain type. Responsibilities of a ChainProcessor implementation (a rough interface sketch follows this list):

  • poll the chain blocks as they are produced
  • parse the IBC messages from the transactions in the blocks
  • call the handlers on the PathProcessors where applicable
  • implement methods for constructing the different IBC messages
  • keep track of channel state, and do not send messages to PathProcessors for closed channels
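A rough sketch of what such an interface could look like (the method names and signatures are illustrative, not the actual interface introduced in this PR):

package processor

import (
	"context"

	// import path assumed from the rest of the repository
	"github.com/cosmos/relayer/v2/relayer/provider"
)

// ChainProcessor is an illustrative sketch of the per-chain processor
// described above.
type ChainProcessor interface {
	// Start polls blocks as they are produced, parses IBC messages from
	// their transactions, and calls the handlers on the relevant
	// PathProcessors. It blocks until ctx is cancelled.
	Start(ctx context.Context) error

	// InSync reports whether the processor has caught up to the latest
	// chain height.
	InSync() bool

	// IsRelevantChannel reports whether a channel is open and part of a
	// path this processor cares about.
	IsRelevantChannel(channelID string) bool

	// AssembleMsgUpdateClient constructs a MsgUpdateClient for the
	// counterparty chain from the cached validator sets.
	AssembleMsgUpdateClient(height uint64) (provider.RelayerMessage, error)
}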

CosmosChainProcessor implementation:

Caches data as blocks are polled to prevent additional queries when relaying:

  • latest block height and timestamp
  • recent validator sets and latest client heights, used for constructing MsgUpdateClient messages for counterparty chain
  • channel open state

PathProcessor (struct)

  • Kept generic at the IBC level; it applies across chain implementations (a separate implementation per chain type is not needed).
  • One will be constructed per path.
  • Has pathEnd1 and pathEnd2, linking up to the ChainProcessor for each PathEnd (a rough struct sketch follows this list).
  • Each PathEndRuntime maintains the state of IBC packets for the given side of the path.
  • Packet sequence history will be removed from the state as soon as the packet flow is complete.
  • Responsible for determining which IBC messages need to be sent based on the current state (MsgRecvPacket, MsgAcknowledgement, MsgTimeout, MsgTimeoutOnClose).
  • Sends those messages only if both chains are InSync (i.e. the query loop is up to date with the chain).
  • Will construct a MsgUpdateClient to prepend to all non-empty message sends.
  • If there are failed message sends, a retry will be scheduled.
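A rough sketch of the shape of these structs (field and method names are illustrative, not the actual code in this PR):

package processor

import "sync"

// chainProcessor is the minimal slice of the ChainProcessor interface that
// the path processor needs for this sketch.
type chainProcessor interface {
	// InSync reports whether the chain's query loop has caught up.
	InSync() bool
}

// pathEndRuntime maintains the IBC packet state for one side of the path.
type pathEndRuntime struct {
	chainProcessor chainProcessor // the ChainProcessor for this PathEnd

	mu sync.Mutex
	// packet sequence -> in-flight state; an entry is deleted as soon as
	// the packet flow for that sequence is complete.
	packetState map[uint64]string
}

// PathProcessor is kept generic at the IBC level: one instance per path,
// with a runtime for each of the two path ends.
type PathProcessor struct {
	pathEnd1 *pathEndRuntime
	pathEnd2 *pathEndRuntime
}

// shouldSend reports whether messages may be relayed right now: both
// chains must be in sync with their query loops.
func (pp *PathProcessor) shouldSend() bool {
	return pp.pathEnd1.chainProcessor.InSync() && pp.pathEnd2.chainProcessor.InSync()
}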

The ChainProcessor currently holds a reference to the ChainProvider. The ChainProvider can probably now be slimmed down quite a bit, but its methods are still quite useful for the CLI commands, so we will need to determine whether it makes sense for those to go through the ChainProcessor or not. Consolidation between the ChainProcessor and ChainProvider can definitely be improved to DRY things up.

Other stuff:

  • Removes the 45-second timeout for the relay-packets-and-acks cycle, replacing it with a 10-second timeout per batch of messages on send.
  • Fixes the osmosis test.

Resolves #721
Resolves #726

@agouin agouin force-pushed the andrew/event_processor branch from 419146d to fd66519 on May 23, 2022 13:34
@agouin agouin changed the title from "WIP refactor loop to event based processing" to "Refactor loop to event based processing" on May 23, 2022
@agouin agouin marked this pull request as ready for review May 23, 2022 14:57
@agouin agouin requested a review from DavidNix May 23, 2022 14:57

ENV PACKAGES curl make git libc-dev bash gcc linux-headers eudev-dev

RUN apk add --no-cache $PACKAGES

WORKDIR /go/src/github.com/osmosis-labs

RUN wget -O /lib/libwasmvm_muslc.a https://github.com/CosmWasm/wasmvm/releases/download/v1.0.0/libwasmvm_muslc.$(uname -m).a
Member:
Ahh good catch on these.

jtieri commented May 23, 2022

Gonna be reading through this on/off today while I play catch up a bit!

I did run into a couple of issues running the integration tests locally. I tried make install && make test-gaia but ran into a nil pointer exception after the handshake, once the MsgTransfers were sent on both the src and dst chains.

logger.go:130: 2022-05-23T11:01:05.676-0500 INFO    entering main query loop        {"chainID": "ibc-0"}
logger.go:130: 2022-05-23T11:01:05.677-0500 INFO    entering main query loop        {"chainID": "ibc-1"}
logger.go:130: 2022-05-23T11:01:05.679-0500 WARN    chain is not yet in sync        {"chainID": "ibc-0", "latestQueriedBlock": 0, "latestHeight": 84}
logger.go:130: 2022-05-23T11:01:05.679-0500 DEBUG   queried latest height   {"chainID": "ibc-0", "latestHeight": 84}
logger.go:130: 2022-05-23T11:01:05.681-0500 WARN    chain is not yet in sync        {"chainID": "ibc-1", "latestQueriedBlock": 0, "latestHeight": 85}
logger.go:130: 2022-05-23T11:01:05.681-0500 DEBUG   queried latest height   {"chainID": "ibc-1", "latestHeight": 85}
logger.go:130: 2022-05-23T11:01:05.705-0500 DEBUG   observed MsgCreateClient        {"chainID": "ibc-0", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:05.733-0500 DEBUG   observed MsgCreateClient        {"chainID": "ibc-1", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:05.763-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-0", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:05.915-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-1", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:05.924-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-0", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:06.136-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-1", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:06.222-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-0", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:06.222-0500 DEBUG   observed MsgChannelOpenInit     {"chainID": "ibc-0", "channelID": "channel-0", "portID": "transfer", "counterpartyChannelID": "", "counterpartyPortID": "transfer", "connectionID": "connection-0"}
logger.go:130: 2022-05-23T11:01:06.356-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-0", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:06.356-0500 DEBUG   observed MsgChannelOpenAck      {"chainID": "ibc-0", "channelID": "channel-0", "portID": "transfer", "counterpartyChannelID": "channel-0", "counterpartyPortID": "transfer", "connectionID": "connection-0"}
logger.go:130: 2022-05-23T11:01:06.358-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-1", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:06.358-0500 DEBUG   observed MsgChannelOpenTry      {"chainID": "ibc-1", "channelID": "channel-0", "portID": "transfer", "counterpartyChannelID": "channel-0", "counterpartyPortID": "transfer", "connectionID": "connection-0"}
logger.go:130: 2022-05-23T11:01:06.550-0500 DEBUG   observed MsgUpdateClient        {"chainID": "ibc-1", "clientID": "07-tendermint-0"}
logger.go:130: 2022-05-23T11:01:06.550-0500 DEBUG   observed MsgChannelOpenConfirm  {"chainID": "ibc-1", "channelID": "channel-0", "portID": "transfer", "counterpartyChannelID": "channel-0", "counterpartyPortID": "transfer", "connectionID": "connection-0"}
logger.go:130: 2022-05-23T11:01:06.643-0500 DEBUG   observed MsgTransfer    {"chainID": "ibc-0", "sequence": 1, "srcChannel": "channel-0", "srcPort": "transfer", "timeoutHeight": "1-1078", "timeoutTimestamp": 0}
logger.go:130: 2022-05-23T11:01:06.658-0500 DEBUG   observed MsgTransfer    {"chainID": "ibc-0", "sequence": 2, "srcChannel": "channel-0", "srcPort": "transfer", "timeoutHeight": "1-1080", "timeoutTimestamp": 0}
logger.go:130: 2022-05-23T11:01:06.711-0500 INFO    chain is in sync        {"chainID": "ibc-0"}
logger.go:130: 2022-05-23T11:01:06.711-0500 DEBUG   queried latest height   {"chainID": "ibc-0", "latestHeight": 85}
logger.go:130: 2022-05-23T11:01:06.712-0500 DEBUG   observed MsgTransfer    {"chainID": "ibc-1", "sequence": 1, "srcChannel": "channel-0", "srcPort": "transfer", "timeoutHeight": "0-1080", "timeoutTimestamp": 0}
logger.go:130: 2022-05-23T11:01:06.726-0500 DEBUG   will send message       {"chainID": "ibc-1", "message": "/ibc.core.channel.v1.MsgRecvPacket", "sequence": 2}
logger.go:130: 2022-05-23T11:01:06.727-0500 DEBUG   observed MsgTransfer    {"chainID": "ibc-1", "sequence": 2, "srcChannel": "channel-0", "srcPort": "transfer", "timeoutHeight": "0-1081", "timeoutTimestamp": 0}
logger.go:130: 2022-05-23T11:01:06.729-0500 DEBUG   will send message       {"chainID": "ibc-0", "message": "/ibc.core.channel.v1.MsgRecvPacket", "sequence": 1}
logger.go:130: 2022-05-23T11:01:06.733-0500 DEBUG   will send message       {"chainID": "ibc-1", "message": "/ibc.core.channel.v1.MsgRecvPacket", "sequence": 1}

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x0 pc=0x1038ad644]

goroutine 329 [running]:
github.com/cosmos/relayer/v2/relayer/chains/cosmos.(*CosmosChainProcessor).LatestHeaderWithTrustedVals(0x1400122c000, 0x14000df27b0?)
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/chains/cosmos/cosmos_chain_processor.go:173 +0x204
github.com/cosmos/relayer/v2/relayer/paths.(*PathProcessor).sendIBCMessagesWithUpdateClient(0x14000111a00, {0x14001999ea0, 0x1, 0x466d4d7474575932?}, 0x14000c78d20, 0x14000c78c80)
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/paths/path_processor.go:371 +0xb4
github.com/cosmos/relayer/v2/relayer/paths.(*PathProcessor).process.func1()
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/paths/path_processor.go:471 +0x3c
golang.org/x/sync/errgroup.(*Group).Go.func1()
        /Users/justintieri/go/pkg/mod/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup/errgroup.go:57 +0x64
created by golang.org/x/sync/errgroup.(*Group).Go
        /Users/justintieri/go/pkg/mod/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup/errgroup.go:54 +0x90
FAIL    github.com/cosmos/relayer/v2/_test      158.126s

Attempting to run the relayer on the sommelier<->osmosis path on mainnet also yields a panic.

~:rly start sommosmo -d
2022-05-23T15:43:24.770916Z     info    Debug server listening  {"sys": "debughttp", "addr": "localhost:7597"}
panic: {sommelier-3} error querying latest height: post failed: Post "http:": http: no Host in request URL

goroutine 67 [running]:
github.com/cosmos/relayer/v2/relayer/chains/cosmos.(*CosmosChainProcessor).Start(0x140001ae9c0, {0x103fe1b38, 0x14000e53b40}, 0x140011490e0?)
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/chains/cosmos/cosmos_chain_processor.go:202 +0x111c
created by github.com/cosmos/relayer/v2/relayer/chains/processor.ChainProcessors.Start
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/chains/processor/chain_processor.go:64 +0x138
panic: {osmosis-1} error querying latest height: post failed: Post "http:": http: no Host in request URL

goroutine 68 [running]:
github.com/cosmos/relayer/v2/relayer/chains/cosmos.(*CosmosChainProcessor).Start(0x140001af520, {0x103fe1b38, 0x14000e53b40}, 0x140011490e0?)
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/chains/cosmos/cosmos_chain_processor.go:202 +0x111c
created by github.com/cosmos/relayer/v2/relayer/chains/processor.ChainProcessors.Start
        /Users/justintieri/go/src/github.com/cosmos/relayer/relayer/chains/processor/chain_processor.go:64 +0x138

agouin commented May 23, 2022

ran into a nil pointer exception after the handshake and the MsgTransfers are sent on both the src and dst chains

Thanks for catching this. This should be resolved now!

Attempting to run the relayer on the sommelier<->osmosis path on mainnet also yields a panic.

ibctest just caught this as well. It should also be resolved now. Here is the passing test: https://github.com/strangelove-ventures/ibctest/runs/6559353175?check_suite_focus=true

jtieri commented May 23, 2022

Looks like another panic when running rly start sommosmo -d

Here's the full stacktrace https://gist.github.com/jtieri/f2bce727e7f63ea236c7d0e82e6ebbf4

agouin commented May 23, 2022

Looks like another panic when running rly start sommosmo -d

Meant to remove that .Unlock() call. Thanks! Resolved now.

@DavidNix DavidNix (Contributor) left a comment:

I think the general architecture is a routing or pub/sub type workflow, which I do like.

Right now, I don't think I have authority to approve this PR yet given my lack of experience with the relayer.

To state the obvious, this is a large PR which always comes with risk. I'm betting manual QA and ibctest will have our bases covered though.

In the future (if this is a routing pattern), I wonder if it's worth exploring something similar to Go http routing, like https://github.com/go-chi/chi.

I'm not saying use http routing verbatim, but it's a nice pattern to declare your routes and attach handlers and (optional) middleware.

@agouin agouin force-pushed the andrew/event_processor branch from 8b14ade to 025a4a9 on May 23, 2022 18:25
agouin commented May 23, 2022

To state the obvious, this is a large PR which always comes with risk. I'm betting manual QA and ibctest will have our bases covered though.

Agreed. I think a few days of mainnet testing, in addition to the passing ibctest suite, can give us the confidence we need.

@mark-rushakoff mark-rushakoff (Member) left a comment:

To David's point, there is a bit much going on in the chain processor.

After nearly a decade working in distributed systems in Go, the concurrency pattern I much prefer over independent goroutines taking common locks is what I consider the "control loop" pattern. A single goroutine is responsible for all of the mutable data, without requiring any mutexes. The goroutine uses a single for-select loop to handle mutating actions, and it can use request-response channels to respond to queries.

I haven't absorbed the chain processor API enough to say with certainty whether this would apply cleanly here. Sometimes this pattern can be a bit more verbose, but the significant advantage is that you completely avoid deadlocks (because only one goroutine is modifying the data, no need for sync.Mutex), and you completely avoid data races (only one goroutine is reading or writing the set of data). The other tradeoff is that you have to return read-only copies, which I don't expect will be an issue here.
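A minimal sketch of the control-loop pattern described above (the names here are illustrative, not taken from the relayer codebase): a single goroutine owns the mutable state, mutations arrive on one channel, and reads are served through request-response channels.

package main

import (
	"context"
	"fmt"
)

// heightQuery is a request-response pair: the caller provides a channel on
// which the control loop replies with the latest height it has seen.
type heightQuery struct {
	resp chan int64
}

// chainState owns all of the mutable data. Only the run goroutine touches
// it, so no mutexes are needed and data races are impossible by construction.
type chainState struct {
	latestHeight int64

	newBlocks chan int64       // mutating actions
	queries   chan heightQuery // read requests
}

func newChainState() *chainState {
	return &chainState{
		newBlocks: make(chan int64),
		queries:   make(chan heightQuery),
	}
}

// run is the single for-select control loop.
func (c *chainState) run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case h := <-c.newBlocks:
			if h > c.latestHeight {
				c.latestHeight = h
			}
		case q := <-c.queries:
			// Reply with a copy of the value; callers never see internal state.
			q.resp <- c.latestHeight
		}
	}
}

// LatestHeight is safe to call from any goroutine.
func (c *chainState) LatestHeight() int64 {
	q := heightQuery{resp: make(chan int64, 1)}
	c.queries <- q
	return <-q.resp
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := newChainState()
	go c.run(ctx)

	c.newBlocks <- 42
	fmt.Println(c.LatestHeight()) // 42
}

The read-only-copies tradeoff shows up in LatestHeight: callers only ever receive a copied value, never a pointer into the loop's state.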

Comment on lines 165 to 172
srcPathEnd.messagesLock.Lock()
dstPathEnd.messagesLock.Lock()
srcPathEnd.inProgressMessageSendsLock.Lock()
dstPathEnd.inProgressMessageSendsLock.Lock()
defer srcPathEnd.messagesLock.Unlock()
defer dstPathEnd.messagesLock.Unlock()
defer srcPathEnd.inProgressMessageSendsLock.Unlock()
defer dstPathEnd.inProgressMessageSendsLock.Unlock()
Member:

Taking several locks in sequence will incur a high risk of deadlock for any application that has enough goroutines active.

Contributor:

I missed this somehow. Yeah, high chance of deadlock. As Mark stated:

A single goroutine is responsible for all of the mutable data, without requiring any mutexes.

I am also a fan of this pattern. I've heard it called a "single writer" pattern. Only 1 goroutine writes (or mutates) data.

To echo Mark, hard to tell if that's feasible here though.

@agouin agouin (Member Author) May 23, 2022:
I cleaned this section up by making copies of the messages, iterating over those copies to determine what needs to be deleted, and then only locking and deleting on the specific maps after all of that.

I also removed the locks for the ibc message assembly, since those can just be separate slices that are concatenated after the waitgroup.

Right now the chain processors push messages into the path processor, and the path processor manages the backlog of messages coming from both chains, and clears out old packets. That data could be separated into two different stores, but we'd still have to lock at some point since the data is being read/written by different threads.
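A rough sketch of that copy-then-prune approach (the types and field names are hypothetical, not the actual PathEnd structures in this PR): take a snapshot under a short lock, inspect the snapshot without holding the lock, and re-acquire the lock only to delete.

package processor

import "sync"

// packetState is a stand-in for whatever per-sequence data the path
// processor tracks; the field is illustrative only.
type packetState struct {
	flowComplete bool
}

type pathEnd struct {
	mu       sync.Mutex
	messages map[uint64]packetState // keyed by packet sequence
}

// pruneCompleted removes sequences whose packet flow is complete. The lock
// is held only for the copy and for the final deletes, never across the
// (potentially slow) inspection of each entry.
func (pe *pathEnd) pruneCompleted() {
	// 1. Snapshot under the lock.
	pe.mu.Lock()
	snapshot := make(map[uint64]packetState, len(pe.messages))
	for seq, st := range pe.messages {
		snapshot[seq] = st
	}
	pe.mu.Unlock()

	// 2. Decide what to delete without holding the lock.
	var toDelete []uint64
	for seq, st := range snapshot {
		if st.flowComplete {
			toDelete = append(toDelete, seq)
		}
	}

	// 3. Re-acquire the lock only for the deletes.
	pe.mu.Lock()
	for _, seq := range toDelete {
		delete(pe.messages, seq)
	}
	pe.mu.Unlock()
}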

var clientInfo *ibc.ClientInfo
var action string
for _, event := range messageLog.Events {
switch event.Type {
Member:

Might be good to log ignored event types at debug level in the default case.

Member:

This is a good idea for debugging
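A small sketch of what that could look like (the event shape and handler here are simplified for illustration; the actual parsing code in the PR may differ):

package cosmos

import "go.uber.org/zap"

// event mirrors the minimal shape of an ABCI message event for illustration.
type event struct {
	Type string
}

func handleEvent(log *zap.Logger, ev event) {
	switch ev.Type {
	case "send_packet", "recv_packet", "acknowledge_packet":
		// ... existing handling of known IBC event types ...
	default:
		// Log ignored event types at debug level so unexpected or newly
		// added event types stay visible while debugging, without adding
		// noise at higher log levels.
		log.Debug("ignoring unhandled event type", zap.String("eventType", ev.Type))
	}
}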

@DavidNix (Contributor) commented:
One final comment: I recommend unit tests around the relayer rejecting/accepting a packet. E.g. is the channel open, do the port and channel IDs match up, is the sequence correct, channel filtering (already suggested), etc.

Basically, starting here: https://github.com/cosmos/ibc/blob/master/spec/core/ics-004-channel-and-packet-semantics/README.md#a-day-in-the-life-of-a-packet

The above could be tested with ibctest. However, given ibctest's black box style, the more we add, the more brittle it becomes. Plus, black box tests are slow.

The routing and validation of packets can probably be segmented into units. Hence, unit tests would be valuable, give instant feedback, and let us quickly test a variety of edge/error cases.

If given appetite for the above, I'm happy to make an issue and tackle it in the near future.
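As a rough illustration of the kind of table-driven unit test being suggested here (the validation function and its rules are hypothetical, not the relayer's actual API):

package relaytest

import "testing"

// packet and channelState are illustrative stand-ins for whatever the
// relayer's packet-validation unit ends up operating on.
type packet struct {
	Sequence uint64
	SrcPort  string
	SrcChan  string
}

type channelState struct {
	Open         bool
	Port         string
	ChannelID    string
	NextSequence uint64
}

// shouldRelay is a hypothetical validation function: accept a packet only if
// the channel is open, the port/channel IDs match, and the sequence is the
// next one expected.
func shouldRelay(p packet, ch channelState) bool {
	return ch.Open &&
		p.SrcPort == ch.Port &&
		p.SrcChan == ch.ChannelID &&
		p.Sequence == ch.NextSequence
}

func TestShouldRelay(t *testing.T) {
	openChan := channelState{Open: true, Port: "transfer", ChannelID: "channel-0", NextSequence: 1}

	tests := []struct {
		name string
		p    packet
		ch   channelState
		want bool
	}{
		{"accepts valid packet", packet{1, "transfer", "channel-0"}, openChan, true},
		{"rejects closed channel", packet{1, "transfer", "channel-0"}, channelState{Open: false, Port: "transfer", ChannelID: "channel-0", NextSequence: 1}, false},
		{"rejects wrong port", packet{1, "ics20", "channel-0"}, openChan, false},
		{"rejects out-of-order sequence", packet{5, "transfer", "channel-0"}, openChan, false},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			if got := shouldRelay(tc.p, tc.ch); got != tc.want {
				t.Fatalf("shouldRelay() = %v, want %v", got, tc.want)
			}
		})
	}
}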

@agouin agouin marked this pull request as draft May 24, 2022 00:36
"golang.org/x/sync/errgroup"
)

const (
Member:

Bringing these up to be user configurable is good

}
}

func (ccp *CosmosChainProcessor) IsRelevantChannel(channelKey ibc.ChannelKey) bool {
Member:

We also need an IsRelevantConnection.

action = attr.Value
}
}
case "connection_open_init", "connection_open_try", "connection_open_ack",
Member:

We need to catch channel_open_init and complete the channel opening (i.e. format and send the completion transactions).

Member:

I had this implemented as a PoC in a stale branch in case it's helpful here.
7fb2846#diff-8a85df9f5b36f2f6c8bbf9f000dc704f92f60bc6e94da171df51c9cd3a422620L51

There are two upstream changes in ibc-go that we need iirc so that we can properly pull the channel version identifier from the events.

cosmos/ibc-go#1204
cosmos/ibc-go#1203

Thomas from Interchain said, "So, the fixes have been implemented in the release/v3.1.x (and release/v2.3.x) branch already. So the next release should include them but we're not yet sure if that's going to be minor release or v4
cosmos/ibc-go#1279"

Member Author:

Awesome. I'll get the connection and channel handshakes pulled into the chain/path processors also.

func (ccp *CosmosChainProcessor) GetMsgRecvPacket(signer string, msgRecvPacket provider.RelayerMessage) (provider.RelayerMessage, error) {
	msg := getCosmosMsg[*chantypes.MsgRecvPacket](msgRecvPacket)

	key := host.PacketCommitmentKey(msg.Packet.SourcePort, msg.Packet.SourceChannel, msg.Packet.Sequence)
Member:

Do we wanna handle retries here or upstream? Prob upstream

agouin commented May 24, 2022

As an update per conversation outside this PR, I am going to break down this PR into smaller chunks. I am planning on a first PR with the minimal Chain and Path processors so we can decide on an optimal relationship between the two.

The runtime consists of multiple chain processors and potentially multiple path processors, with possibly multiple path processors per chain processor and multiple chain processors per path processor, so we can experiment with different routing and concurrency strategies to allow the parallelization needed between them.

agouin commented Jul 5, 2022

All of this functionality is now present; it was handled by smaller PRs that revised all of these components.
