Skip to content

Commit

Permalink
Revert "restructuring parachain commitment relayer into isolated work…
Browse files Browse the repository at this point in the history
…er (paritytech#352)"

This reverts commit 6d5e58d.
  • Loading branch information
musnit committed Apr 21, 2021
1 parent 6d5e58d commit 201ac86
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 341 deletions.
6 changes: 0 additions & 6 deletions ethereum/scripts/dumpTestConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ const dump = (tmpDir, channels, bridge) => {
database: {
dialect: "sqlite3",
dbpath: "tmp.db",
},
relay: {
direction: 1,
},
workers: {
parachaincommitmentrrelayer: true
}
}
fs.writeFileSync(path.join(tmpDir, "config.toml"), TOML.stringify(config));
Expand Down
4 changes: 0 additions & 4 deletions relayer/chain/ethereum/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,3 @@ func (co *Connection) Close() {
func (co *Connection) GetClient() *ethclient.Client {
return co.client
}

func (co *Connection) GetKP() *secp256k1.Keypair {
return co.kp
}
38 changes: 29 additions & 9 deletions relayer/chain/parachain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (
)

type Chain struct {
config *Config
writer *Writer
conn *Connection
log *logrus.Entry
config *Config
listener *Listener
writer *Writer
conn *Connection
log *logrus.Entry
}

const Name = "Parachain"
Expand All @@ -36,10 +37,11 @@ func NewChain(config *Config) (*Chain, error) {
}

return &Chain{
config: config,
conn: NewConnection(config.Endpoint, kp.AsKeyringPair(), log),
writer: nil,
log: log,
config: config,
conn: NewConnection(config.Endpoint, kp.AsKeyringPair(), log),
listener: nil,
writer: nil,
log: log,
}, nil
}

Expand All @@ -52,8 +54,19 @@ func (ch *Chain) SetReceiver(ethMessages <-chan []chain.Message, ethHeaders <-ch
return nil
}

func (ch *Chain) SetSender(subMessages chan<- []chain.Message, _ chan<- chain.Header, _ chan<- store.DatabaseCmd) error {
listener := NewListener(
ch.config,
ch.conn,
subMessages,
ch.log,
)
ch.listener = listener
return nil
}

func (ch *Chain) Start(ctx context.Context, eg *errgroup.Group, ethInit chan<- chain.Init, _ <-chan chain.Init) error {
if ch.writer == nil {
if ch.listener == nil && ch.writer == nil {
return fmt.Errorf("Sender and/or receiver need to be set before starting chain")
}

Expand All @@ -75,6 +88,13 @@ func (ch *Chain) Start(ctx context.Context, eg *errgroup.Group, ethInit chan<- c
ethInit <- ethInitHeaderID
close(ethInit)

if ch.listener != nil {
err = ch.listener.Start(ctx, eg)
if err != nil {
return err
}
}

if ch.writer != nil {
err = ch.writer.Start(ctx, eg)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions relayer/chain/parachain/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ type Connection struct {
log *logrus.Entry
}

func (co *Connection) GetAPI() *gsrpc.SubstrateAPI {
return co.api
}

func NewConnection(endpoint string, kp *signature.KeyringPair, log *logrus.Entry) *Connection {
return &Connection{
endpoint: endpoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 Snowfork
// SPDX-License-Identifier: LGPL-3.0-only

package parachaincommitmentrelayer
package parachain

import (
"context"
Expand All @@ -14,27 +14,26 @@ import (
"golang.org/x/sync/errgroup"

"github.com/snowfork/polkadot-ethereum/relayer/chain"
"github.com/snowfork/polkadot-ethereum/relayer/chain/parachain"
chainTypes "github.com/snowfork/polkadot-ethereum/relayer/substrate"
)

type ParachainCommitmentListener struct {
config *parachain.Config
conn *parachain.Connection
type Listener struct {
config *Config
conn *Connection
messages chan<- []chain.Message
log *logrus.Entry
}

func NewParachainCommitmentListener(config *parachain.Config, conn *parachain.Connection, messages chan<- []chain.Message, log *logrus.Entry) *ParachainCommitmentListener {
return &ParachainCommitmentListener{
func NewListener(config *Config, conn *Connection, messages chan<- []chain.Message, log *logrus.Entry) *Listener {
return &Listener{
config: config,
conn: conn,
messages: messages,
log: log,
}
}

func (li *ParachainCommitmentListener) Start(ctx context.Context, eg *errgroup.Group) error {
func (li *Listener) Start(ctx context.Context, eg *errgroup.Group) error {

blockNumber, err := li.fetchStartBlock()
if err != nil {
Expand Down Expand Up @@ -66,14 +65,14 @@ func sleep(ctx context.Context, delay time.Duration) {
}

// Fetch the starting block
func (li *ParachainCommitmentListener) fetchStartBlock() (uint64, error) {
hash, err := li.conn.GetAPI().RPC.Chain.GetFinalizedHead()
func (li *Listener) fetchStartBlock() (uint64, error) {
hash, err := li.conn.api.RPC.Chain.GetFinalizedHead()
if err != nil {
li.log.WithError(err).Error("Failed to fetch hash for starting block")
return 0, err
}

header, err := li.conn.GetAPI().RPC.Chain.GetHeader(hash)
header, err := li.conn.api.RPC.Chain.GetHeader(hash)
if err != nil {
li.log.WithError(err).Error("Failed to fetch header for starting block")
return 0, err
Expand All @@ -84,7 +83,7 @@ func (li *ParachainCommitmentListener) fetchStartBlock() (uint64, error) {

var ErrBlockNotReady = errors.New("required result to be 32 bytes, but got 0")

func (li *ParachainCommitmentListener) produceFinalizedHeaders(ctx context.Context, startBlock uint64, headers chan<- types.Header) error {
func (li *Listener) produceFinalizedHeaders(ctx context.Context, startBlock uint64, headers chan<- types.Header) error {
current := startBlock
retryInterval := time.Duration(6) * time.Second
for {
Expand All @@ -93,13 +92,13 @@ func (li *ParachainCommitmentListener) produceFinalizedHeaders(ctx context.Conte
li.log.Info("Shutting down producer of finalized headers")
return ctx.Err()
default:
finalizedHash, err := li.conn.GetAPI().RPC.Chain.GetFinalizedHead()
finalizedHash, err := li.conn.api.RPC.Chain.GetFinalizedHead()
if err != nil {
li.log.WithError(err).Error("Failed to fetch finalized head")
return err
}

finalizedHeader, err := li.conn.GetAPI().RPC.Chain.GetHeader(finalizedHash)
finalizedHeader, err := li.conn.api.RPC.Chain.GetHeader(finalizedHash)
if err != nil {
li.log.WithError(err).Error("Failed to fetch header for finalized head")
return err
Expand All @@ -114,7 +113,7 @@ func (li *ParachainCommitmentListener) produceFinalizedHeaders(ctx context.Conte
continue
}

hash, err := li.conn.GetAPI().RPC.Chain.GetBlockHash(current)
hash, err := li.conn.api.RPC.Chain.GetBlockHash(current)
if err != nil {
if err.Error() == ErrBlockNotReady.Error() {
sleep(ctx, retryInterval)
Expand All @@ -125,7 +124,7 @@ func (li *ParachainCommitmentListener) produceFinalizedHeaders(ctx context.Conte
}
}

header, err := li.conn.GetAPI().RPC.Chain.GetHeader(hash)
header, err := li.conn.api.RPC.Chain.GetHeader(hash)
if err != nil {
li.log.WithError(err).Error("Failed to fetch header")
return err
Expand All @@ -137,7 +136,7 @@ func (li *ParachainCommitmentListener) produceFinalizedHeaders(ctx context.Conte
}
}

func (li *ParachainCommitmentListener) consumeFinalizedHeaders(ctx context.Context, headers <-chan types.Header) error {
func (li *Listener) consumeFinalizedHeaders(ctx context.Context, headers <-chan types.Header) error {
if li.messages == nil {
li.log.Info("Not polling events since channel is nil")
return nil
Expand All @@ -161,7 +160,7 @@ func (li *ParachainCommitmentListener) consumeFinalizedHeaders(ctx context.Conte
}
}

func (li *ParachainCommitmentListener) processHeader(ctx context.Context, header types.Header) error {
func (li *Listener) processHeader(ctx context.Context, header types.Header) error {

li.log.WithFields(logrus.Fields{
"blockNumber": header.Number,
Expand All @@ -182,12 +181,12 @@ func (li *ParachainCommitmentListener) processHeader(ctx context.Context, header
"commitmentHash": digestItem.AsCommitment.Hash.Hex(),
}).Debug("Found commitment hash in header digest")

storageKey, err := parachain.MakeStorageKey(digestItem.AsCommitment.ChannelID, digestItem.AsCommitment.Hash)
storageKey, err := MakeStorageKey(digestItem.AsCommitment.ChannelID, digestItem.AsCommitment.Hash)
if err != nil {
return err
}

data, err := li.conn.GetAPI().RPC.Offchain.LocalStorageGet(rpcOffchain.Persistent, storageKey)
data, err := li.conn.api.RPC.Offchain.LocalStorageGet(rpcOffchain.Persistent, storageKey)
if err != nil {
li.log.WithError(err).Error("Failed to read commitment from offchain storage")
return err
Expand Down
55 changes: 14 additions & 41 deletions relayer/core/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ import (
"github.com/snowfork/polkadot-ethereum/relayer/chain/parachain"
"github.com/snowfork/polkadot-ethereum/relayer/chain/relaychain"
"github.com/snowfork/polkadot-ethereum/relayer/store"
"github.com/snowfork/polkadot-ethereum/relayer/workers/parachaincommitmentrelayer"
)

type Relay struct {
ethChain chain.Chain
paraChain chain.Chain
relayChain chain.Chain
database *store.Database
direction Direction
parachainCommitmentRelayer *parachaincommitmentrelayer.Worker
ethChain chain.Chain
paraChain chain.Chain
relayChain chain.Chain
database *store.Database
}

type Direction int
Expand All @@ -47,17 +44,12 @@ type RelayConfig struct {
HeadersOnly bool `mapstructure:"headers-only"`
}

type WorkerConfig struct {
ParachainCommitmentRelayer bool `mapstructure:"parachaincommitmentrrelayer"`
}

type Config struct {
Relay RelayConfig `mapstructure:"relay"`
Eth ethereum.Config `mapstructure:"ethereum"`
Parachain parachain.Config `mapstructure:"parachain"`
Relaychain relaychain.Config `mapstructure:"relaychain"`
Database store.Config `mapstructure:"database"`
Workers WorkerConfig `mapstructure:"workers"`
}

func NewRelay() (*Relay, error) {
Expand Down Expand Up @@ -127,28 +119,22 @@ func NewRelay() (*Relay, error) {
return nil, err
}

err = relayChain.SetSender(subMessages, nil, beefyMessages)
err = paraChain.SetSender(subMessages, nil, dbMessages)
if err != nil {
return nil, err
}
}

parachainCommitmentRelayer := &parachaincommitmentrelayer.Worker{}

if config.Workers.ParachainCommitmentRelayer == true {
parachainCommitmentRelayer, err = parachaincommitmentrelayer.NewWorker(&config.Parachain, &config.Eth)
err = relayChain.SetSender(subMessages, nil, beefyMessages)
if err != nil {
return nil, err
}
}

return &Relay{
ethChain: ethChain,
paraChain: paraChain,
relayChain: relayChain,
database: database,
direction: direction,
parachainCommitmentRelayer: parachainCommitmentRelayer,
ethChain: ethChain,
paraChain: paraChain,
relayChain: relayChain,
database: database,
}, nil
}

Expand Down Expand Up @@ -210,29 +196,16 @@ func (re *Relay) Start() {
log.WithField("name", re.paraChain.Name()).Info("Started chain")
defer re.paraChain.Stop()

if re.direction != EthToSub {
err = re.relayChain.Start(ctx, eg, make(chan chain.Init), make(chan chain.Init))
if err != nil {
log.WithFields(log.Fields{
"chain": re.relayChain.Name(),
"error": err,
}).Error("Failed to start chain")
return
}
log.WithField("name", re.relayChain.Name()).Info("Started chain")
defer re.relayChain.Stop()
}

err = re.parachainCommitmentRelayer.Start(ctx, eg)
err = re.relayChain.Start(ctx, eg, make(chan chain.Init), make(chan chain.Init))
if err != nil {
log.WithFields(log.Fields{
"chain": re.relayChain.Name(),
"error": err,
}).Error("Failed to start parachainCommitmentRelayer")
}).Error("Failed to start chain")
return
}
log.WithField("name", re.relayChain.Name()).Info("Started parachainCommitmentRelayer")
defer re.parachainCommitmentRelayer.Stop()
log.WithField("name", re.relayChain.Name()).Info("Started chain")
defer re.relayChain.Stop()

notifyWaitDone := make(chan struct{})

Expand Down
Loading

0 comments on commit 201ac86

Please sign in to comment.