Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/bridge: fix bridge integration in stage mode #11646

Merged
merged 50 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
4797124
polygon/bridge: fix synhronisation issues
taratorio Aug 16, 2024
80264ac
err if signal channel closed
taratorio Aug 16, 2024
d36af91
tidy log line
taratorio Aug 16, 2024
ae11124
rename to blockNum
taratorio Aug 18, 2024
c9e614a
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 19, 2024
63fa8f5
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 19, 2024
f5bdefb
Merge branch 'astrid-bridge-synchronisation-fix' of github.com:erigon…
taratorio Aug 19, 2024
0a0f213
Merge branch 'astrid-bridge-synchronisation-fix' of github.com:ledger…
taratorio Aug 19, 2024
ebec670
tidy
taratorio Aug 19, 2024
86936a8
code review comments
taratorio Aug 19, 2024
c2fde77
rename to BlockEventIDRange for better clarity
taratorio Aug 19, 2024
59045a7
Update fetchAndWriteHeimdallStateSyncEvents, EventsByBlock, UnwindEvents
shohamc1 Aug 19, 2024
d3e4aaf
fix startup and tests
taratorio Aug 19, 2024
d7a4812
tidy comment
taratorio Aug 19, 2024
9518b9d
print event debug info every 30 secs
taratorio Aug 19, 2024
c6c2645
fix astrid sync stage
taratorio Aug 19, 2024
c46673a
fix windows test & name tidy
taratorio Aug 19, 2024
d4e6f4f
add LastFrozenEventBlockNum
taratorio Aug 20, 2024
b1fa5cf
add more replay logic
taratorio Aug 20, 2024
9ab2005
Fix snapshot generation
shohamc1 Aug 20, 2024
b0528bb
Fix unwind
shohamc1 Aug 21, 2024
ab17de3
Update tests
shohamc1 Aug 21, 2024
d83df94
Rewrite UnwindEvents
shohamc1 Aug 21, 2024
5e0dc06
Fix prune
shohamc1 Aug 21, 2024
bc445b7
Remove print
shohamc1 Aug 21, 2024
1d6e40e
wip
taratorio Aug 21, 2024
fa63b74
Merge branch 'main' of github.com:erigontech/erigon into astrid-bridg…
taratorio Aug 21, 2024
056bbf5
Prev
shohamc1 Aug 21, 2024
6b2ae41
Set eventIdTo to 0
shohamc1 Aug 22, 2024
b83ba68
Use Next
shohamc1 Aug 22, 2024
66f3b59
Fix firstKeyGetter
shohamc1 Aug 22, 2024
46d3961
more debugging
taratorio Aug 22, 2024
6f1bedf
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 22, 2024
abe21b1
Merge branch 'flip-boreventnums' of github.com:ledgerwatch/erigon int…
taratorio Aug 22, 2024
9d3c54b
revert block replays with bridge
taratorio Aug 22, 2024
d9ccb6f
Revert "revert block replays with bridge"
taratorio Aug 22, 2024
e10703b
wip
taratorio Aug 23, 2024
7346253
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 23, 2024
1626810
remove last processed event id in mem var
taratorio Aug 23, 2024
26db6be
remove chain reader hack
taratorio Aug 23, 2024
3ed371d
revert debugging code
taratorio Aug 23, 2024
d2534ab
remove playground
taratorio Aug 23, 2024
828c038
remove snapshots version override
taratorio Aug 23, 2024
768fc50
move put processed block info last
taratorio Aug 23, 2024
4ad2ef5
fix bridge tests
taratorio Aug 23, 2024
93ab2c5
add sequential check guards to process new blocks
taratorio Aug 23, 2024
ff39f8f
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 23, 2024
e7dac3c
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 23, 2024
ba04f4f
remove debugging print
taratorio Aug 23, 2024
355d7d2
fix fill gap with execution on initial replay
taratorio Aug 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_polygon_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,13 +1132,13 @@ func (s polygonSyncStageBridgeStore) LastEventIDWithinWindow(ctx context.Context
return r.id, nil
}

func (s polygonSyncStageBridgeStore) PutEventIDs(ctx context.Context, eventMap map[uint64]uint64) error {
func (s polygonSyncStageBridgeStore) PutBlockNumToEventID(ctx context.Context, blockNumToEventId map[uint64]uint64) error {
type response struct {
err error
}

r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error {
responseStream <- response{err: bridge.PutEventsIDs(tx, eventMap)}
responseStream <- response{err: bridge.PutBlockNumToEventID(tx, blockNumToEventId)}
return nil
})
if err != nil {
Expand Down
261 changes: 205 additions & 56 deletions polygon/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package bridge
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

Expand Down Expand Up @@ -54,6 +53,8 @@ func NewBridge(store Store, logger log.Logger, borConfig *borcfg.BorConfig, even
borConfig: borConfig,
eventFetcher: eventFetcher,
stateReceiverContractAddress: libcommon.HexToAddress(borConfig.StateReceiverContract),
fetchedEventsSignal: make(chan struct{}),
processedBlockSignal: make(chan struct{}),
}
}

Expand All @@ -64,20 +65,27 @@ type Bridge struct {
eventFetcher eventFetcher
stateReceiverContractAddress libcommon.Address
// internal state
ready atomic.Bool
reachedTip atomic.Bool
fetchedEventsSignal chan struct{}
lastFetchedEventTime atomic.Uint64
processedBlockSignal chan struct{}
lastProcessedBlockNumber atomic.Uint64
lastProcessedBlockTime atomic.Uint64
lastProcessedEventID atomic.Uint64
}

func (b *Bridge) Run(ctx context.Context) error {
defer close(b.fetchedEventsSignal)
defer close(b.processedBlockSignal)

err := b.store.Prepare(ctx)
if err != nil {
return err
}
defer b.Close()

// get last known sync ID
lastEventID, err := b.store.LatestEventID(ctx)
lastFetchedEventID, err := b.store.LatestEventID(ctx)
if err != nil {
return err
}
Expand All @@ -90,7 +98,11 @@ func (b *Bridge) Run(ctx context.Context) error {
b.lastProcessedEventID.Store(lastProcessedEventID)

// start syncing
b.logger.Debug(bridgeLogPrefix("Bridge is running"), "lastEventID", lastEventID)
b.logger.Debug(
bridgeLogPrefix("running bridge component"),
"lastFetchedEventID", lastFetchedEventID,
"lastProcessedEventID", lastProcessedEventID,
)

for {
select {
Expand All @@ -99,28 +111,49 @@ func (b *Bridge) Run(ctx context.Context) error {
default:
}

// get all events from last sync ID to now
// start scraping events
to := time.Now()
events, err := b.eventFetcher.FetchStateSyncEvents(ctx, lastEventID+1, to, 0)
events, err := b.eventFetcher.FetchStateSyncEvents(ctx, lastFetchedEventID+1, to, 50)
if err != nil {
return err
}

if len(events) != 0 {
b.ready.Store(false)
if err := b.store.PutEvents(ctx, events); err != nil {
if len(events) == 0 {
// we've reached the tip
b.reachedTip.Store(true)
b.signalFetchedEvents()
if err := libcommon.Sleep(ctx, time.Second); err != nil {
return err
}

lastEventID = events[len(events)-1].ID
} else {
b.ready.Store(true)
if err := libcommon.Sleep(ctx, 30*time.Second); err != nil {
return err
}
continue
}

b.logger.Debug(bridgeLogPrefix(fmt.Sprintf("got %v new events, last event ID: %v, ready: %v", len(events), lastEventID, b.ready.Load())))
// we've received new events
b.reachedTip.Store(false)
if err := b.store.PutEvents(ctx, events); err != nil {
return err
}

lastFetchedEvent := events[len(events)-1]
lastFetchedEventID = lastFetchedEvent.ID

lastFetchedEventTime := lastFetchedEvent.Time.Unix()
if lastFetchedEventTime < 0 {
// be defensive when casting from int64 to uint64
panic(errors.New("lastFetchedEventTime cannot be negative"))
}

b.lastFetchedEventTime.Store(uint64(lastFetchedEventTime))

b.logger.Debug(
bridgeLogPrefix("fetched new events"),
"count", len(events),
"lastFetchedEventID", lastFetchedEventID,
"lastFetchedEventTime", lastFetchedEvent.Time.Format(time.RFC3339),
)

b.signalFetchedEvents()
}
}

Expand All @@ -134,76 +167,72 @@ func (b *Bridge) ProcessNewBlocks(ctx context.Context, blocks []*types.Block) er
return nil
}

if err := b.Synchronize(ctx, blocks[len(blocks)-1].NumberU64()); err != nil {
return err
}

eventMap := make(map[uint64]uint64)
txMap := make(map[libcommon.Hash]uint64)
var prevSprintTime time.Time

blockNumToEventId := make(map[uint64]uint64)
eventTxnToBlockNum := make(map[libcommon.Hash]uint64)
for _, block := range blocks {
// check if block is start of span
blockNum := block.NumberU64()
if !b.isSprintStart(blockNum) {
continue
}

var timeLimit time.Time
if b.borConfig.IsIndore(blockNum) {
stateSyncDelay := b.borConfig.CalculateStateSyncDelay(blockNum)
timeLimit = time.Unix(int64(block.Time()-stateSyncDelay), 0)
} else {
timeLimit = prevSprintTime
blockTime := block.Time()
toTime, err := b.blockEventsWindowTimeEnd(blockNum, blockTime)
if err != nil {
return err
}

prevSprintTime = time.Unix(int64(block.Time()), 0)
if err = b.waitForScrapper(ctx, toTime); err != nil {
return err
}

lastID, err := b.store.LastEventIDWithinWindow(ctx, b.lastProcessedEventID.Load(), timeLimit)
startID := b.lastProcessedEventID.Load() + 1
endID, err := b.store.LastEventIDWithinWindow(ctx, startID, time.Unix(int64(toTime), 0))
if err != nil {
return err
}

if lastID > b.lastProcessedEventID.Load() {
b.logger.Debug(bridgeLogPrefix(fmt.Sprintf("Creating map for block %d, start ID %d, end ID %d", blockNum, b.lastProcessedEventID.Load(), lastID)))

k := bortypes.ComputeBorTxHash(blockNum, block.Hash())
eventMap[blockNum] = b.lastProcessedEventID.Load()
txMap[k] = blockNum

b.lastProcessedEventID.Store(lastID)
if endID > 0 {
b.logger.Debug(
bridgeLogPrefix("mapping events to block"),
"blockNum", blockNum,
"start", startID,
"end", endID,
)

eventTxnHash := bortypes.ComputeBorTxHash(blockNum, block.Hash())
eventTxnToBlockNum[eventTxnHash] = blockNum
blockNumToEventId[blockNum] = startID
b.lastProcessedEventID.Store(endID)
}

b.lastProcessedBlockNumber.Store(blockNum)
b.lastProcessedBlockTime.Store(blockTime)
b.signalProcessedBlock()
}

err := b.store.PutEventIDs(ctx, eventMap)
err := b.store.PutBlockNumToEventID(ctx, blockNumToEventId)
if err != nil {
return err
}

err = b.store.PutEventTxnToBlockNum(ctx, txMap)
err = b.store.PutEventTxnToBlockNum(ctx, eventTxnToBlockNum)
if err != nil {
return err
}

return nil
}

// Synchronize blocks till bridge has map at tip
// Synchronize blocks until events up to a given block are processed.
func (b *Bridge) Synchronize(ctx context.Context, blockNum uint64) error {
b.logger.Debug(bridgeLogPrefix("synchronizing events..."), "blockNum", blockNum)
b.logger.Debug(
bridgeLogPrefix("synchronizing events..."),
"blockNum", blockNum,
"lastProcessedBlockNumber", b.lastProcessedBlockNumber.Load(),
)

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if b.ready.Load() || b.lastProcessedBlockNumber.Load() >= blockNum {
return nil
}
}
return b.waitForProcessedBlock(ctx, blockNum)
}

// Unwind deletes map entries till tip
Expand Down Expand Up @@ -234,7 +263,7 @@ func (b *Bridge) Events(ctx context.Context, blockNum uint64) ([]*types.Message,
return nil, err
}

b.logger.Debug(bridgeLogPrefix(fmt.Sprintf("got %v events for block %v", len(events), blockNum)))
b.logger.Debug(bridgeLogPrefix("events query db result"), "blockNum", blockNum, "eventCount", len(events))

// convert to message
for _, event := range events {
Expand All @@ -260,7 +289,127 @@ func (b *Bridge) EventTxnLookup(ctx context.Context, borTxHash libcommon.Hash) (
return b.store.EventTxnToBlockNum(ctx, borTxHash)
}

// Helper functions
// blockEventsWindowTimeEnd returns the upper time bound used to select the
// events that belong to the block with the given number and time.
//
// When the bor config reports Indore as active for blockNum, the bound is
// blockTime minus the configured state sync delay. Otherwise the bound is the
// time of the last processed block, and an error is returned if no block time
// has been recorded yet (the stored value is still zero).
//
// NOTE(review): the Indore branch is an unsigned subtraction, so it wraps
// around if the state sync delay ever exceeds blockTime — confirm that the
// config cannot produce such a value.
func (b *Bridge) blockEventsWindowTimeEnd(blockNum uint64, blockTime uint64) (uint64, error) {
	if !b.borConfig.IsIndore(blockNum) {
		prevBlockTime := b.lastProcessedBlockTime.Load()
		if prevBlockTime != 0 {
			return prevBlockTime, nil
		}

		return 0, errors.New("unknown last processed block time")
	}

	delay := b.borConfig.CalculateStateSyncDelay(blockNum)
	return blockTime - delay, nil
}

// waitForScrapper blocks until the event fetching loop has either reached the
// tip or has fetched an event whose time is at least toTime.
//
// It re-checks both conditions every time a fetched-events signal arrives and
// returns the error from waiting on that signal (context cancellation or a
// closed signal channel). While waiting it emits a debug log line on the first
// iteration and afterwards only on iterations where the 5 second ticker has
// fired.
func (b *Bridge) waitForScrapper(ctx context.Context, toTime uint64) error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	logNow := true
	for {
		atTip := b.reachedTip.Load()
		fetchedUpTo := b.lastFetchedEventTime.Load()
		if atTip || toTime <= fetchedUpTo {
			return nil
		}

		if logNow {
			b.logger.Debug(
				bridgeLogPrefix("waiting for event scrapping to catch up"),
				"reachedTip", atTip,
				"lastFetchedEventTime", fetchedUpTo,
				"toTime", toTime,
			)
		}

		if err := b.waitFetchedEventsSignal(ctx); err != nil {
			return err
		}

		// Non-blocking poll of the ticker: log on the next iteration only if
		// a tick has elapsed since the last check.
		select {
		case <-ticker.C:
			logNow = true
		default:
			logNow = false
		}
	}
}

// waitForProcessedBlock blocks until the last processed block number is at
// least the sprint start at or below blockNum.
//
// blockNum is first rounded down to a multiple of the sprint length calculated
// for it, since events are only processed at sprint start. The method then
// re-checks the last processed block number each time a processed-block signal
// arrives and returns the error from waiting on that signal (context
// cancellation or a closed signal channel). While waiting it emits a debug log
// line on the first iteration and afterwards only on iterations where the
// 5 second ticker has fired.
func (b *Bridge) waitForProcessedBlock(ctx context.Context, blockNum uint64) error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// we only process events at sprint start
	target := blockNum - blockNum%b.borConfig.CalculateSprintLength(blockNum)

	logNow := true
	for {
		processed := b.lastProcessedBlockNumber.Load()
		if target <= processed {
			return nil
		}

		if logNow {
			b.logger.Debug(
				bridgeLogPrefix("waiting for block processing to catch up"),
				"blockNum", target,
				"lastProcessedBlockNumber", processed,
			)
		}

		if err := b.waitProcessedBlockSignal(ctx); err != nil {
			return err
		}

		// Non-blocking poll of the ticker: log on the next iteration only if
		// a tick has elapsed since the last check.
		select {
		case <-ticker.C:
			logNow = true
		default:
			logNow = false
		}
	}
}

// signalFetchedEvents attempts to notify a waiter that the fetch loop has made
// progress. The send never blocks: if it cannot proceed immediately the
// signal is simply dropped.
func (b *Bridge) signalFetchedEvents() {
	select {
	case b.fetchedEventsSignal <- struct{}{}:
		// a waiter picked up the signal
	default:
		// no-op: nothing could take the signal right now
	}
}

// waitFetchedEventsSignal blocks until a fetched-events signal is received or
// ctx is done. It returns ctx.Err() on cancellation and a dedicated error if
// the signal channel has been closed.
func (b *Bridge) waitFetchedEventsSignal(ctx context.Context) error {
	select {
	case _, open := <-b.fetchedEventsSignal:
		if !open {
			return errors.New("fetchedEventsSignal channel closed")
		}

		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// signalProcessedBlock attempts to notify a waiter that a block has been
// processed. The send never blocks: if it cannot proceed immediately the
// signal is simply dropped.
//
// NOTE(review): Run closes processedBlockSignal when it returns, and sending
// on a closed channel panics even inside a select — confirm this is never
// called after Run has exited.
func (b *Bridge) signalProcessedBlock() {
	select {
	case b.processedBlockSignal <- struct{}{}:
		// a waiter picked up the signal
	default:
		// no-op: nothing could take the signal right now
	}
}

// waitProcessedBlockSignal blocks until a processed-block signal is received
// or ctx is done. It returns ctx.Err() on cancellation and a dedicated error
// if the signal channel has been closed.
func (b *Bridge) waitProcessedBlockSignal(ctx context.Context) error {
	select {
	case _, open := <-b.processedBlockSignal:
		if !open {
			return errors.New("processedBlockSignal channel closed")
		}

		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (b *Bridge) isSprintStart(headerNum uint64) bool {
if headerNum%b.borConfig.CalculateSprintLength(headerNum) != 0 || headerNum == 0 {
return false
Expand Down
Loading
Loading