Skip to content

Commit

Permalink
fix(deposit): synchronized failedBlocks between multiple goroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
htiennv committed Jun 14, 2024
1 parent a38c3ec commit 6ead64a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 24 deletions.
6 changes: 3 additions & 3 deletions mod/execution/pkg/deposit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ type Service[
]
// metrics is the metrics for the deposit service.
metrics *metrics
// failedBlocks is a map of blocks that failed to be processed to be
// failedBlocks is a channel of blocks that failed to be processed to be
// retried.
failedBlocks map[math.U64]struct{}
failedBlocks chan math.U64
}

// NewService creates a new instance of the Service struct.
Expand Down Expand Up @@ -105,7 +105,7 @@ func NewService[
metrics: newMetrics(telemetrySink),
dc: dc,
ds: ds,
failedBlocks: make(map[math.Slot]struct{}),
failedBlocks: make(chan math.U64, 100),
}
}

Expand Down
27 changes: 6 additions & 21 deletions mod/execution/pkg/deposit/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@ package deposit

import (
"context"
"time"

"github.com/berachain/beacon-kit/mod/primitives/pkg/events"
"github.com/berachain/beacon-kit/mod/primitives/pkg/math"
)

// defaultRetryInterval processes a deposit event.
const defaultRetryInterval = 20 * time.Second

// depositFetcher processes a deposit event.
func (s *Service[
BeaconBlockT, BeaconBlockBodyT, BlockEventT,
Expand Down Expand Up @@ -61,26 +57,17 @@ func (s *Service[
ExecutionPayloadT, SubscriptionT,
WithdrawalCredentialsT, DepositT,
]) depositCatchupFetcher(ctx context.Context) {
ticker := time.NewTicker(defaultRetryInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(s.failedBlocks) == 0 {
continue
}
case blockNum := <-s.failedBlocks:
s.logger.Warn(
"failed to get deposits from block(s), retrying...",
"num_blocks",
s.failedBlocks,
"block_num",
blockNum,
)

// Fetch deposits for blocks that failed to be processed.
for blockNum := range s.failedBlocks {
s.fetchAndStoreDeposits(ctx, blockNum)
}
s.fetchAndStoreDeposits(ctx, blockNum)
}
}
}
Expand All @@ -93,7 +80,7 @@ func (s *Service[
deposits, err := s.dc.ReadDeposits(ctx, blockNum)
if err != nil {
s.metrics.markFailedToGetBlockLogs(blockNum)
s.failedBlocks[blockNum] = struct{}{}
s.failedBlocks <- blockNum
return
}

Expand All @@ -106,9 +93,7 @@ func (s *Service[

if err = s.ds.EnqueueDeposits(deposits); err != nil {
s.logger.Error("Failed to store deposits", "error", err)
s.failedBlocks[blockNum] = struct{}{}
s.failedBlocks <- blockNum
return
}

delete(s.failedBlocks, blockNum)
}

0 comments on commit 6ead64a

Please sign in to comment.