Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
migrate beefy relayer into own worker (paritytech#353)
Browse files Browse the repository at this point in the history
* first draft of parachain commitment relayer

* refactor updates

* rename

* first attempt

* more refactor

* fix

* sub to headers

* pr fixes

* Worker improvements (paritytech#356)

* start test

* useful mage command

* log improvements

* remove

* fix

* fix
  • Loading branch information
musnit authored Apr 21, 2021
1 parent 80b9a0d commit e46e2c2
Show file tree
Hide file tree
Showing 24 changed files with 1,126 additions and 519 deletions.
7 changes: 7 additions & 0 deletions ethereum/scripts/dumpTestConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ const dump = (tmpDir, channels, bridge) => {
database: {
dialect: "sqlite3",
dbpath: "tmp.db",
},
relay: {
direction: 1,
},
workers: {
parachaincommitmentrrelayer: true,
beefyrelayer: true
}
}
fs.writeFileSync(path.join(tmpDir, "config.toml"), TOML.stringify(config));
Expand Down
17 changes: 6 additions & 11 deletions relayer/chain/ethereum/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/snowfork/polkadot-ethereum/relayer/chain"
"github.com/snowfork/polkadot-ethereum/relayer/contracts/inbound"
"github.com/snowfork/polkadot-ethereum/relayer/store"
"github.com/snowfork/polkadot-ethereum/relayer/substrate"
"golang.org/x/sync/errgroup"

Expand All @@ -20,7 +19,6 @@ import (
// Chain streams the Ethereum blockchain and routes tx data packets
type Chain struct {
config *Config
db *store.Database
listener *Listener
writer *Writer
conn *Connection
Expand All @@ -30,7 +28,7 @@ type Chain struct {
const Name = "Ethereum"

// NewChain initializes a new instance of EthChain
func NewChain(config *Config, db *store.Database) (*Chain, error) {
func NewChain(config *Config) (*Chain, error) {
log := logrus.WithField("chain", Name)

kp, err := secp256k1.NewKeypairFromString(config.PrivateKey)
Expand All @@ -40,19 +38,17 @@ func NewChain(config *Config, db *store.Database) (*Chain, error) {

return &Chain{
config: config,
db: db,
listener: nil,
writer: nil,
conn: NewConnection(config.Endpoint, kp, log),
log: log,
}, nil
}

func (ch *Chain) SetReceiver(subMessages <-chan []chain.Message, _ <-chan chain.Header,
dbMessages chan<- store.DatabaseCmd, beefyMessages <-chan store.BeefyRelayInfo) error {
func (ch *Chain) SetReceiver(subMessages <-chan []chain.Message, _ <-chan chain.Header) error {
contracts := make(map[substrate.ChannelID]*inbound.Contract)

writer, err := NewWriter(ch.config, ch.conn, ch.db, subMessages, dbMessages, beefyMessages, contracts, ch.log)
writer, err := NewWriter(ch.config, ch.conn, subMessages, contracts, ch.log)
if err != nil {
return err
}
Expand All @@ -61,10 +57,9 @@ func (ch *Chain) SetReceiver(subMessages <-chan []chain.Message, _ <-chan chain.
return nil
}

func (ch *Chain) SetSender(ethMessages chan<- []chain.Message, ethHeaders chan<- chain.Header,
dbMessages chan<- store.DatabaseCmd, beefyMessages chan<- store.BeefyRelayInfo) error {
listener, err := NewListener(ch.config, ch.conn, ch.db, ethMessages,
beefyMessages, dbMessages, ethHeaders, ch.log)
func (ch *Chain) SetSender(ethMessages chan<- []chain.Message, ethHeaders chan<- chain.Header) error {
listener, err := NewListener(ch.config, ch.conn, ethMessages,
ethHeaders, ch.log)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions relayer/chain/ethereum/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ func (co *Connection) Close() {
func (co *Connection) GetClient() *ethclient.Client {
return co.client
}

func (co *Connection) GetKP() *secp256k1.Keypair {
return co.kp
}
120 changes: 2 additions & 118 deletions relayer/chain/ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package ethereum

import (
"context"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -18,9 +16,7 @@ import (

"github.com/snowfork/polkadot-ethereum/relayer/chain"
"github.com/snowfork/polkadot-ethereum/relayer/chain/ethereum/syncer"
"github.com/snowfork/polkadot-ethereum/relayer/contracts/lightclientbridge"
"github.com/snowfork/polkadot-ethereum/relayer/contracts/outbound"
"github.com/snowfork/polkadot-ethereum/relayer/store"
)

const MaxMessagesPerSend = 10
Expand All @@ -32,30 +28,23 @@ type Listener struct {
basicOutboundChannel *outbound.BasicOutboundChannel
incentivizedOutboundChannel *outbound.IncentivizedOutboundChannel
mapping map[common.Address]string
db *store.Database
address common.Address
lightClientBridge *lightclientbridge.Contract
messages chan<- []chain.Message
beefyMessages chan<- store.BeefyRelayInfo
dbMessages chan<- store.DatabaseCmd
headers chan<- chain.Header
blockWaitPeriod uint64
log *logrus.Entry
}

func NewListener(config *Config, conn *Connection, db *store.Database, messages chan<- []chain.Message,
beefyMessages chan<- store.BeefyRelayInfo, dbMessages chan<- store.DatabaseCmd, headers chan<- chain.Header,
func NewListener(config *Config, conn *Connection, messages chan<- []chain.Message,
headers chan<- chain.Header,
log *logrus.Entry) (*Listener, error) {
return &Listener{
config: config,
conn: conn,
basicOutboundChannel: nil,
incentivizedOutboundChannel: nil,
mapping: make(map[common.Address]string),
db: db,
messages: messages,
dbMessages: dbMessages,
beefyMessages: beefyMessages,
headers: headers,
blockWaitPeriod: 0,
log: log,
Expand Down Expand Up @@ -88,19 +77,6 @@ func (li *Listener) Start(cxt context.Context, eg *errgroup.Group, initBlockHeig
li.mapping[common.HexToAddress(li.config.Channels.Basic.Outbound)] = "BasicInboundChannel.submit"
li.mapping[common.HexToAddress(li.config.Channels.Incentivized.Outbound)] = "IncentivizedInboundChannel.submit"

// Set up light client bridge contract
lightClientBridgeContract, err := lightclientbridge.NewContract(common.HexToAddress(li.config.LightClientBridge), li.conn.client)
if err != nil {
return err
}
li.lightClientBridge = lightClientBridgeContract

// Fetch BLOCK_WAIT_PERIOD from light client bridge contract
blockWaitPeriod, err := li.lightClientBridge.ContractCaller.BLOCKWAITPERIOD(nil)
if err != nil {
return err
}
li.blockWaitPeriod = blockWaitPeriod.Uint64()
eg.Go(func() error {
err := li.pollEventsAndHeaders(cxt, initBlockHeight, descendantsUntilFinal, hcs)
if li.messages != nil {
Expand Down Expand Up @@ -174,42 +150,6 @@ func (li *Listener) pollEventsAndHeaders(
events = append(events, incentivizedEvents...)

li.forwardEvents(ctx, hcs, events)

// Query LightClientBridge contract's InitialVerificationSuccessful events
blockNumber := gethheader.Number.Uint64()
var lightClientBridgeEvents []*lightclientbridge.ContractInitialVerificationSuccessful

contractEvents, err := li.queryLightClientEvents(ctx, blockNumber, &blockNumber)
if err != nil {
li.log.WithError(err).Error("Failure fetching event logs")
return err
}
lightClientBridgeEvents = append(lightClientBridgeEvents, contractEvents...)

if len(lightClientBridgeEvents) > 0 {
li.log.Info(fmt.Sprintf("Found %d LightClientBridge contract events on block %d", len(lightClientBridgeEvents), blockNumber))
}
li.processLightClientEvents(ctx, lightClientBridgeEvents)

// Mark items ReadyToComplete if the current block number has passed their CompleteOnBlock number
items := li.db.GetItemsByStatus(store.InitialVerificationTxConfirmed)
if len(items) > 0 {
li.log.Info(fmt.Sprintf("Found %d item(s) in database awaiting completion block", len(items)))
}
for _, item := range items {
if item.CompleteOnBlock+descendantsUntilFinal <= blockNumber {
// Fetch intended completion block's hash
block, err := li.conn.client.BlockByNumber(ctx, big.NewInt(int64(item.CompleteOnBlock)))
if err != nil {
li.log.WithError(err).Error("Failure fetching inclusion block")
}

li.log.Info("3: Updating item status from 'InitialVerificationTxConfirmed' to 'ReadyToComplete'")
item.Status = store.ReadyToComplete
item.RandomSeed = block.Hash()
li.beefyMessages <- *item
}
}
}
}
}
Expand Down Expand Up @@ -319,59 +259,3 @@ func (li *Listener) forwardHeader(hcs *HeaderCacheState, gethheader *gethTypes.H

return nil
}

// queryLightClientEvents queries ContractInitialVerificationSuccessful events from the LightClientBridge contract
func (li *Listener) queryLightClientEvents(ctx context.Context, start uint64,
end *uint64) ([]*lightclientbridge.ContractInitialVerificationSuccessful, error) {
var events []*lightclientbridge.ContractInitialVerificationSuccessful
filterOps := bind.FilterOpts{Start: start, End: end, Context: ctx}

iter, err := li.lightClientBridge.FilterInitialVerificationSuccessful(&filterOps)
if err != nil {
return nil, err
}

for {
more := iter.Next()
if !more {
err = iter.Error()
if err != nil {
return nil, err
}
break
}

events = append(events, iter.Event)
}

return events, nil
}

// processLightClientEvents matches events to BEEFY commitment info by transaction hash
func (li *Listener) processLightClientEvents(ctx context.Context, events []*lightclientbridge.ContractInitialVerificationSuccessful) {
for _, event := range events {
// Only process events emitted by transactions sent from our node
if event.Prover != li.conn.kp.CommonAddress() {
continue
}

li.log.WithFields(logrus.Fields{
"blockHash": event.Raw.BlockHash.Hex(),
"blockNumber": event.Raw.BlockNumber,
"txHash": event.Raw.TxHash.Hex(),
}).Info("event information")

item := li.db.GetItemByInitialVerificationTxHash(event.Raw.TxHash)
if item.Status != store.InitialVerificationTxSent {
continue
}

li.log.Info("2: Updating item status from 'InitialVerificationTxSent' to 'InitialVerificationTxConfirmed'")
instructions := map[string]interface{}{
"status": store.InitialVerificationTxConfirmed,
"complete_on_block": event.Raw.BlockNumber + li.blockWaitPeriod,
}
updateCmd := store.NewDatabaseCmd(item, store.Update, instructions)
li.dbMessages <- updateCmd
}
}
Loading

0 comments on commit e46e2c2

Please sign in to comment.