Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(deposit): synchronized failedBlocks between multiple goroutines #1472

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions mod/execution/pkg/deposit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/berachain/beacon-kit/mod/primitives/pkg/math"
)

// defaultChannelCap defines the default capacity of the channel's buffer
const defaultChannelCap = 100

// Service represents the deposit service that processes deposit events.
type Service[
BeaconBlockT BeaconBlock[DepositT, BeaconBlockBodyT, ExecutionPayloadT],
Expand Down Expand Up @@ -60,9 +63,9 @@ type Service[
]
// metrics is the metrics for the deposit service.
metrics *metrics
// failedBlocks is a map of blocks that failed to be processed to be
// failedBlocks is a channel of blocks that failed to be processed to be
// retried.
failedBlocks map[math.U64]struct{}
failedBlocks chan math.U64
}

// NewService creates a new instance of the Service struct.
Expand Down Expand Up @@ -105,7 +108,7 @@ func NewService[
metrics: newMetrics(telemetrySink),
dc: dc,
ds: ds,
failedBlocks: make(map[math.Slot]struct{}),
failedBlocks: make(chan math.U64, defaultChannelCap),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialization of failedBlocks with defaultChannelCap is well-implemented.

Consider adding documentation on how the buffer size was determined to be 100, to aid future maintainability.

}
}

Expand Down
27 changes: 6 additions & 21 deletions mod/execution/pkg/deposit/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,11 @@ package deposit

import (
"context"
"time"

"github.com/berachain/beacon-kit/mod/primitives/pkg/events"
"github.com/berachain/beacon-kit/mod/primitives/pkg/math"
)

// defaultRetryInterval processes a deposit event.
const defaultRetryInterval = 20 * time.Second

// depositFetcher processes a deposit event.
func (s *Service[
BeaconBlockT, BeaconBlockBodyT, BlockEventT,
Expand Down Expand Up @@ -61,26 +57,17 @@ func (s *Service[
ExecutionPayloadT, SubscriptionT,
WithdrawalCredentialsT, DepositT,
]) depositCatchupFetcher(ctx context.Context) {
ticker := time.NewTicker(defaultRetryInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(s.failedBlocks) == 0 {
continue
}
case blockNum := <-s.failedBlocks:
s.logger.Warn(
"failed to get deposits from block(s), retrying...",
"num_blocks",
s.failedBlocks,
"block_num",
blockNum,
)

// Fetch deposits for blocks that failed to be processed.
for blockNum := range s.failedBlocks {
s.fetchAndStoreDeposits(ctx, blockNum)
}
s.fetchAndStoreDeposits(ctx, blockNum)
}
}
}
Expand All @@ -93,7 +80,7 @@ func (s *Service[
deposits, err := s.dc.ReadDeposits(ctx, blockNum)
if err != nil {
s.metrics.markFailedToGetBlockLogs(blockNum)
s.failedBlocks[blockNum] = struct{}{}
s.failedBlocks <- blockNum
return
}

Expand All @@ -106,9 +93,7 @@ func (s *Service[

if err = s.ds.EnqueueDeposits(deposits); err != nil {
s.logger.Error("Failed to store deposits", "error", err)
s.failedBlocks[blockNum] = struct{}{}
s.failedBlocks <- blockNum
return
}

delete(s.failedBlocks, blockNum)
}
Loading