diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 27ada6fce953..5a84bbdf74b8 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//shared/params:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_libp2p_go_libp2p_peer//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", @@ -39,11 +40,9 @@ go_test( "receive_block_test.go", "regular_sync_test.go", "service_test.go", - "simulated_sync_test.go", ], embed = [":go_default_library"], deps = [ - "//beacon-chain/chaintest/backend:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/internal:go_default_library", diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index a40a4ace373a..f87121bf0f20 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//shared/params:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_libp2p_go_libp2p_peer//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/sync/initial-sync/metrics.go b/beacon-chain/sync/initial-sync/metrics.go index ebea942d8f0d..c3052c93c266 100644 --- a/beacon-chain/sync/initial-sync/metrics.go +++ b/beacon-chain/sync/initial-sync/metrics.go @@ -19,10 +19,6 @@ var ( Name: "initsync_received_blocks", Help: "The number of received blocks", }) - recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{ - Name: "initsync_received_block_announce", - Help: "The number of received block announce", - }) stateReq = promauto.NewCounter(prometheus.CounterOpts{ Name: "initsync_state_req", Help: "The number of sent state requests", diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index a528cc6e1e53..9c7e1464877f 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -17,14 +17,14 @@ import ( "sync" "time" - "github.com/prysmaticlabs/prysm/shared/hashutil" - "github.com/ethereum/go-ethereum/common" "github.com/gogo/protobuf/proto" + peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/p2p" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" @@ -110,6 +110,7 @@ type InitialSync struct { finalizedStateRoot [32]byte mutex *sync.Mutex nodeIsSynced bool + bestPeer peer.ID } // NewInitialSyncService constructs a new InitialSyncService. @@ -155,7 +156,6 @@ func (s *InitialSync) Start() { } s.currentSlot = cHead.Slot go s.run() - go s.listenForNewBlocks() go s.checkInMemoryBlocks() } @@ -181,6 +181,11 @@ func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) { s.finalizedStateRoot = root } +// InitializeBestPeer sets the peer ID of the highest observed peer. +func (s *InitialSync) InitializeBestPeer(p peer.ID) { + s.bestPeer = p +} + // HighestObservedSlot returns the highest observed slot. func (s *InitialSync) HighestObservedSlot() uint64 { return s.highestObservedSlot @@ -274,24 +279,6 @@ func (s *InitialSync) checkInMemoryBlocks() { } } -// listenForNewBlocks listens for block announcements beyond the canonical head slot that may -// be received during initial sync - we must process these blocks to catch up with peers. -func (s *InitialSync) listenForNewBlocks() { - blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf) - defer func() { - blockAnnounceSub.Unsubscribe() - close(s.blockAnnounceBuf) - }() - for { - select { - case <-s.ctx.Done(): - return - case msg := <-s.blockAnnounceBuf: - safelyHandleMessage(s.processBlockAnnounce, msg) - } - } -} - // run is the main goroutine for the initial sync service. // delayChan is explicitly passed into this function to facilitate tests that don't require a timeout. // It is assumed that the goroutine `run` is only called once per instance. @@ -308,6 +295,7 @@ func (s *InitialSync) run() { close(s.stateBuf) }() + // We send out a state request to all peers. if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil { log.Errorf("Could not request state from peer %v", err) } diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index d61a61abe1ce..38308e2d4341 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -136,7 +136,6 @@ func TestSavingBlock_InSync(t *testing.T) { go func() { ss.run() - ss.listenForNewBlocks() exitRoutine <- true }() diff --git a/beacon-chain/sync/initial-sync/sync_blocks.go b/beacon-chain/sync/initial-sync/sync_blocks.go index 84b5eeb97c6d..1bae9e94ca9e 100644 --- a/beacon-chain/sync/initial-sync/sync_blocks.go +++ b/beacon-chain/sync/initial-sync/sync_blocks.go @@ -14,18 +14,6 @@ import ( "go.opencensus.io/trace" ) -func (s *InitialSync) processBlockAnnounce(msg p2p.Message) { - _, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce") - defer span.End() - data := msg.Data.(*pb.BeaconBlockAnnounce) - recBlockAnnounce.Inc() - - if s.stateReceived && data.SlotNumber > s.highestObservedSlot { - s.requestBatchedBlocks(s.lastRequestedSlot, data.SlotNumber) - s.lastRequestedSlot = data.SlotNumber - } -} - // processBlock is the main method that validates each block which is received // for initial sync. It checks if the blocks are valid and then will continue to // process and save it into the db. @@ -93,6 +81,12 @@ func (s *InitialSync) processBatchedBlocks(msg p2p.Message) { // Do not process empty responses. return } + if msg.Peer != s.bestPeer { + // Only process batch block responses that come from the best peer + // we originally synced with. + log.WithField("peerID", msg.Peer.Pretty()).Debug("Received batch blocks from a different peer") + return + } log.Debug("Processing batched block response") for _, block := range batchedBlocks { diff --git a/beacon-chain/sync/initial-sync/sync_state.go b/beacon-chain/sync/initial-sync/sync_state.go index 9e82102134fa..09f4d395a044 100644 --- a/beacon-chain/sync/initial-sync/sync_state.go +++ b/beacon-chain/sync/initial-sync/sync_state.go @@ -102,5 +102,5 @@ func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoo stateReq.Inc() return s.p2p.Send(ctx, &pb.BeaconStateRequest{ FinalizedStateRootHash32S: lastFinalizedRoot[:], - }, p2p.AnyPeer) + }, s.bestPeer) } diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 6eba811be7cc..7cd857a1169a 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" @@ -57,6 +58,7 @@ type Querier struct { powchain powChainService chainStarted bool atGenesis bool + bestPeer peer.ID } // NewQuerierService constructs a new Sync Querier Service. @@ -151,8 +153,7 @@ func (q *Querier) run() { ticker.Stop() }() - q.RequestLatestHead() - + timeout := time.After(5 * time.Second) for { select { case <-q.ctx.Done(): @@ -160,25 +161,34 @@ func (q *Querier) run() { return case <-ticker.C: q.RequestLatestHead() - case msg := <-q.responseBuf: - response := msg.Data.(*pb.ChainHeadResponse) + case <-timeout: + queryLog.WithField("peerID", q.bestPeer.Pretty()).Info("Peer with highest canonical head") queryLog.Infof( "Latest chain head is at slot: %d and state root: %#x", - response.CanonicalSlot-params.BeaconConfig().GenesisSlot, response.CanonicalStateRootHash32, + q.currentHeadSlot-params.BeaconConfig().GenesisSlot, q.currentStateRoot, ) - q.currentHeadSlot = response.CanonicalSlot - q.currentStateRoot = response.CanonicalStateRootHash32 - q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) - ticker.Stop() responseSub.Unsubscribe() q.cancel() + case msg := <-q.responseBuf: + response := msg.Data.(*pb.ChainHeadResponse) + queryLog.WithFields(logrus.Fields{ + "peerID": msg.Peer.Pretty(), + "highestSlot": response.CanonicalSlot - params.BeaconConfig().GenesisSlot, + }).Info("Received chain head from peer") + if response.CanonicalSlot > q.currentHeadSlot { + q.currentHeadSlot = response.CanonicalSlot + q.bestPeer = msg.Peer + q.currentHeadSlot = response.CanonicalSlot + q.currentStateRoot = response.CanonicalStateRootHash32 + q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) + } } } } -// RequestLatestHead broadcasts out a request for all -// the latest chain heads from the node's peers. +// RequestLatestHead broadcasts a request for +// the latest chain head slot and state root to a peer. func (q *Querier) RequestLatestHead() { request := &pb.ChainHeadRequest{} q.p2p.Broadcast(context.Background(), request) diff --git a/beacon-chain/sync/querier_test.go b/beacon-chain/sync/querier_test.go index 3dc8d9ba6107..a406c0150afd 100644 --- a/beacon-chain/sync/querier_test.go +++ b/beacon-chain/sync/querier_test.go @@ -146,7 +146,7 @@ func TestQuerier_ChainReqResponse(t *testing.T) { }() response := &pb.ChainHeadResponse{ - CanonicalSlot: 0, + CanonicalSlot: 1, CanonicalStateRootHash32: []byte{'a', 'b'}, } diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 658b26ecc7fd..83ec3ba16000 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -48,7 +48,6 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) } type mockChainService struct { - bFeed *event.Feed sFeed *event.Feed cFeed *event.Feed db *db.BeaconDB diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 14c8d5529a18..2b3082402650 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -129,6 +129,7 @@ func (ss *Service) run() { // Sets the highest observed slot from querier. ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot) + ss.InitialSync.InitializeBestPeer(ss.Querier.bestPeer) ss.InitialSync.InitializeObservedStateRoot(bytesutil.ToBytes32(ss.Querier.currentStateRoot)) // Sets the state root of the highest observed slot. ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot) diff --git a/beacon-chain/sync/simulated_sync_test.go b/beacon-chain/sync/simulated_sync_test.go deleted file mode 100644 index 1ca82a10b77e..000000000000 --- a/beacon-chain/sync/simulated_sync_test.go +++ /dev/null @@ -1,290 +0,0 @@ -package sync - -import ( - "context" - "reflect" - "sync" - "testing" - "time" - - "github.com/gogo/protobuf/proto" - peer "github.com/libp2p/go-libp2p-peer" - "github.com/prysmaticlabs/prysm/beacon-chain/chaintest/backend" - "github.com/prysmaticlabs/prysm/beacon-chain/db" - pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" - "github.com/prysmaticlabs/prysm/shared/bls" - "github.com/prysmaticlabs/prysm/shared/event" - "github.com/prysmaticlabs/prysm/shared/hashutil" - "github.com/prysmaticlabs/prysm/shared/p2p" - "github.com/prysmaticlabs/prysm/shared/params" -) - -type simulatedP2P struct { - subsChannels map[reflect.Type]*event.Feed - mutex *sync.RWMutex - ctx context.Context -} - -func (sim *simulatedP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription { - sim.mutex.Lock() - defer sim.mutex.Unlock() - - protoType := reflect.TypeOf(msg) - - feed, ok := sim.subsChannels[protoType] - if !ok { - nFeed := new(event.Feed) - sim.subsChannels[protoType] = nFeed - return nFeed.Subscribe(channel) - } - return feed.Subscribe(channel) -} - -func (sim *simulatedP2P) Broadcast(_ context.Context, msg proto.Message) { - sim.mutex.Lock() - defer sim.mutex.Unlock() - - protoType := reflect.TypeOf(msg) - - feed, ok := sim.subsChannels[protoType] - if !ok { - return - } - - feed.Send(p2p.Message{Ctx: sim.ctx, Data: msg}) -} - -func (sim *simulatedP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error { - sim.mutex.Lock() - defer sim.mutex.Unlock() - - protoType := reflect.TypeOf(msg) - - feed, ok := sim.subsChannels[protoType] - if !ok { - return nil - } - - feed.Send(p2p.Message{Ctx: sim.ctx, Data: msg}) - return nil -} - -func setupSimBackendAndDB(t *testing.T) (*backend.SimulatedBackend, *db.BeaconDB, []*bls.SecretKey) { - ctx := context.Background() - - bd, err := backend.NewSimulatedBackend() - if err != nil { - t.Fatalf("Could not set up simulated backend %v", err) - } - - privKeys, err := bd.SetupBackend(100) - if err != nil { - t.Fatalf("Could not set up backend %v", err) - } - - beacondb, err := db.SetupDB() - if err != nil { - t.Fatalf("Could not setup beacon db %v", err) - } - - memBlocks := bd.InMemoryBlocks() - if err := beacondb.SaveBlock(memBlocks[0]); err != nil { - t.Fatalf("Could not save block %v", err) - } - if err := beacondb.SaveJustifiedBlock(memBlocks[0]); err != nil { - t.Fatalf("Could not save block %v", err) - } - if err := beacondb.SaveFinalizedBlock(memBlocks[0]); err != nil { - t.Fatalf("Could not save block %v", err) - } - - state := bd.State() - state.LatestBlock = memBlocks[0] - state.LatestEth1Data = &pb.Eth1Data{ - BlockHash32: []byte{}, - } - - if err := beacondb.SaveState(ctx, state); err != nil { - t.Fatalf("Could not save state %v", err) - } - if err := beacondb.SaveJustifiedState(state); err != nil { - t.Fatalf("Could not save state %v", err) - } - if err := beacondb.SaveFinalizedState(state); err != nil { - t.Fatalf("Could not save state %v", err) - } - - if err := beacondb.UpdateChainHead(ctx, memBlocks[0], state); err != nil { - t.Fatalf("Could not update chain head %v", err) - } - - return bd, beacondb, privKeys -} - -func setUpSyncedService(numOfBlocks int, simP2P *simulatedP2P, t *testing.T) (*Service, *db.BeaconDB, [32]byte) { - bd, beacondb, _ := setupSimBackendAndDB(t) - defer bd.Shutdown() - defer db.TeardownDB(bd.DB()) - ctx := context.Background() - - mockPow := &genesisPowChain{ - feed: new(event.Feed), - } - - mockChain := &mockChainService{ - bFeed: new(event.Feed), - sFeed: new(event.Feed), - cFeed: new(event.Feed), - db: bd.DB(), - } - - cfg := &Config{ - ChainService: mockChain, - BeaconDB: beacondb, - OperationService: &mockOperationService{}, - P2P: simP2P, - PowChainService: mockPow, - } - - ss := NewSyncService(context.Background(), cfg) - - go ss.run() - for !ss.Querier.chainStarted { - mockChain.sFeed.Send(time.Now()) - } - - state, err := beacondb.HeadState(ctx) - if err != nil { - t.Fatal(err) - } - inMemoryBlocks := bd.InMemoryBlocks() - genesisBlock := inMemoryBlocks[0] - stateRoot, err := hashutil.HashProto(state) - if err != nil { - t.Fatal(err) - } - parentRoot, err := hashutil.HashBeaconBlock(genesisBlock) - if err != nil { - t.Fatal(err) - } - - for i := 1; i <= numOfBlocks; i++ { - block := &pb.BeaconBlock{ - Slot: params.BeaconConfig().GenesisSlot + uint64(i), - ParentRootHash32: parentRoot[:], - StateRootHash32: stateRoot[:], - } - state, err = mockChain.ApplyBlockStateTransition(ctx, block, state) - if err != nil { - t.Fatal(err) - } - stateRoot, err = hashutil.HashProto(state) - if err != nil { - t.Fatal(err) - } - parentRoot, err = hashutil.HashBeaconBlock(block) - if err := mockChain.CleanupBlockOperations(ctx, block); err != nil { - t.Fatal(err) - } - if err := beacondb.SaveBlock(block); err != nil { - t.Fatal(err) - } - if err := beacondb.UpdateChainHead(ctx, block, state); err != nil { - t.Fatal(err) - } - } - return ss, beacondb, stateRoot -} - -func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T) (*Service, *db.BeaconDB) { - bd, beacondb, _ := setupSimBackendAndDB(t) - defer bd.Shutdown() - defer db.TeardownDB(bd.DB()) - - mockPow := &afterGenesisPowChain{ - feed: new(event.Feed), - } - - mockChain := &mockChainService{ - bFeed: new(event.Feed), - sFeed: new(event.Feed), - cFeed: new(event.Feed), - db: bd.DB(), - } - - cfg := &Config{ - ChainService: mockChain, - BeaconDB: beacondb, - OperationService: &mockOperationService{}, - P2P: simP2P, - PowChainService: mockPow, - } - - ss := NewSyncService(context.Background(), cfg) - - go ss.run() - ss.Querier.chainStarted = true - ss.Querier.atGenesis = false - - for ss.Querier.currentHeadSlot == 0 { - simP2P.Send(simP2P.ctx, &pb.ChainHeadResponse{ - CanonicalSlot: params.BeaconConfig().GenesisSlot + 12, - CanonicalStateRootHash32: stateRoot[:], - }, "") - } - - return ss, beacondb -} - -func TestSyncing_AFullySyncedNode(t *testing.T) { - numOfBlocks := 12 - ctx := context.Background() - newP2P := &simulatedP2P{ - subsChannels: make(map[reflect.Type]*event.Feed), - mutex: new(sync.RWMutex), - ctx: ctx, - } - - // Sets up a synced service which has its head at the current - // numOfBlocks from genesis. The blocks are generated through - // simulated backend. - ss, syncedDB, stateRoot := setUpSyncedService(numOfBlocks, newP2P, t) - defer ss.Stop() - defer db.TeardownDB(syncedDB) - - // Sets up a sync service which has its current head at genesis. - us, unSyncedDB := setUpUnSyncedService(newP2P, stateRoot, t) - defer us.Stop() - defer db.TeardownDB(unSyncedDB) - - us2, unSyncedDB2 := setUpUnSyncedService(newP2P, stateRoot, t) - defer us2.Stop() - defer db.TeardownDB(unSyncedDB2) - - finalized, err := syncedDB.FinalizedState() - if err != nil { - t.Fatal(err) - } - - newP2P.Send(newP2P.ctx, &pb.BeaconStateResponse{ - FinalizedState: finalized, - }, "") - - timeout := time.After(10 * time.Second) - tick := time.Tick(200 * time.Millisecond) -loop: - for { - select { - case <-timeout: - t.Error("Could not sync in time") - break loop - case <-tick: - _, slot1 := us.InitialSync.NodeIsSynced() - _, slot2 := us2.InitialSync.NodeIsSynced() - if slot1 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot || - slot2 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot { - break loop - } - } - } -} diff --git a/shared/p2p/service.go b/shared/p2p/service.go index 16743e00a588..18e2d62fb4d4 100644 --- a/shared/p2p/service.go +++ b/shared/p2p/service.go @@ -14,7 +14,7 @@ import ( "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" dsync "github.com/ipfs/go-datastore/sync" - libp2p "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p" host "github.com/libp2p/go-libp2p-host" kaddht "github.com/libp2p/go-libp2p-kad-dht" libp2pnet "github.com/libp2p/go-libp2p-net"