Skip to content

Commit

Permalink
restructuring parachain commitment relayer into isolated worker (pari…
Browse files Browse the repository at this point in the history
…tytech#352)

* first draft of parachain commitment relayer

* refactor updates

* rename
  • Loading branch information
musnit committed Apr 21, 2021
1 parent 633f683 commit 6d5e58d
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 62 deletions.
6 changes: 6 additions & 0 deletions ethereum/scripts/dumpTestConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ const dump = (tmpDir, channels, bridge) => {
database: {
dialect: "sqlite3",
dbpath: "tmp.db",
},
relay: {
direction: 1,
},
workers: {
parachaincommitmentrrelayer: true
}
}
fs.writeFileSync(path.join(tmpDir, "config.toml"), TOML.stringify(config));
Expand Down
4 changes: 4 additions & 0 deletions relayer/chain/ethereum/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ func (co *Connection) Close() {
func (co *Connection) GetClient() *ethclient.Client {
return co.client
}

func (co *Connection) GetKP() *secp256k1.Keypair {
return co.kp
}
38 changes: 9 additions & 29 deletions relayer/chain/parachain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ import (
)

type Chain struct {
config *Config
listener *Listener
writer *Writer
conn *Connection
log *logrus.Entry
config *Config
writer *Writer
conn *Connection
log *logrus.Entry
}

const Name = "Parachain"
Expand All @@ -37,11 +36,10 @@ func NewChain(config *Config) (*Chain, error) {
}

return &Chain{
config: config,
conn: NewConnection(config.Endpoint, kp.AsKeyringPair(), log),
listener: nil,
writer: nil,
log: log,
config: config,
conn: NewConnection(config.Endpoint, kp.AsKeyringPair(), log),
writer: nil,
log: log,
}, nil
}

Expand All @@ -54,19 +52,8 @@ func (ch *Chain) SetReceiver(ethMessages <-chan []chain.Message, ethHeaders <-ch
return nil
}

func (ch *Chain) SetSender(subMessages chan<- []chain.Message, _ chan<- chain.Header, _ chan<- store.DatabaseCmd) error {
listener := NewListener(
ch.config,
ch.conn,
subMessages,
ch.log,
)
ch.listener = listener
return nil
}

func (ch *Chain) Start(ctx context.Context, eg *errgroup.Group, ethInit chan<- chain.Init, _ <-chan chain.Init) error {
if ch.listener == nil && ch.writer == nil {
if ch.writer == nil {
return fmt.Errorf("Sender and/or receiver need to be set before starting chain")
}

Expand All @@ -88,13 +75,6 @@ func (ch *Chain) Start(ctx context.Context, eg *errgroup.Group, ethInit chan<- c
ethInit <- ethInitHeaderID
close(ethInit)

if ch.listener != nil {
err = ch.listener.Start(ctx, eg)
if err != nil {
return err
}
}

if ch.writer != nil {
err = ch.writer.Start(ctx, eg)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions relayer/chain/parachain/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Connection struct {
log *logrus.Entry
}

func (co *Connection) GetAPI() *gsrpc.SubstrateAPI {
return co.api
}

func NewConnection(endpoint string, kp *signature.KeyringPair, log *logrus.Entry) *Connection {
return &Connection{
endpoint: endpoint,
Expand Down
55 changes: 41 additions & 14 deletions relayer/core/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"github.com/snowfork/polkadot-ethereum/relayer/chain/parachain"
"github.com/snowfork/polkadot-ethereum/relayer/chain/relaychain"
"github.com/snowfork/polkadot-ethereum/relayer/store"
"github.com/snowfork/polkadot-ethereum/relayer/workers/parachaincommitmentrelayer"
)

type Relay struct {
ethChain chain.Chain
paraChain chain.Chain
relayChain chain.Chain
database *store.Database
ethChain chain.Chain
paraChain chain.Chain
relayChain chain.Chain
database *store.Database
direction Direction
parachainCommitmentRelayer *parachaincommitmentrelayer.Worker
}

type Direction int
Expand All @@ -44,12 +47,17 @@ type RelayConfig struct {
HeadersOnly bool `mapstructure:"headers-only"`
}

type WorkerConfig struct {
ParachainCommitmentRelayer bool `mapstructure:"parachaincommitmentrrelayer"`
}

type Config struct {
Relay RelayConfig `mapstructure:"relay"`
Eth ethereum.Config `mapstructure:"ethereum"`
Parachain parachain.Config `mapstructure:"parachain"`
Relaychain relaychain.Config `mapstructure:"relaychain"`
Database store.Config `mapstructure:"database"`
Workers WorkerConfig `mapstructure:"workers"`
}

func NewRelay() (*Relay, error) {
Expand Down Expand Up @@ -119,22 +127,28 @@ func NewRelay() (*Relay, error) {
return nil, err
}

err = paraChain.SetSender(subMessages, nil, dbMessages)
err = relayChain.SetSender(subMessages, nil, beefyMessages)
if err != nil {
return nil, err
}
}

err = relayChain.SetSender(subMessages, nil, beefyMessages)
parachainCommitmentRelayer := &parachaincommitmentrelayer.Worker{}

if config.Workers.ParachainCommitmentRelayer == true {
parachainCommitmentRelayer, err = parachaincommitmentrelayer.NewWorker(&config.Parachain, &config.Eth)
if err != nil {
return nil, err
}
}

return &Relay{
ethChain: ethChain,
paraChain: paraChain,
relayChain: relayChain,
database: database,
ethChain: ethChain,
paraChain: paraChain,
relayChain: relayChain,
database: database,
direction: direction,
parachainCommitmentRelayer: parachainCommitmentRelayer,
}, nil
}

Expand Down Expand Up @@ -196,16 +210,29 @@ func (re *Relay) Start() {
log.WithField("name", re.paraChain.Name()).Info("Started chain")
defer re.paraChain.Stop()

err = re.relayChain.Start(ctx, eg, make(chan chain.Init), make(chan chain.Init))
if re.direction != EthToSub {
err = re.relayChain.Start(ctx, eg, make(chan chain.Init), make(chan chain.Init))
if err != nil {
log.WithFields(log.Fields{
"chain": re.relayChain.Name(),
"error": err,
}).Error("Failed to start chain")
return
}
log.WithField("name", re.relayChain.Name()).Info("Started chain")
defer re.relayChain.Stop()
}

err = re.parachainCommitmentRelayer.Start(ctx, eg)
if err != nil {
log.WithFields(log.Fields{
"chain": re.relayChain.Name(),
"error": err,
}).Error("Failed to start chain")
}).Error("Failed to start parachainCommitmentRelayer")
return
}
log.WithField("name", re.relayChain.Name()).Info("Started chain")
defer re.relayChain.Stop()
log.WithField("name", re.relayChain.Name()).Info("Started parachainCommitmentRelayer")
defer re.parachainCommitmentRelayer.Stop()

notifyWaitDone := make(chan struct{})

Expand Down
137 changes: 137 additions & 0 deletions relayer/workers/parachaincommitmentrelayer/ethereum-channel-writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package parachaincommitmentrelayer

import (
"context"
"fmt"

"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"

"github.com/snowfork/polkadot-ethereum/relayer/chain"
"github.com/snowfork/polkadot-ethereum/relayer/chain/ethereum"
"github.com/snowfork/polkadot-ethereum/relayer/contracts/inbound"
"github.com/snowfork/polkadot-ethereum/relayer/substrate"
)

type EthereumChannelWriter struct {
config *ethereum.Config
conn *ethereum.Connection
contracts map[substrate.ChannelID]*inbound.Contract
messages <-chan []chain.Message
log *logrus.Entry
}

func NewEthereumChannelWriter(config *ethereum.Config, conn *ethereum.Connection, messages <-chan []chain.Message,
contracts map[substrate.ChannelID]*inbound.Contract,
log *logrus.Entry) (*EthereumChannelWriter, error) {
return &EthereumChannelWriter{
config: config,
conn: conn,
contracts: contracts,
messages: messages,
log: log,
}, nil
}

func (wr *EthereumChannelWriter) Start(ctx context.Context, eg *errgroup.Group) error {

id := substrate.ChannelID{IsBasic: true}
contract, err := inbound.NewContract(common.HexToAddress(wr.config.Channels.Basic.Inbound), wr.conn.GetClient())
if err != nil {
return err
}
wr.contracts[id] = contract

id = substrate.ChannelID{IsIncentivized: true}
contract, err = inbound.NewContract(common.HexToAddress(wr.config.Channels.Incentivized.Inbound), wr.conn.GetClient())
if err != nil {
return err
}
wr.contracts[id] = contract

eg.Go(func() error {
return wr.writeMessagesLoop(ctx)
})

return nil
}

func (wr *EthereumChannelWriter) onDone(ctx context.Context) error {
wr.log.Info("Shutting down writer...")
// Avoid deadlock if a listener is still trying to send to a channel
for range wr.messages {
wr.log.Debug("Discarded message")
}
return ctx.Err()
}

func (wr *EthereumChannelWriter) writeMessagesLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return wr.onDone(ctx)
case msgs := <-wr.messages:
for _, msg := range msgs {
concreteMsg, ok := msg.(chain.SubstrateOutboundMessage)
if !ok {
return fmt.Errorf("Invalid message")
}

err := wr.WriteChannel(ctx, &concreteMsg)
if err != nil {
wr.log.WithError(err).Error("Error submitting message to ethereum")
}
}
}
}
}

func (wr *EthereumChannelWriter) signerFn(_ common.Address, tx *types.Transaction) (*types.Transaction, error) {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, wr.conn.GetKP().PrivateKey())
if err != nil {
return nil, err
}
return signedTx, nil
}

// Submit sends a SCALE-encoded message to an application deployed on the Ethereum network
func (wr *EthereumChannelWriter) WriteChannel(ctx context.Context, msg *chain.SubstrateOutboundMessage) error {
contract := wr.contracts[msg.ChannelID]
if contract == nil {
return fmt.Errorf("Unknown contract")
}

options := bind.TransactOpts{
From: wr.conn.GetKP().CommonAddress(),
Signer: wr.signerFn,
Context: ctx,
GasLimit: 500000,
}

var messages []inbound.InboundChannelMessage
for _, m := range msg.Commitment {
messages = append(messages,
inbound.InboundChannelMessage{
Target: m.Target,
Nonce: m.Nonce,
Payload: m.Payload,
},
)
}

tx, err := contract.Submit(&options, messages, msg.CommitmentHash)
if err != nil {
wr.log.WithError(err).Error("Failed to submit transaction")
return err
}

wr.log.WithFields(logrus.Fields{
"txHash": tx.Hash().Hex(),
}).Info("Transaction submitted")

return nil
}
Loading

0 comments on commit 6d5e58d

Please sign in to comment.