Skip to content

Commit

Permalink
Merge pull request #77 from InjectiveLabs/master
Browse files Browse the repository at this point in the history
v1.9.1
  • Loading branch information
albertchon authored Jan 26, 2023
2 parents ade8906 + 228fa42 commit 3eedb36
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 27 deletions.
29 changes: 28 additions & 1 deletion cmd/peggo/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ func orchestratorCmd(cmd *cli.Cmd) {
peggyAddress := ethcmn.HexToAddress(peggyParams.BridgeEthereumAddress)
injAddress := ethcmn.HexToAddress(peggyParams.CosmosCoinErc20Contract)

// Check if the provided ETH address belongs to a validator
isValidator, err := isValidatorAddress(cosmosQueryClient, ethKeyFromAddress)
if err != nil {
log.WithError(err).Fatalln("failed to query the current validator set from injective")

return
}

erc20ContractMapping := make(map[ethcmn.Address]string)
erc20ContractMapping[injAddress] = ctypes.InjectiveCoin

Expand Down Expand Up @@ -269,7 +277,7 @@ func orchestratorCmd(cmd *cli.Cmd) {
)

go func() {
if err := svc.Start(ctx); err != nil {
if err := svc.Start(ctx, isValidator); err != nil {
log.Errorln(err)

// signal there that the app failed
Expand All @@ -280,3 +288,22 @@ func orchestratorCmd(cmd *cli.Cmd) {
closer.Hold()
}
}

func isValidatorAddress(peggyQuery cosmos.PeggyQueryClient, addr ethcmn.Address) (bool, error) {
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second*30)
defer cancelFn()

currentValset, err := peggyQuery.CurrentValset(ctx)
if err != nil {
return false, err
}

var isValidator bool
for _, validator := range currentValset.Members {
if ethcmn.HexToAddress(validator.EthereumAddress) == addr {
isValidator = true
}
}

return isValidator, nil
}
65 changes: 59 additions & 6 deletions orchestrator/cosmos/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
chainclient "github.com/InjectiveLabs/sdk-go/client/chain"

"github.com/InjectiveLabs/metrics"

"github.com/InjectiveLabs/peggo/orchestrator/ethereum/keystore"
"github.com/InjectiveLabs/peggo/orchestrator/ethereum/peggy"

Expand Down Expand Up @@ -55,6 +56,7 @@ type PeggyBroadcastClient interface {
oldDeposits []*wrappers.PeggySendToCosmosEvent,
deposits []*wrappers.PeggySendToInjectiveEvent,
withdraws []*wrappers.PeggyTransactionBatchExecutedEvent,
erc20Deployed []*wrappers.PeggyERC20DeployedEvent,
valsetUpdates []*wrappers.PeggyValsetUpdatedEvent,
) error

Expand Down Expand Up @@ -274,7 +276,7 @@ func (s *peggyBroadcastClient) sendOldDepositClaims(
log.WithFields(log.Fields{
"event_nonce": oldDeposit.EventNonce.String(),
"txHash": txResponse.TxResponse.TxHash,
}).Infoln("Oracle sent old deposit event succesfully")
}).Infoln("Oracle sent old deposit event successfully")
}

return nil
Expand Down Expand Up @@ -320,7 +322,7 @@ func (s *peggyBroadcastClient) sendDepositClaims(
log.WithFields(log.Fields{
"event_nonce": deposit.EventNonce.String(),
"txHash": txResponse.TxResponse.TxHash,
}).Infoln("Oracle sent deposit event succesfully")
}).Infoln("Oracle sent deposit event successfully")
}

return nil
Expand Down Expand Up @@ -358,7 +360,7 @@ func (s *peggyBroadcastClient) sendWithdrawClaims(
log.WithFields(log.Fields{
"event_nonce": withdraw.EventNonce.String(),
"txHash": txResponse.TxResponse.TxHash,
}).Infoln("Oracle sent Withdraw event succesfully")
}).Infoln("Oracle sent Withdraw event successfully")
}

return nil
Expand Down Expand Up @@ -407,7 +409,49 @@ func (s *peggyBroadcastClient) sendValsetUpdateClaims(
log.WithFields(log.Fields{
"event_nonce": valsetUpdate.EventNonce.String(),
"txHash": txResponse.TxResponse.TxHash,
}).Infoln("Oracle sent ValsetUpdate event succesfully")
}).Infoln("Oracle sent ValsetUpdate event successfully")
}

return nil
}

func (s *peggyBroadcastClient) sendErc20DeployedClaims(
ctx context.Context,
erc20Deployed *wrappers.PeggyERC20DeployedEvent,
) error {
metrics.ReportFuncCall(s.svcTags)
doneFn := metrics.ReportFuncTiming(s.svcTags)
defer doneFn()

log.WithFields(log.Fields{
"EventNonce": erc20Deployed.EventNonce.Uint64(),
"CosmosDenom": erc20Deployed.CosmosDenom,
"TokenContract": erc20Deployed.TokenContract.Hex(),
"Name": erc20Deployed.Name,
"Symbol": erc20Deployed.Symbol,
"Decimals": erc20Deployed.Decimals,
}).Infoln("Oracle observed a erc20Deployed event. Sending MsgERC20DeployedClaim")

msg := &types.MsgERC20DeployedClaim{
EventNonce: erc20Deployed.EventNonce.Uint64(),
BlockHeight: erc20Deployed.Raw.BlockNumber,
CosmosDenom: erc20Deployed.CosmosDenom,
TokenContract: erc20Deployed.TokenContract.Hex(),
Name: erc20Deployed.Name,
Symbol: erc20Deployed.Symbol,
Decimals: uint64(erc20Deployed.Decimals),
Orchestrator: s.AccFromAddress().String(),
}

if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgERC20DeployedClaim failed")
return err
} else {
log.WithFields(log.Fields{
"event_nonce": erc20Deployed.EventNonce.String(),
"txHash": txResponse.TxResponse.TxHash,
}).Infoln("Oracle sent ERC20DeployedEvent event successfully")
}

return nil
Expand All @@ -419,14 +463,15 @@ func (s *peggyBroadcastClient) SendEthereumClaims(
oldDeposits []*wrappers.PeggySendToCosmosEvent,
deposits []*wrappers.PeggySendToInjectiveEvent,
withdraws []*wrappers.PeggyTransactionBatchExecutedEvent,
erc20Deployed []*wrappers.PeggyERC20DeployedEvent,
valsetUpdates []*wrappers.PeggyValsetUpdatedEvent,
) error {
metrics.ReportFuncCall(s.svcTags)
doneFn := metrics.ReportFuncTiming(s.svcTags)
defer doneFn()

totalClaimEvents := len(oldDeposits) + len(deposits) + len(withdraws) + len(valsetUpdates)
var count, h, i, j, k int
totalClaimEvents := len(oldDeposits) + len(deposits) + len(withdraws) + len(erc20Deployed) + len(valsetUpdates)
var count, h, i, j, k, l int

// Individual arrays (oldDeposits, deposits, withdraws, valsetUpdates) are sorted.
// Broadcast claim events sequentially starting with eventNonce = lastClaimEvent + 1.
Expand Down Expand Up @@ -464,6 +509,14 @@ func (s *peggyBroadcastClient) SendEthereumClaims(
return err
}
k++
} else if l < len(erc20Deployed) && erc20Deployed[l].EventNonce.Uint64() == lastClaimEvent+1 {
// send erc20 deployed claim
if err := s.sendErc20DeployedClaims(ctx, erc20Deployed[l]); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgERC20DeployedClaim failed")
return err
}
l++
}
count = count + 1
lastClaimEvent = lastClaimEvent + 1
Expand Down
56 changes: 53 additions & 3 deletions orchestrator/eth_event_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
log "github.com/xlab/suplog"

"github.com/InjectiveLabs/metrics"

wrappers "github.com/InjectiveLabs/peggo/solidity/wrappers/Peggy.sol"
)

Expand Down Expand Up @@ -157,6 +158,39 @@ func (s *peggyOrchestrator) CheckForEvents(
"Withdraws": transactionBatchExecutedEvents,
}).Debugln("Scanned TransactionBatchExecuted events from Ethereum")

var erc20DeployedEvents []*wrappers.PeggyERC20DeployedEvent
{
iter, err := peggyFilterer.FilterERC20DeployedEvent(&bind.FilterOpts{
Start: startingBlock,
End: &currentBlock,
}, nil)
if err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithFields(log.Fields{
"start": startingBlock,
"end": currentBlock,
}).Errorln("failed to scan past FilterERC20Deployed events from Ethereum")

if !isUnknownBlockErr(err) {
err = errors.Wrap(err, "failed to scan past FilterERC20Deployed events from Ethereum")
return 0, err
} else if iter == nil {
return 0, errors.New("no iterator returned")
}
}

for iter.Next() {
erc20DeployedEvents = append(erc20DeployedEvents, iter.Event)
}

iter.Close()
}
log.WithFields(log.Fields{
"start": startingBlock,
"end": currentBlock,
"erc20Deployed": erc20DeployedEvents,
}).Debugln("Scanned FilterERC20Deployed events from Ethereum")

var valsetUpdatedEvents []*wrappers.PeggyValsetUpdatedEvent
{
iter, err := peggyFilterer.FilterValsetUpdatedEvent(&bind.FilterOpts{
Expand Down Expand Up @@ -191,7 +225,7 @@ func (s *peggyOrchestrator) CheckForEvents(
"valsetUpdates": valsetUpdatedEvents,
}).Debugln("Scanned ValsetUpdatedEvents events from Ethereum")

// note that starting block overlaps with our last che cked block, because we have to deal with
// note that starting block overlaps with our last checked block, because we have to deal with
// the possibility that the relayer was killed after relaying only one of multiple events in a single
// block, so we also need this routine so make sure we don't send in the first event in this hypothetical
// multi event block again. In theory we only send all events for every block and that will pass of fail
Expand All @@ -206,11 +240,12 @@ func (s *peggyOrchestrator) CheckForEvents(
oldDeposits := filterSendToCosmosEventsByNonce(sendToCosmosEvents, lastClaimEvent.EthereumEventNonce)
deposits := filterSendToInjectiveEventsByNonce(sendToInjectiveEvents, lastClaimEvent.EthereumEventNonce)
withdraws := filterTransactionBatchExecutedEventsByNonce(transactionBatchExecutedEvents, lastClaimEvent.EthereumEventNonce)
erc20Deployments := filterERC20DeployedEventsByNonce(erc20DeployedEvents, lastClaimEvent.EthereumEventNonce)
valsetUpdates := filterValsetUpdateEventsByNonce(valsetUpdatedEvents, lastClaimEvent.EthereumEventNonce)

if len(oldDeposits) > 0 || len(deposits) > 0 || len(withdraws) > 0 || len(valsetUpdates) > 0 {
if len(oldDeposits) > 0 || len(deposits) > 0 || len(withdraws) > 0 || len(erc20Deployments) > 0 || len(valsetUpdates) > 0 {
// todo get eth chain id from the chain
if err := s.peggyBroadcastClient.SendEthereumClaims(ctx, lastClaimEvent.EthereumEventNonce, oldDeposits, deposits, withdraws, valsetUpdates); err != nil {
if err := s.peggyBroadcastClient.SendEthereumClaims(ctx, lastClaimEvent.EthereumEventNonce, oldDeposits, deposits, withdraws, erc20Deployments, valsetUpdates); err != nil {
metrics.ReportFuncError(s.svcTags)
err = errors.Wrap(err, "failed to send ethereum claims to Cosmos chain")
return 0, err
Expand Down Expand Up @@ -265,6 +300,21 @@ func filterTransactionBatchExecutedEventsByNonce(
return res
}

func filterERC20DeployedEventsByNonce(
events []*wrappers.PeggyERC20DeployedEvent,
nonce uint64,
) []*wrappers.PeggyERC20DeployedEvent {
res := make([]*wrappers.PeggyERC20DeployedEvent, 0, len(events))

for _, ev := range events {
if ev.EventNonce.Uint64() > nonce {
res = append(res, ev)
}
}

return res
}

func filterValsetUpdateEventsByNonce(
events []*wrappers.PeggyValsetUpdatedEvent,
nonce uint64,
Expand Down
61 changes: 45 additions & 16 deletions orchestrator/main_loops.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,14 @@ const defaultLoopDur = 60 * time.Second

// Start combines the all major roles required to make
// up the Orchestrator, all of these are async loops.
func (s *peggyOrchestrator) Start(ctx context.Context) error {
var pg loops.ParanoidGroup

pg.Go(func() error {
return s.EthOracleMainLoop(ctx)
})
pg.Go(func() error {
return s.BatchRequesterLoop(ctx)
})
pg.Go(func() error {
return s.EthSignerMainLoop(ctx)
})
pg.Go(func() error {
return s.RelayerMainLoop(ctx)
})
func (s *peggyOrchestrator) Start(ctx context.Context, validatorMode bool) error {
if !validatorMode {
log.Infoln("Starting peggo in relayer (non-validator) mode")
return s.startRelayerMode(ctx)
}

return pg.Wait()
log.Infoln("Starting peggo in validator mode")
return s.startValidatorMode(ctx)
}

// EthOracleMainLoop is responsible for making sure that Ethereum events are retrieved from the Ethereum blockchain
Expand Down Expand Up @@ -395,3 +386,41 @@ func calculateTotalValsetPower(valset *types.Valset) *big.Int {

return totalValsetPower
}

// startValidatorMode runs all orchestrator processes. This is called
// when peggo is run alongside a validator injective node.
func (s *peggyOrchestrator) startValidatorMode(ctx context.Context) error {
var pg loops.ParanoidGroup

pg.Go(func() error {
return s.EthOracleMainLoop(ctx)
})
pg.Go(func() error {
return s.BatchRequesterLoop(ctx)
})
pg.Go(func() error {
return s.EthSignerMainLoop(ctx)
})
pg.Go(func() error {
return s.RelayerMainLoop(ctx)
})

return pg.Wait()
}

// startRelayerMode runs orchestrator processes that only relay specific
// messages that do not require a validator's signature. This mode is run
// alongside a non-validator injective node
func (s *peggyOrchestrator) startRelayerMode(ctx context.Context) error {
var pg loops.ParanoidGroup

pg.Go(func() error {
return s.BatchRequesterLoop(ctx)
})

pg.Go(func() error {
return s.RelayerMainLoop(ctx)
})

return pg.Wait()
}
2 changes: 1 addition & 1 deletion orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type PeggyOrchestrator interface {
Start(ctx context.Context) error
Start(ctx context.Context, validatorMode bool) error

CheckForEvents(ctx context.Context, startingBlock uint64) (currentBlock uint64, err error)
GetLastCheckedBlock(ctx context.Context) (uint64, error)
Expand Down

0 comments on commit 3eedb36

Please sign in to comment.