Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventindexer): support multiple swap pairs #14130

Merged
merged 5 commits into from
Jul 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/eventindexer/.l2.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ MYSQL_MAX_IDLE_CONNS=50
MYSQL_MAX_OPEN_CONNS=3000
MYSQL_CONN_MAX_LIFETIME_IN_MS=100000
PROVER_POOL_ADDRESS=0x7D992599E1B8b4508Ba6E2Ba97893b4C36C23A28
SWAP_ADDRESS=0x501f63210aE6D7Eeb50DaE74DA5Ae407515ee246
SWAP_ADDRESSES=0x501f63210aE6D7Eeb50DaE74DA5Ae407515ee246,0x926815A3fb587DDF5e2d2A03ea235630c0A53a16,0x2223D60359736532958DF6a4E9A5e4A5a71729A1
RPC_URL=wss://ws.test.taiko.xyz
CORS_ORIGINS=*
BLOCK_BATCH_SIZE=1000
Expand Down
14 changes: 13 additions & 1 deletion packages/eventindexer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func Run(
RPCClient: rpcClient,
SrcTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),
SrcBridgeAddress: common.HexToAddress(os.Getenv("BRIDGE_ADDRESS")),
SrcSwapAddress: common.HexToAddress(os.Getenv("SWAP_ADDRESS")),
SrcSwapAddresses: stringsToAddresses(strings.Split(os.Getenv("SWAP_ADDRESSES"), ",")),
BlockBatchSize: uint64(blockBatchSize),
SubscriptionBackoff: subscriptionBackoff,
})
Expand All @@ -158,6 +158,18 @@ func Run(
<-forever
}

func stringsToAddresses(s []string) []common.Address {
a := []common.Address{}

for _, v := range s {
if v != "" {
a = append(a, common.HexToAddress(v))
}
}

return a
}

func openDBConnection(opts eventindexer.DBConnectionOpts) (eventindexer.DB, error) {
dsn := ""
if opts.Password == "" {
Expand Down
20 changes: 11 additions & 9 deletions packages/eventindexer/indexer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,18 @@ func L2FilterFunc(
svc *Service,
filterOpts *bind.FilterOpts,
) error {
swaps, err := svc.swap.FilterSwap(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterSwap")
}
for _, s := range svc.swaps {
swaps, err := s.FilterSwap(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterSwap")
}

// only save ones above 0.01 ETH, this is only for Galaxe
// and we dont care about the rest
err = svc.saveSwapEvents(ctx, chainID, swaps)
if err != nil {
return errors.Wrap(err, "svc.saveSwapEvents")
// only save ones above 0.01 ETH, this is only for Galaxe
// and we dont care about the rest
err = svc.saveSwapEvents(ctx, chainID, swaps)
if err != nil {
return errors.Wrap(err, "svc.saveSwapEvents")
}
}

return nil
Expand Down
22 changes: 22 additions & 0 deletions packages/eventindexer/indexer/save_message_sent_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import (
"encoding/json"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/eventindexer"
"github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/bridge"
)

var (
minEthAmount = new(big.Int).SetUint64(150000000000000000)
zeroHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
)

func (svc *Service) saveMessageSentEvents(
ctx context.Context,
chainID *big.Int,
Expand Down Expand Up @@ -43,6 +49,22 @@ func (svc *Service) saveMessageSentEvent(
chainID *big.Int,
event *bridge.BridgeMessageSent,
) error {
// only save eth transfers
if event.Message.Data != nil && common.BytesToHash(event.Message.Data) != zeroHash {
log.Info("skipping message sent event, is not eth transfer")
return nil
}

// amount must be >= 0.15 eth
if event.Message.DepositValue.Cmp(minEthAmount) < 0 {
log.Infof("skipping message sent event, value: %v, requiredValue: %v",
event.Message.DepositValue.String(),
minEthAmount.String(),
)

return nil
}

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
Expand Down
20 changes: 12 additions & 8 deletions packages/eventindexer/indexer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Service struct {

taikol1 *taikol1.TaikoL1
bridge *bridge.Bridge
swap *swap.Swap
swaps []*swap.Swap
}

type NewServiceOpts struct {
Expand All @@ -41,7 +41,7 @@ type NewServiceOpts struct {
RPCClient *rpc.Client
SrcTaikoAddress common.Address
SrcBridgeAddress common.Address
SrcSwapAddress common.Address
SrcSwapAddresses []common.Address
BlockBatchSize uint64
SubscriptionBackoff time.Duration
}
Expand Down Expand Up @@ -79,12 +79,16 @@ func NewService(opts NewServiceOpts) (*Service, error) {
}
}

var swapContract *swap.Swap
var swapContracts []*swap.Swap

if opts.SrcSwapAddress.Hex() != ZeroAddress.Hex() {
swapContract, err = swap.NewSwap(opts.SrcSwapAddress, opts.EthClient)
if err != nil {
return nil, errors.Wrap(err, "contracts.NewBridge")
if opts.SrcSwapAddresses != nil && len(opts.SrcSwapAddresses) > 0 {
for _, v := range opts.SrcSwapAddresses {
swapContract, err := swap.NewSwap(v, opts.EthClient)
if err != nil {
return nil, errors.Wrap(err, "contracts.NewBridge")
}

swapContracts = append(swapContracts, swapContract)
}
}

Expand All @@ -95,7 +99,7 @@ func NewService(opts NewServiceOpts) (*Service, error) {
ethClient: opts.EthClient,
taikol1: taikoL1,
bridge: bridgeContract,
swap: swapContract,
swaps: swapContracts,

blockBatchSize: opts.BlockBatchSize,
subscriptionBackoff: opts.SubscriptionBackoff,
Expand Down
12 changes: 7 additions & 5 deletions packages/eventindexer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
go svc.subscribeMessageSent(ctx, chainID, errChan)
}

if svc.swap != nil {
go svc.subscribeSwap(ctx, chainID, errChan)
if svc.swaps != nil {
for _, swap := range svc.swaps {
go svc.subscribeSwap(ctx, swap, chainID, errChan)
}
}

// nolint: gosimple
Expand Down Expand Up @@ -306,16 +308,16 @@ func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int,
}
}

func (svc *Service) subscribeSwap(ctx context.Context, chainID *big.Int, errChan chan error) {
func (svc *Service) subscribeSwap(ctx context.Context, s *swap.Swap, chainID *big.Int, errChan chan error) {
sink := make(chan *swap.SwapSwap)

sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
log.Errorf("svc.swap.WatchSwap: %v", err)
log.Errorf("s.WatchSwap: %v", err)
}
log.Info("resubscribing to Swap events")

return svc.swap.WatchSwap(&bind.WatchOpts{
return s.WatchSwap(&bind.WatchOpts{
Context: ctx,
}, sink, nil, nil)
})
Expand Down