Skip to content

Commit

Permalink
chore(relayer): fix some carps (#17681)
Browse files Browse the repository at this point in the history
  • Loading branch information
mask-pp authored Jun 26, 2024
1 parent 10b95e1 commit f095698
Show file tree
Hide file tree
Showing 13 changed files with 20 additions and 107 deletions.
5 changes: 2 additions & 3 deletions packages/relayer/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type API struct {
srv *http.Server
httpPort uint64
ctx context.Context
wg *sync.WaitGroup
wg sync.WaitGroup
srcEthClient *ethclient.Client
}

Expand Down Expand Up @@ -84,7 +84,6 @@ func InitFromConfig(ctx context.Context, api *API, cfg *Config) (err error) {
api.srv = srv
api.httpPort = cfg.HTTPPort
api.ctx = ctx
api.wg = &sync.WaitGroup{}
api.srcEthClient = srcEthClient

return nil
Expand Down Expand Up @@ -112,7 +111,7 @@ func (api *API) Start() error {

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(api.ctx, api.srcEthClient, api.wg)
return utils.ScanBlocks(api.ctx, api.srcEthClient, &api.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand Down
7 changes: 1 addition & 6 deletions packages/relayer/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ type Bridge struct {
srcBridge relayer.Bridge
destBridge relayer.Bridge

mu *sync.Mutex

addr common.Address

backOffRetryInterval time.Duration
backOffMaxRetries uint64
ethClientTimeout time.Duration

wg *sync.WaitGroup
wg sync.WaitGroup

srcChainId *big.Int
destChainId *big.Int
Expand Down Expand Up @@ -121,9 +119,6 @@ func InitFromConfig(ctx context.Context, b *Bridge, cfg *Config) error {
b.srcChainId = srcChainID
b.destChainId = destChainID

b.wg = &sync.WaitGroup{}
b.mu = &sync.Mutex{}

b.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second
b.backOffMaxRetries = cfg.BackOffMaxRetrys
b.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second
Expand Down
28 changes: 0 additions & 28 deletions packages/relayer/db.go

This file was deleted.

18 changes: 4 additions & 14 deletions packages/relayer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type Indexer struct {

ethClientTimeout time.Duration

wg *sync.WaitGroup
wg sync.WaitGroup

numLatestBlocksEndWhenCrawling uint64
numLatestBlocksStartWhenCrawling uint64
Expand All @@ -123,8 +123,6 @@ type Indexer struct {

ctx context.Context

mu *sync.Mutex

eventName string

cfg *Config
Expand Down Expand Up @@ -230,17 +228,13 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) (err error) {
i.syncMode = cfg.SyncMode
i.watchMode = cfg.WatchMode

i.wg = &sync.WaitGroup{}

i.ethClientTimeout = time.Duration(cfg.ETHClientTimeout) * time.Second

i.numLatestBlocksEndWhenCrawling = cfg.NumLatestBlocksEndWhenCrawling
i.numLatestBlocksStartWhenCrawling = cfg.NumLatestBlocksStartWhenCrawling

i.targetBlockNumber = cfg.TargetBlockNumber

i.mu = &sync.Mutex{}

i.eventName = cfg.EventName

i.cfg = cfg
Expand Down Expand Up @@ -280,13 +274,11 @@ func (i *Indexer) Start() error {
return errors.Wrap(err, "i.setInitialIndexingBlockByMode")
}

i.wg.Add(1)

go i.eventLoop(i.ctx, i.latestIndexedBlockNumber)

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(i.ctx, i.srcEthClient, i.wg)
return utils.ScanBlocks(i.ctx, i.srcEthClient, &i.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand All @@ -296,12 +288,10 @@ func (i *Indexer) Start() error {
}

func (i *Indexer) eventLoop(ctx context.Context, startBlockID uint64) {
i.wg.Add(1)
defer i.wg.Done()

var d time.Duration = 10 * time.Second

t := time.NewTicker(d)

t := time.NewTicker(10 * time.Second)
defer t.Stop()

for {
Expand Down
4 changes: 0 additions & 4 deletions packages/relayer/indexer/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package indexer

import (
"context"
"sync"
"time"

"github.com/taikoxyz/taiko-mono/packages/relayer"
Expand All @@ -28,15 +27,12 @@ func newTestService(syncMode SyncMode, watchMode WatchMode) (*Indexer, relayer.B
syncMode: syncMode,
watchMode: watchMode,

wg: &sync.WaitGroup{},

ctx: context.Background(),

srcChainId: mock.MockChainID,
destChainId: mock.MockChainID,

ethClientTimeout: 10 * time.Second,
mu: &sync.Mutex{},
eventName: relayer.EventNameMessageSent,
}, b
}
5 changes: 1 addition & 4 deletions packages/relayer/pkg/utils/scan_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ type headSubscriber interface {

func ScanBlocks(ctx context.Context, ethClient headSubscriber, wg *sync.WaitGroup) error {
wg.Add(1)

defer func() {
wg.Done()
}()
defer wg.Done()

headers := make(chan *types.Header)

Expand Down
20 changes: 6 additions & 14 deletions packages/relayer/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ type Processor struct {

prover *proof.Prover

mu *sync.Mutex

relayerAddr common.Address
srcSignalServiceAddress common.Address

Expand All @@ -121,7 +119,7 @@ type Processor struct {

msgCh chan queue.Message

wg *sync.WaitGroup
wg sync.WaitGroup

srcChainId *big.Int
destChainId *big.Int
Expand All @@ -137,7 +135,7 @@ type Processor struct {
maxMessageRetries uint64

processingTxHashes map[common.Hash]bool
processingTxHashMu *sync.Mutex
processingTxHashMu sync.Mutex
}

// InitFromCli creates a new processor from a cli context
Expand Down Expand Up @@ -363,8 +361,6 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error {
p.srcSignalServiceAddress = cfg.SrcSignalServiceAddress

p.msgCh = make(chan queue.Message)
p.wg = &sync.WaitGroup{}
p.mu = &sync.Mutex{}
p.srcCaller = srcRpcClient

p.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second
Expand All @@ -376,7 +372,6 @@ func InitFromConfig(ctx context.Context, p *Processor, cfg *Config) error {
p.maxMessageRetries = cfg.MaxMessageRetries

p.processingTxHashes = make(map[common.Hash]bool, 0)
p.processingTxHashMu = &sync.Mutex{}

return nil
}
Expand Down Expand Up @@ -418,7 +413,7 @@ func (p *Processor) Start() error {
go func() {
if err := backoff.Retry(func() error {
slog.Info("attempting backoff queue subscription")
if err := p.queue.Subscribe(ctx, p.msgCh, p.wg); err != nil {
if err := p.queue.Subscribe(ctx, p.msgCh, &p.wg); err != nil {
slog.Error("processor queue subscription error", "err", err.Error())
return err
}
Expand All @@ -429,13 +424,11 @@ func (p *Processor) Start() error {
}
}()

p.wg.Add(1)

go p.eventLoop(ctx)

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(ctx, p.srcEthClient, p.wg)
return utils.ScanBlocks(ctx, p.srcEthClient, &p.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand All @@ -451,9 +444,8 @@ func (p *Processor) queueName() string {
// eventLoop is the main event loop of a Processor which should read
// messages from a queue and then process them.
func (p *Processor) eventLoop(ctx context.Context) {
defer func() {
p.wg.Done()
}()
p.wg.Add(1)
defer p.wg.Done()

for {
select {
Expand Down
3 changes: 0 additions & 3 deletions packages/relayer/processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package processor

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -28,7 +27,6 @@ func newTestProcessor(profitableOnly bool) *Processor {
destEthClient: &mock.EthClient{},
destERC20Vault: &mock.TokenVault{},
srcSignalService: &mock.SignalService{},
mu: &sync.Mutex{},
ecdsaKey: privateKey,
prover: prover,
srcCaller: &mock.Caller{},
Expand All @@ -49,6 +47,5 @@ func newTestProcessor(profitableOnly bool) *Processor {
maxMessageRetries: 5,
destQuotaManager: &mock.QuotaManager{},
processingTxHashes: make(map[common.Hash]bool, 0),
processingTxHashMu: &sync.Mutex{},
}
}
2 changes: 1 addition & 1 deletion packages/relayer/scripts/abigen.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#/bin/sh
#!/bin/sh

if [ ! -d "../protocol/out" ]; then
echo "ABI not generated in protocol package yet. Please run npm install && npx hardhat compile in ../protocol"
Expand Down
4 changes: 2 additions & 2 deletions packages/relayer/scripts/swagger.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#/bin/sh
#!/bin/sh

swag init -g server.go -d pkg/http --parseDependency
swag init -g server.go -d pkg/http --parseDependency
11 changes: 0 additions & 11 deletions packages/relayer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,6 @@ var (
ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000")
)

// IsInSlice determines whether v is in slice s
func IsInSlice[T comparable](v T, s []T) bool {
for _, e := range s {
if v == e {
return true
}
}

return false
}

type confirmer interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BlockNumber(ctx context.Context) (uint64, error)
Expand Down
10 changes: 0 additions & 10 deletions packages/relayer/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_IsInSlice(t *testing.T) {
if IsInSlice("fake", []string{}) {
t.Fatal()
}

if !IsInSlice("real", []string{"real"}) {
t.Fatal()
}
}

type mockConfirmer struct {
}

Expand Down
10 changes: 3 additions & 7 deletions packages/relayer/watchdog/watchdog.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ type Watchdog struct {
srcBridge relayer.Bridge
destBridge relayer.Bridge

mu *sync.Mutex

watchdogAddr common.Address

confirmations uint64
Expand All @@ -80,7 +78,7 @@ type Watchdog struct {

msgCh chan queue.Message

wg *sync.WaitGroup
wg sync.WaitGroup

srcChainId *big.Int
destChainId *big.Int
Expand Down Expand Up @@ -194,8 +192,6 @@ func InitFromConfig(ctx context.Context, w *Watchdog, cfg *Config) error {
w.confirmations = cfg.Confirmations

w.msgCh = make(chan queue.Message)
w.wg = &sync.WaitGroup{}
w.mu = &sync.Mutex{}

w.backOffRetryInterval = time.Duration(cfg.BackoffRetryInterval) * time.Second
w.backOffMaxRetries = cfg.BackOffMaxRetrys
Expand Down Expand Up @@ -230,7 +226,7 @@ func (w *Watchdog) Start() error {
go func() {
if err := backoff.Retry(func() error {
slog.Info("attempting backoff queue subscription")
if err := w.queue.Subscribe(ctx, w.msgCh, w.wg); err != nil {
if err := w.queue.Subscribe(ctx, w.msgCh, &w.wg); err != nil {
slog.Error("processor queue subscription error", "err", err.Error())
return err
}
Expand All @@ -247,7 +243,7 @@ func (w *Watchdog) Start() error {

go func() {
if err := backoff.Retry(func() error {
return utils.ScanBlocks(ctx, w.srcEthClient, w.wg)
return utils.ScanBlocks(ctx, w.srcEthClient, &w.wg)
}, backoff.NewConstantBackOff(5*time.Second)); err != nil {
slog.Error("scan blocks backoff retry", "error", err)
}
Expand Down

0 comments on commit f095698

Please sign in to comment.