diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index be75f2ca01ff..0daf892d0beb 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -88,16 +88,29 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { continue } - s.pendingQueueLock.RLock() - inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block().ParentRoot())] - s.pendingQueueLock.RUnlock() - blkRoot, err := b.Block().HashTreeRoot() if err != nil { tracing.AnnotateError(span, err) span.End() return err } + inDB := s.cfg.beaconDB.HasBlock(ctx, blkRoot) + // No need to process the same block twice. + if inDB { + s.pendingQueueLock.Lock() + if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil { + s.pendingQueueLock.Unlock() + return err + } + s.pendingQueueLock.Unlock() + span.End() + continue + } + + s.pendingQueueLock.RLock() + inPendingQueue := s.seenPendingBlocks[bytesutil.ToBytes32(b.Block().ParentRoot())] + s.pendingQueueLock.RUnlock() + parentIsBad := s.hasBadBlock(bytesutil.ToBytes32(b.Block().ParentRoot())) blockIsBad := s.hasBadBlock(blkRoot) // Check if parent is a bad block. @@ -117,12 +130,12 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { continue } - inDB := s.cfg.beaconDB.HasBlock(ctx, bytesutil.ToBytes32(b.Block().ParentRoot())) + parentInDb := s.cfg.beaconDB.HasBlock(ctx, bytesutil.ToBytes32(b.Block().ParentRoot())) hasPeer := len(pids) != 0 // Only request for missing parent block if it's not in beaconDB, not in pending cache // and has peer in the peer list. - if !inPendingQueue && !inDB && hasPeer { + if !inPendingQueue && !parentInDb && hasPeer { log.WithFields(logrus.Fields{ "currentSlot": b.Block().Slot(), "parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block().ParentRoot())), @@ -133,7 +146,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { continue } - if !inDB { + if !parentInDb { span.End() continue } @@ -167,6 +180,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error { s.pendingQueueLock.Lock() if err := s.deleteBlockFromPendingQueue(slot, b, blkRoot); err != nil { + s.pendingQueueLock.Unlock() return err } s.pendingQueueLock.Unlock() @@ -321,6 +335,7 @@ func (s *Service) deleteBlockFromPendingQueue(slot types.Slot, b block.SignedBea } if len(newBlks) == 0 { s.slotToPendingBlocks.Delete(slotToCacheKey(slot)) + delete(s.seenPendingBlocks, r) return nil } diff --git a/beacon-chain/sync/pending_blocks_queue_test.go b/beacon-chain/sync/pending_blocks_queue_test.go index 05aee9284889..6110ebad43bd 100644 --- a/beacon-chain/sync/pending_blocks_queue_test.go +++ b/beacon-chain/sync/pending_blocks_queue_test.go @@ -84,9 +84,13 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks1(t *testing.T) { require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(b1), b1Root)) require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(b1))) - // Insert bad b1 in the cache to verify the good one doesn't get replaced. - require.NoError(t, r.insertBlockToPendingQueue(b1.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(util.NewBeaconBlock()), [32]byte{})) + nBlock := util.NewBeaconBlock() + nBlock.Block.Slot = b1.Block.Slot + nRoot, err := nBlock.Block.HashTreeRoot() + require.NoError(t, err) + // Insert bad b1 in the cache to verify the good one doesn't get replaced. + require.NoError(t, r.insertBlockToPendingQueue(nBlock.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(nBlock), nRoot)) require.NoError(t, r.processPendingBlocks(context.Background())) // Marks a block as bad require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run @@ -140,6 +144,46 @@ func TestRegularSync_InsertDuplicateBlocks(t *testing.T) { } +func TestRegularSyncBeaconBlockSubscriber_DoNotReprocessBlock(t *testing.T) { + db := dbtest.SetupDB(t) + + p1 := p2ptest.NewTestP2P(t) + r := &Service{ + cfg: &config{ + p2p: p1, + beaconDB: db, + chain: &mock.ChainService{ + FinalizedCheckPoint: ðpb.Checkpoint{ + Epoch: 0, + }, + }, + stateGen: stategen.New(db), + }, + slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), + seenPendingBlocks: make(map[[32]byte]bool), + } + r.initCaches() + + b0 := util.NewBeaconBlock() + require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(b0))) + b0Root, err := b0.Block.HashTreeRoot() + require.NoError(t, err) + b3 := util.NewBeaconBlock() + b3.Block.Slot = 3 + b3.Block.ParentRoot = b0Root[:] + b3Root, err := b3.Block.HashTreeRoot() + require.NoError(t, err) + + require.NoError(t, r.cfg.beaconDB.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(b3))) + + // Add b3 to the cache + require.NoError(t, r.insertBlockToPendingQueue(b3.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(b3), b3Root)) + + require.NoError(t, r.processPendingBlocks(context.Background())) + assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") + assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block") +} + // /- b1 - b2 - b5 // b0 // \- b3 - b4 @@ -237,7 +281,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run assert.Equal(t, 1, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") - assert.Equal(t, 3, len(r.seenPendingBlocks), "Incorrect size for seen pending block") + assert.Equal(t, 1, len(r.seenPendingBlocks), "Incorrect size for seen pending block") // Add b2 to the cache require.NoError(t, r.insertBlockToPendingQueue(b2.Block.Slot, wrapper.WrappedPhase0SignedBeaconBlock(b2), b2Root)) @@ -248,7 +292,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks_2Chains(t *testin require.NoError(t, r.processPendingBlocks(context.Background())) // Bad block removed on second run assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") - assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block") + assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block") } func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) { @@ -318,7 +362,7 @@ func TestRegularSyncBeaconBlockSubscriber_PruneOldPendingBlocks(t *testing.T) { require.NoError(t, r.processPendingBlocks(context.Background())) assert.Equal(t, 0, len(r.slotToPendingBlocks.Items()), "Incorrect size for slot to pending blocks cache") - assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block") + assert.Equal(t, 0, len(r.seenPendingBlocks), "Incorrect size for seen pending block") } func TestService_sortedPendingSlots(t *testing.T) { @@ -429,7 +473,7 @@ func TestService_BatchRootRequest(t *testing.T) { assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block") } -func TestService_AddPeningBlockToQueueOverMax(t *testing.T) { +func TestService_AddPendingBlockToQueueOverMax(t *testing.T) { r := &Service{ slotToPendingBlocks: gcache.New(time.Second, 2*time.Second), seenPendingBlocks: make(map[[32]byte]bool),