Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

polygon/bridge: fix bridge integration in stage mode #11646

Merged
merged 50 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
4797124
polygon/bridge: fix synhronisation issues
taratorio Aug 16, 2024
80264ac
err if signal channel closed
taratorio Aug 16, 2024
d36af91
tidy log line
taratorio Aug 16, 2024
ae11124
rename to blockNum
taratorio Aug 18, 2024
c9e614a
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 19, 2024
63fa8f5
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 19, 2024
f5bdefb
Merge branch 'astrid-bridge-synchronisation-fix' of github.com:erigon…
taratorio Aug 19, 2024
0a0f213
Merge branch 'astrid-bridge-synchronisation-fix' of github.com:ledger…
taratorio Aug 19, 2024
ebec670
tidy
taratorio Aug 19, 2024
86936a8
code review comments
taratorio Aug 19, 2024
c2fde77
rename to BlockEventIDRange for better clarity
taratorio Aug 19, 2024
59045a7
Update fetchAndWriteHeimdallStateSyncEvents, EventsByBlock, UnwindEvents
shohamc1 Aug 19, 2024
d3e4aaf
fix startup and tests
taratorio Aug 19, 2024
d7a4812
tidy comment
taratorio Aug 19, 2024
9518b9d
print event debug info every 30 secs
taratorio Aug 19, 2024
c6c2645
fix astrid sync stage
taratorio Aug 19, 2024
c46673a
fix windows test & name tidy
taratorio Aug 19, 2024
d4e6f4f
add LastFrozenEventBlockNum
taratorio Aug 20, 2024
b1fa5cf
add more replay logic
taratorio Aug 20, 2024
9ab2005
Fix snapshot generation
shohamc1 Aug 20, 2024
b0528bb
Fix unwind
shohamc1 Aug 21, 2024
ab17de3
Update tests
shohamc1 Aug 21, 2024
d83df94
Rewrite UnwindEvents
shohamc1 Aug 21, 2024
5e0dc06
Fix prune
shohamc1 Aug 21, 2024
bc445b7
Remove print
shohamc1 Aug 21, 2024
1d6e40e
wip
taratorio Aug 21, 2024
fa63b74
Merge branch 'main' of github.com:erigontech/erigon into astrid-bridg…
taratorio Aug 21, 2024
056bbf5
Prev
shohamc1 Aug 21, 2024
6b2ae41
Set eventIdTo to 0
shohamc1 Aug 22, 2024
b83ba68
Use Next
shohamc1 Aug 22, 2024
66f3b59
Fix firstKeyGetter
shohamc1 Aug 22, 2024
46d3961
more debugging
taratorio Aug 22, 2024
6f1bedf
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 22, 2024
abe21b1
Merge branch 'flip-boreventnums' of github.com:ledgerwatch/erigon int…
taratorio Aug 22, 2024
9d3c54b
revert block replays with bridge
taratorio Aug 22, 2024
d9ccb6f
Revert "revert block replays with bridge"
taratorio Aug 22, 2024
e10703b
wip
taratorio Aug 23, 2024
7346253
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 23, 2024
1626810
remove last processed event id in mem var
taratorio Aug 23, 2024
26db6be
remove chain reader hack
taratorio Aug 23, 2024
3ed371d
revert debugging code
taratorio Aug 23, 2024
d2534ab
remove playground
taratorio Aug 23, 2024
828c038
remove snapshots version override
taratorio Aug 23, 2024
768fc50
move put processed block info last
taratorio Aug 23, 2024
4ad2ef5
fix bridge tests
taratorio Aug 23, 2024
93ab2c5
add sequential check guards to process new blocks
taratorio Aug 23, 2024
ff39f8f
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 23, 2024
e7dac3c
Merge branch 'main' of github.com:ledgerwatch/erigon into astrid-brid…
taratorio Aug 23, 2024
ba04f4f
remove debugging print
taratorio Aug 23, 2024
355d7d2
fix fill gap with execution on initial replay
taratorio Aug 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/rpcdaemon/rpcservices/eth_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ func (back *RemoteBackend) LastFrozenEventId() uint64 {
panic("not implemented")
}

func (back *RemoteBackend) LastFrozenEventBlockNum() uint64 {
panic("not implemented")
}

func (back *RemoteBackend) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([]byte, error) {
return back.blockReader.Span(ctx, tx, spanId)
}
Expand Down
49 changes: 26 additions & 23 deletions erigon-lib/kv/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,19 @@ const (
PendingEpoch = "DevPendingEpoch" // block_num_u64+block_hash->transition_proof

// BOR
BorReceipts = "BorReceipt"
BorFinality = "BorFinality"
BorTxLookup = "BlockBorTransactionLookup" // transaction_hash -> block_num_u64
BorSeparate = "BorSeparate" // persisted snapshots of the Validator Sets, with their proposer priorities
BorEvents = "BorEvents" // event_id -> event_payload
BorEventNums = "BorEventNums" // block_num -> event_id (last event_id in that block)
BorSpans = "BorSpans" // span_id -> span (in JSON encoding)
BorMilestones = "BorMilestones" // milestone_id -> milestone (in JSON encoding)
BorMilestoneEnds = "BorMilestoneEnds" // start block_num -> milestone_id (first block of milestone)
BorCheckpoints = "BorCheckpoints" // checkpoint_id -> checkpoint (in JSON encoding)
BorCheckpointEnds = "BorCheckpointEnds" // start block_num -> checkpoint_id (first block of checkpoint)
BorProducerSelections = "BorProducerSelections" // span_id -> span selection with accumulated proposer priorities (in JSON encoding)
BorReceipts = "BorReceipt"
BorFinality = "BorFinality"
BorTxLookup = "BlockBorTransactionLookup" // transaction_hash -> block_num_u64
BorSeparate = "BorSeparate" // persisted snapshots of the Validator Sets, with their proposer priorities
BorEvents = "BorEvents" // event_id -> event_payload
BorEventNums = "BorEventNums" // block_num -> event_id (last event_id in that block)
BorEventProcessedBlocks = "BorEventProcessedBlocks" // block_num -> block_time, tracks processed blocks in the bridge, used for unwinds and restarts, gets pruned
BorSpans = "BorSpans" // span_id -> span (in JSON encoding)
BorMilestones = "BorMilestones" // milestone_id -> milestone (in JSON encoding)
BorMilestoneEnds = "BorMilestoneEnds" // start block_num -> milestone_id (first block of milestone)
BorCheckpoints = "BorCheckpoints" // checkpoint_id -> checkpoint (in JSON encoding)
BorCheckpointEnds = "BorCheckpointEnds" // start block_num -> checkpoint_id (first block of checkpoint)
BorProducerSelections = "BorProducerSelections" // span_id -> span selection with accumulated proposer priorities (in JSON encoding)

// Downloader
BittorrentCompletion = "BittorrentCompletion"
Expand Down Expand Up @@ -540,6 +541,7 @@ var ChaindataTables = []string{
BorSeparate,
BorEvents,
BorEventNums,
BorEventProcessedBlocks,
BorSpans,
BorMilestones,
BorMilestoneEnds,
Expand Down Expand Up @@ -743,17 +745,18 @@ var ChaindataTablesCfg = TableCfg{
}

var BorTablesCfg = TableCfg{
BorReceipts: {Flags: DupSort},
BorFinality: {Flags: DupSort},
BorTxLookup: {Flags: DupSort},
BorEvents: {Flags: DupSort},
BorEventNums: {Flags: DupSort},
BorSpans: {Flags: DupSort},
BorCheckpoints: {Flags: DupSort},
BorCheckpointEnds: {Flags: DupSort},
BorMilestones: {Flags: DupSort},
BorMilestoneEnds: {Flags: DupSort},
BorProducerSelections: {Flags: DupSort},
BorReceipts: {Flags: DupSort},
BorFinality: {Flags: DupSort},
BorTxLookup: {Flags: DupSort},
BorEvents: {Flags: DupSort},
BorEventNums: {Flags: DupSort},
BorEventProcessedBlocks: {Flags: DupSort},
BorSpans: {Flags: DupSort},
BorCheckpoints: {Flags: DupSort},
BorCheckpointEnds: {Flags: DupSort},
BorMilestones: {Flags: DupSort},
BorMilestoneEnds: {Flags: DupSort},
BorProducerSelections: {Flags: DupSort},
}

var TxpoolTablesCfg = TableCfg{}
Expand Down
113 changes: 85 additions & 28 deletions eth/stagedsync/stage_polygon_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,18 @@ func NewPolygonSyncStageCfg(
txActionStream: txActionStream,
},
}
bridgeStore := &polygonSyncStageBridgeStore{
eventReader: blockReader,
txActionStream: txActionStream,
}
borConfig := chainConfig.Bor.(*borcfg.BorConfig)
heimdallService := heimdall.NewService(borConfig, heimdallClient, heimdallStore, logger)
bridgeService := bridge.NewBridge(bridgeStore, logger, borConfig, heimdallClient, nil)
p2pService := p2p.NewService(maxPeers, logger, sentry, statusDataProvider.GetStatusData)
checkpointVerifier := polygonsync.VerifyCheckpointHeaders
milestoneVerifier := polygonsync.VerifyMilestoneHeaders
blocksVerifier := polygonsync.VerifyBlocks
syncStore := &polygonSyncStageSyncStore{
executionEngine: executionEngine,
}
syncStore := polygonsync.NewStore(logger, executionEngine, bridgeService)
blockDownloader := polygonsync.NewBlockDownloader(
logger,
p2pService,
Expand All @@ -112,11 +115,6 @@ func NewPolygonSyncStageCfg(
blockLimit,
)
events := polygonsync.NewTipEvents(logger, p2pService, heimdallService)
bridgeStore := &polygonSyncStageBridgeStore{
eventReader: blockReader,
txActionStream: txActionStream,
}
bridgeService := bridge.NewBridge(bridgeStore, logger, borConfig, heimdallClient, nil)
sync := polygonsync.NewSync(
syncStore,
executionEngine,
Expand All @@ -133,6 +131,7 @@ func NewPolygonSyncStageCfg(
syncService := &polygonSyncStageService{
logger: logger,
sync: sync,
syncStore: syncStore,
events: events,
p2p: p2pService,
executionEngine: executionEngine,
Expand Down Expand Up @@ -318,6 +317,18 @@ func UnwindEvents(tx kv.RwTx, unwindPoint uint64) error {
return err
}
}

epbCursor, err := tx.RwCursor(kv.BorEventProcessedBlocks)
if err != nil {
return err
}

defer epbCursor.Close()
for k, _, err = epbCursor.Seek(blockNumBuf[:]); err == nil && k != nil; k, _, err = epbCursor.Next() {
if err = epbCursor.DeleteCurrent(); err != nil {
return err
}
}
return err
}

Expand Down Expand Up @@ -399,6 +410,7 @@ type polygonSyncStageTxAction struct {
type polygonSyncStageService struct {
logger log.Logger
sync *polygonsync.Sync
syncStore polygonsync.Store
events *polygonsync.TipEvents
p2p p2p.Service
executionEngine *polygonSyncStageExecutionEngine
Expand Down Expand Up @@ -493,6 +505,10 @@ func (s *polygonSyncStageService) runBgComponentsOnce(ctx context.Context) {
}
})

eg.Go(func() error {
return s.syncStore.Run(ctx)
})

eg.Go(func() error {
return s.sync.Run(ctx)
})
Expand All @@ -503,22 +519,6 @@ func (s *polygonSyncStageService) runBgComponentsOnce(ctx context.Context) {
}()
}

type polygonSyncStageSyncStore struct {
executionEngine *polygonSyncStageExecutionEngine
}

func (s *polygonSyncStageSyncStore) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
return s.executionEngine.InsertBlocks(ctx, blocks)
}

func (s *polygonSyncStageSyncStore) Flush(context.Context) error {
return nil
}

func (s *polygonSyncStageSyncStore) Run(context.Context) error {
return nil
}

type polygonSyncStageHeimdallStore struct {
checkpoints *polygonSyncStageCheckpointStore
milestones *polygonSyncStageMilestoneStore
Expand Down Expand Up @@ -1133,6 +1133,45 @@ func (s polygonSyncStageBridgeStore) LastProcessedEventID(ctx context.Context) (
return r.id, nil
}

func (s polygonSyncStageBridgeStore) LastProcessedBlockInfo(ctx context.Context) (bridge.ProcessedBlockInfo, bool, error) {
type response struct {
info bridge.ProcessedBlockInfo
ok bool
err error
}

r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error {
info, ok, err := bridge.LastProcessedBlockInfo(tx)
responseStream <- response{info: info, ok: ok, err: err}
return nil
})
if err != nil {
return bridge.ProcessedBlockInfo{}, false, err
}

return r.info, r.ok, r.err
}

func (s polygonSyncStageBridgeStore) PutProcessedBlockInfo(ctx context.Context, info bridge.ProcessedBlockInfo) error {
type response struct {
err error
}

r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error {
responseStream <- response{err: bridge.PutProcessedBlockInfo(tx, info)}
return nil
})
if err != nil {
return err
}

return r.err
}

func (s polygonSyncStageBridgeStore) LastFrozenEventBlockNum() uint64 {
return s.eventReader.LastFrozenEventBlockNum()
}

func (s polygonSyncStageBridgeStore) LastEventIDWithinWindow(ctx context.Context, fromID uint64, toTime time.Time) (uint64, error) {
type response struct {
id uint64
Expand All @@ -1154,13 +1193,13 @@ func (s polygonSyncStageBridgeStore) LastEventIDWithinWindow(ctx context.Context
return r.id, nil
}

func (s polygonSyncStageBridgeStore) PutEventIDs(ctx context.Context, eventMap map[uint64]uint64) error {
func (s polygonSyncStageBridgeStore) PutBlockNumToEventID(ctx context.Context, blockNumToEventId map[uint64]uint64) error {
type response struct {
err error
}

r, err := awaitTxAction(ctx, s.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error {
responseStream <- response{err: bridge.PutEventsIDs(tx, eventMap)}
responseStream <- response{err: bridge.PutBlockNumToEventID(tx, blockNumToEventId)}
return nil
})
if err != nil {
Expand All @@ -1178,12 +1217,12 @@ func (s polygonSyncStageBridgeStore) Events(context.Context, uint64, uint64) ([]
panic("polygonSyncStageBridgeStore.Events not supported")
}

func (s polygonSyncStageBridgeStore) EventIDRange(context.Context, uint64) (uint64, uint64, error) {
func (s polygonSyncStageBridgeStore) BlockEventIDsRange(context.Context, uint64) (uint64, uint64, error) {
// used for accessing events in execution
// astrid stage integration intends to use the bridge only for scrapping
// not for reading which remains the same in execution (via BlockReader)
// astrid standalone mode introduces its own reader
panic("polygonSyncStageBridgeStore.EventIDRange not supported")
panic("polygonSyncStageBridgeStore.BlockEventIDsRange not supported")
}

func (s polygonSyncStageBridgeStore) EventTxnToBlockNum(context.Context, common.Hash) (uint64, bool, error) {
Expand Down Expand Up @@ -1225,6 +1264,24 @@ type polygonSyncStageExecutionEngine struct {
cachedForkChoice *types.Header
}

func (e *polygonSyncStageExecutionEngine) GetHeader(ctx context.Context, blockNum uint64) (*types.Header, error) {
type response struct {
header *types.Header
err error
}

r, err := awaitTxAction(ctx, e.txActionStream, func(tx kv.RwTx, responseStream chan<- response) error {
header, err := e.blockReader.HeaderByNumber(ctx, tx, blockNum)
responseStream <- response{header: header, err: err}
return nil
})
if err != nil {
return nil, err
}

return r.header, r.err
}

func (e *polygonSyncStageExecutionEngine) InsertBlocks(ctx context.Context, blocks []*types.Block) error {
type response struct {
err error
Expand Down
32 changes: 31 additions & 1 deletion polygon/bor/bordb/prune.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
// doesn't change sequences of kv.EthTx
// doesn't delete Receipts, Senders, Canonical markers, TotalDifficulty
func PruneBorBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int, SpanIdAt func(number uint64) uint64) (deleted int, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the return value also? Is deleted the total number of objects deleted?

// events
c, err := tx.Cursor(kv.BorEventNums)
if err != nil {
return deleted, err
Expand Down Expand Up @@ -70,14 +71,40 @@ func PruneBorBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int, SpanIdAt
if err != nil {
return deleted, err
}

epbCursor, err := tx.RwCursor(kv.BorEventProcessedBlocks)
if err != nil {
return deleted, err
}

defer epbCursor.Close()
counter = blocksDeleteLimit
for k, _, err = epbCursor.First(); err == nil && k != nil && counter > 0; k, _, err = epbCursor.Next() {
blockNum := binary.BigEndian.Uint64(k)
if blockNum >= blockTo {
break
}

if err = epbCursor.DeleteCurrent(); err != nil {
return deleted, err
}

deleted++
counter--
}
if err != nil {
return deleted, err
}
Comment on lines +75 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be extracted into a sub-function.


// spans
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be extracted to a DeleteSpans function.

firstSpanToKeep := SpanIdAt(blockTo)
c2, err := tx.RwCursor(kv.BorSpans)
if err != nil {
return deleted, err
}
defer c2.Close()
counter = blocksDeleteLimit
for k, _, err := c2.First(); err == nil && k != nil && counter > 0; k, _, err = c2.Next() {
for k, _, err = c2.First(); err == nil && k != nil && counter > 0; k, _, err = c2.Next() {
spanId := binary.BigEndian.Uint64(k)
if spanId >= firstSpanToKeep {
break
Expand All @@ -88,6 +115,9 @@ func PruneBorBlocks(tx kv.RwTx, blockTo uint64, blocksDeleteLimit int, SpanIdAt
deleted++
counter--
}
if err != nil {
return deleted, err
}

if snaptype.CheckpointsEnabled() {
checkpointCursor, err := tx.RwCursor(kv.BorCheckpoints)
Copy link
Member

@antonis19 antonis19 Aug 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this isn't part or purpose of the PR, but since this function is a bit long, you could split it into smaller sub-functions each deleting from a certain table.

Expand Down
Loading
Loading