From 7e1dd70ee01e8b8072bcd8f3e478793502ecd936 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 17 Apr 2019 08:54:45 -0500 Subject: [PATCH 01/15] only accept the highest finalized slot from peers --- beacon-chain/sync/initial-sync/service.go | 39 +++++++++----------- beacon-chain/sync/initial-sync/sync_state.go | 5 ++- shared/p2p/service.go | 15 +++++--- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 2b0316cce1b4..0fb4fb7ada09 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -13,6 +13,7 @@ package initialsync import ( "context" "fmt" + "github.com/libp2p/go-libp2p-peer" "math/big" "sync" "time" @@ -64,6 +65,7 @@ func DefaultConfig() *Config { type p2pAPI interface { p2p.Broadcaster p2p.Sender + Peers() peer.IDSlice Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription } @@ -154,7 +156,6 @@ func (s *InitialSync) Start() { } s.currentSlot = cHead.Slot go s.run() - go s.listenForNewBlocks() go s.checkInMemoryBlocks() } @@ -268,24 +269,6 @@ func (s *InitialSync) checkInMemoryBlocks() { } } -// listenForNewBlocks listens for block announcements beyond the canonical head slot that may -// be received during initial sync - we must process these blocks to catch up with peers. -func (s *InitialSync) listenForNewBlocks() { - blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf) - defer func() { - blockAnnounceSub.Unsubscribe() - close(s.blockAnnounceBuf) - }() - for { - select { - case <-s.ctx.Done(): - return - case msg := <-s.blockAnnounceBuf: - safelyHandleMessage(s.processBlockAnnounce, msg) - } - } -} - // run is the main goroutine for the initial sync service. // delayChan is explicitly passed into this function to facilitate tests that don't require a timeout. // It is assumed that the goroutine `run` is only called once per instance. @@ -302,8 +285,14 @@ func (s *InitialSync) run() { close(s.stateBuf) }() - if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil { - log.Errorf("Could not request state from peer %v", err) + // We send out a state request to all peers. + peers := s.p2p.Peers() + stateResponses := 0 + highestObservedFinalizedSlot := params.BeaconConfig().GenesisSlot + for _, p := range peers { + if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot, p); err != nil { + log.Errorf("Could not request state from peer %v", err) + } } for { @@ -317,6 +306,14 @@ func (s *InitialSync) run() { s.processBlock(message.Ctx, data.Block) }, msg) case msg := <-s.stateBuf: + data := msg.Data.(*pb.BeaconStateResponse) + if stateResponses != len(peers) { + if data.FinalizedState.Slot > highestObservedFinalizedSlot { + highestObservedFinalizedSlot = data.FinalizedState.Slot + } + stateResponses++ + continue + } safelyHandleMessage(s.processState, msg) case msg := <-s.batchedBlockBuf: safelyHandleMessage(s.processBatchedBlocks, msg) diff --git a/beacon-chain/sync/initial-sync/sync_state.go b/beacon-chain/sync/initial-sync/sync_state.go index 718b13a9b9e2..4f709503bea7 100644 --- a/beacon-chain/sync/initial-sync/sync_state.go +++ b/beacon-chain/sync/initial-sync/sync_state.go @@ -2,6 +2,7 @@ package initialsync import ( "context" + "github.com/libp2p/go-libp2p-peer" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" @@ -77,11 +78,11 @@ func (s *InitialSync) processState(msg p2p.Message) { } // requestStateFromPeer requests for the canonical state, finalized state, and justified state from a peer. -func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte) error { +func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte, peerID peer.ID) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer") defer span.End() stateReq.Inc() return s.p2p.Send(ctx, &pb.BeaconStateRequest{ FinalizedStateRootHash32S: lastFinalizedRoot[:], - }, p2p.AnyPeer) + }, peerID) } diff --git a/shared/p2p/service.go b/shared/p2p/service.go index 3216075282b1..9d8ed1e6624d 100644 --- a/shared/p2p/service.go +++ b/shared/p2p/service.go @@ -14,13 +14,13 @@ import ( "github.com/gogo/protobuf/proto" ds "github.com/ipfs/go-datastore" dsync "github.com/ipfs/go-datastore/sync" - libp2p "github.com/libp2p/go-libp2p" - host "github.com/libp2p/go-libp2p-host" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-host" kaddht "github.com/libp2p/go-libp2p-kad-dht" libp2pnet "github.com/libp2p/go-libp2p-net" - peer "github.com/libp2p/go-libp2p-peer" - protocol "github.com/libp2p/go-libp2p-protocol" - pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-protocol" + "github.com/libp2p/go-libp2p-pubsub" rhost "github.com/libp2p/go-libp2p/p2p/host/routed" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" @@ -170,6 +170,11 @@ func (s *Server) Status() error { return nil } +// Peers -- +func (s *Server) Peers() peer.IDSlice { + return s.host.Peerstore().Peers() +} + // RegisterTopic with a message and the adapter stack for the given topic. The // message type provided will be feed selector for emitting messages received // on a given topic. From ebe69778bdcf0ab8bfc886d9023fef83b41c80ab Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 17 Apr 2019 09:14:32 -0500 Subject: [PATCH 02/15] use best peer for state requests --- beacon-chain/sync/initial-sync/service.go | 24 ++++++----------- beacon-chain/sync/querier.go | 32 ++++++++++++++++++----- beacon-chain/sync/regular_sync.go | 2 ++ beacon-chain/sync/service.go | 1 + 4 files changed, 36 insertions(+), 23 deletions(-) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 0fb4fb7ada09..8858b4d09cf7 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -65,7 +65,6 @@ func DefaultConfig() *Config { type p2pAPI interface { p2p.Broadcaster p2p.Sender - Peers() peer.IDSlice Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription } @@ -111,6 +110,7 @@ type InitialSync struct { finalizedStateRoot [32]byte mutex *sync.Mutex nodeIsSynced bool + bestPeer peer.ID } // NewInitialSyncService constructs a new InitialSyncService. @@ -181,6 +181,11 @@ func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) { s.finalizedStateRoot = root } +// InitializeBestPeer sets the peer ID of the highest observed peer. +func (s *InitialSync) InitializeBestPeer(p peer.ID) { + s.bestPeer = p +} + // HighestObservedSlot returns the highest observed slot. func (s *InitialSync) HighestObservedSlot() uint64 { return s.highestObservedSlot @@ -286,13 +291,8 @@ func (s *InitialSync) run() { }() // We send out a state request to all peers. - peers := s.p2p.Peers() - stateResponses := 0 - highestObservedFinalizedSlot := params.BeaconConfig().GenesisSlot - for _, p := range peers { - if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot, p); err != nil { - log.Errorf("Could not request state from peer %v", err) - } + if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot, s.bestPeer); err != nil { + log.Errorf("Could not request state from peer %v", err) } for { @@ -306,14 +306,6 @@ func (s *InitialSync) run() { s.processBlock(message.Ctx, data.Block) }, msg) case msg := <-s.stateBuf: - data := msg.Data.(*pb.BeaconStateResponse) - if stateResponses != len(peers) { - if data.FinalizedState.Slot > highestObservedFinalizedSlot { - highestObservedFinalizedSlot = data.FinalizedState.Slot - } - stateResponses++ - continue - } safelyHandleMessage(s.processState, msg) case msg := <-s.batchedBlockBuf: safelyHandleMessage(s.processBatchedBlocks, msg) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 6eba811be7cc..0a5722c08f2e 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -2,6 +2,7 @@ package sync import ( "context" + "github.com/libp2p/go-libp2p-peer" "math/big" "time" @@ -57,6 +58,7 @@ type Querier struct { powchain powChainService chainStarted bool atGenesis bool + bestPeer peer.ID } // NewQuerierService constructs a new Sync Querier Service. @@ -151,17 +153,32 @@ func (q *Querier) run() { ticker.Stop() }() - q.RequestLatestHead() - + peerResponses := 0 for { select { case <-q.ctx.Done(): queryLog.Info("Exiting goroutine") return case <-ticker.C: - q.RequestLatestHead() + peers := q.p2p.Peers() + for _, p := range peers { + if err := q.RequestLatestHead(p); err != nil { + log.Errorf("Could not request head from peer: %v", err) + } + } case msg := <-q.responseBuf: response := msg.Data.(*pb.ChainHeadResponse) + peers := q.p2p.Peers() + peerResponses++ + if peerResponses != len(peers) { + if response.CanonicalSlot > q.currentHeadSlot { + q.currentHeadSlot = response.CanonicalSlot + } + continue + } + if response.CanonicalSlot > q.currentHeadSlot { + q.currentHeadSlot = response.CanonicalSlot + } queryLog.Infof( "Latest chain head is at slot: %d and state root: %#x", response.CanonicalSlot-params.BeaconConfig().GenesisSlot, response.CanonicalStateRootHash32, @@ -169,6 +186,7 @@ func (q *Querier) run() { q.currentHeadSlot = response.CanonicalSlot q.currentStateRoot = response.CanonicalStateRootHash32 q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) + q.bestPeer = msg.Peer ticker.Stop() responseSub.Unsubscribe() @@ -177,11 +195,11 @@ func (q *Querier) run() { } } -// RequestLatestHead broadcasts out a request for all -// the latest chain heads from the node's peers. -func (q *Querier) RequestLatestHead() { +// RequestLatestHead sends a request for all +// the latest chain head slot and state root to a peer. +func (q *Querier) RequestLatestHead(p peer.ID) error { request := &pb.ChainHeadRequest{} - q.p2p.Broadcast(context.Background(), request) + return q.p2p.Send(context.Background(), request, p) } // IsSynced checks if the node is currently synced with the diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index d6e1dfb086b1..a64fbbb68fed 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/libp2p/go-libp2p-peer" "runtime/debug" "sync" @@ -48,6 +49,7 @@ type p2pAPI interface { p2p.Broadcaster p2p.Sender p2p.Subscriber + Peers() peer.IDSlice } // RegularSync is the gateway and the bridge between the p2p network and the local beacon chain. diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index cf452b9fb484..4218ef3759c8 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -117,6 +117,7 @@ func (ss *Service) run() { // Sets the highest observed slot from querier. ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot) + ss.InitialSync.InitializeBestPeer(ss.Querier.bestPeer) ss.InitialSync.InitializeObservedStateRoot(bytesutil.ToBytes32(ss.Querier.currentStateRoot)) // Sets the state root of the highest observed slot. ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot) From f6b84ceefde5c74496683937c8e42c025564e766 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 24 Apr 2019 23:16:11 -0500 Subject: [PATCH 03/15] peer id --- beacon-chain/sync/BUILD.bazel | 1 + beacon-chain/sync/initial-sync/BUILD.bazel | 1 + beacon-chain/sync/initial-sync/service.go | 7 ++++--- beacon-chain/sync/initial-sync/service_test.go | 1 - beacon-chain/sync/initial-sync/sync_state.go | 2 +- beacon-chain/sync/querier.go | 13 +++++++------ beacon-chain/sync/regular_sync.go | 3 ++- beacon-chain/sync/regular_sync_test.go | 4 ++++ beacon-chain/sync/simulated_sync_test.go | 4 ++++ shared/p2p/service.go | 10 +++++----- 10 files changed, 29 insertions(+), 17 deletions(-) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 27ada6fce953..42a88a8cc73f 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//shared/params:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_libp2p_go_libp2p_peer//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index a40a4ace373a..f87121bf0f20 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//shared/params:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", + "@com_github_libp2p_go_libp2p_peer//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 9cb344c4c9f3..bfe877eb6231 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -13,11 +13,12 @@ package initialsync import ( "context" "fmt" - "github.com/libp2p/go-libp2p-peer" "math/big" "sync" "time" + peer "github.com/libp2p/go-libp2p-peer" + "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/ethereum/go-ethereum/common" @@ -111,7 +112,7 @@ type InitialSync struct { finalizedStateRoot [32]byte mutex *sync.Mutex nodeIsSynced bool - bestPeer peer.ID + bestPeer peer.ID } // NewInitialSyncService constructs a new InitialSyncService. @@ -184,7 +185,7 @@ func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) { // InitializeBestPeer sets the peer ID of the highest observed peer. func (s *InitialSync) InitializeBestPeer(p peer.ID) { - s.bestPeer = p + s.bestPeer = p } // HighestObservedSlot returns the highest observed slot. diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index d61a61abe1ce..38308e2d4341 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -136,7 +136,6 @@ func TestSavingBlock_InSync(t *testing.T) { go func() { ss.run() - ss.listenForNewBlocks() exitRoutine <- true }() diff --git a/beacon-chain/sync/initial-sync/sync_state.go b/beacon-chain/sync/initial-sync/sync_state.go index 87bb1ddfcee5..ac05ddc441c4 100644 --- a/beacon-chain/sync/initial-sync/sync_state.go +++ b/beacon-chain/sync/initial-sync/sync_state.go @@ -2,8 +2,8 @@ package initialsync import ( "context" - "github.com/libp2p/go-libp2p-peer" + peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/beacon-chain/core/validators" diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 0a5722c08f2e..86611dd0f650 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -2,10 +2,11 @@ package sync import ( "context" - "github.com/libp2p/go-libp2p-peer" "math/big" "time" + peer "github.com/libp2p/go-libp2p-peer" + "github.com/ethereum/go-ethereum/common" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -58,7 +59,7 @@ type Querier struct { powchain powChainService chainStarted bool atGenesis bool - bestPeer peer.ID + bestPeer peer.ID } // NewQuerierService constructs a new Sync Querier Service. @@ -170,11 +171,11 @@ func (q *Querier) run() { response := msg.Data.(*pb.ChainHeadResponse) peers := q.p2p.Peers() peerResponses++ - if peerResponses != len(peers) { - if response.CanonicalSlot > q.currentHeadSlot { - q.currentHeadSlot = response.CanonicalSlot + if peerResponses != len(peers) { + if response.CanonicalSlot > q.currentHeadSlot { + q.currentHeadSlot = response.CanonicalSlot } - continue + continue } if response.CanonicalSlot > q.currentHeadSlot { q.currentHeadSlot = response.CanonicalSlot diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index 5d7d407addb8..bb1af8669a51 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -5,10 +5,11 @@ import ( "context" "errors" "fmt" - "github.com/libp2p/go-libp2p-peer" "runtime/debug" "sync" + peer "github.com/libp2p/go-libp2p-peer" + "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 5d54d8dcfc49..f7b90239b461 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -47,6 +47,10 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) return nil } +func (mp *mockP2P) Peers() peer.IDSlice { + return []peer.ID{} +} + type mockChainService struct { bFeed *event.Feed sFeed *event.Feed diff --git a/beacon-chain/sync/simulated_sync_test.go b/beacon-chain/sync/simulated_sync_test.go index 1ca82a10b77e..774ab9ac8440 100644 --- a/beacon-chain/sync/simulated_sync_test.go +++ b/beacon-chain/sync/simulated_sync_test.go @@ -69,6 +69,10 @@ func (sim *simulatedP2P) Send(ctx context.Context, msg proto.Message, peerID pee return nil } +func (sim *simulatedP2P) Peers() peer.IDSlice { + return []peer.ID{} +} + func setupSimBackendAndDB(t *testing.T) (*backend.SimulatedBackend, *db.BeaconDB, []*bls.SecretKey) { ctx := context.Background() diff --git a/shared/p2p/service.go b/shared/p2p/service.go index e82bad22520f..7aae039446a7 100644 --- a/shared/p2p/service.go +++ b/shared/p2p/service.go @@ -15,12 +15,12 @@ import ( ds "github.com/ipfs/go-datastore" dsync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-host" + host "github.com/libp2p/go-libp2p-host" kaddht "github.com/libp2p/go-libp2p-kad-dht" libp2pnet "github.com/libp2p/go-libp2p-net" - "github.com/libp2p/go-libp2p-peer" - "github.com/libp2p/go-libp2p-protocol" - "github.com/libp2p/go-libp2p-pubsub" + peer "github.com/libp2p/go-libp2p-peer" + protocol "github.com/libp2p/go-libp2p-protocol" + pubsub "github.com/libp2p/go-libp2p-pubsub" rhost "github.com/libp2p/go-libp2p/p2p/host/routed" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" @@ -184,7 +184,7 @@ func (s *Server) Status() error { return nil } -// Peers -- +// Peers returns a slice of currently connected peer id's to be used throughout the runtime. func (s *Server) Peers() peer.IDSlice { return s.host.Peerstore().Peers() } From 69204230edd83217f8e02ce95822204896861b63 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 00:00:46 -0500 Subject: [PATCH 04/15] rem old tests --- beacon-chain/sync/BUILD.bazel | 2 - beacon-chain/sync/simulated_sync_test.go | 294 ----------------------- 2 files changed, 296 deletions(-) delete mode 100644 beacon-chain/sync/simulated_sync_test.go diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 42a88a8cc73f..5a84bbdf74b8 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -40,11 +40,9 @@ go_test( "receive_block_test.go", "regular_sync_test.go", "service_test.go", - "simulated_sync_test.go", ], embed = [":go_default_library"], deps = [ - "//beacon-chain/chaintest/backend:go_default_library", "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/internal:go_default_library", diff --git a/beacon-chain/sync/simulated_sync_test.go b/beacon-chain/sync/simulated_sync_test.go deleted file mode 100644 index 774ab9ac8440..000000000000 --- a/beacon-chain/sync/simulated_sync_test.go +++ /dev/null @@ -1,294 +0,0 @@ -package sync - -import ( - "context" - "reflect" - "sync" - "testing" - "time" - - "github.com/gogo/protobuf/proto" - peer "github.com/libp2p/go-libp2p-peer" - "github.com/prysmaticlabs/prysm/beacon-chain/chaintest/backend" - "github.com/prysmaticlabs/prysm/beacon-chain/db" - pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" - "github.com/prysmaticlabs/prysm/shared/bls" - "github.com/prysmaticlabs/prysm/shared/event" - "github.com/prysmaticlabs/prysm/shared/hashutil" - "github.com/prysmaticlabs/prysm/shared/p2p" - "github.com/prysmaticlabs/prysm/shared/params" -) - -type simulatedP2P struct { - subsChannels map[reflect.Type]*event.Feed - mutex *sync.RWMutex - ctx context.Context -} - -func (sim *simulatedP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription { - sim.mutex.Lock() - defer sim.mutex.Unlock() - - protoType := reflect.TypeOf(msg) - - feed, ok := sim.subsChannels[protoType] - if !ok { - nFeed := new(event.Feed) - sim.subsChannels[protoType] = nFeed - return nFeed.Subscribe(channel) - } - return feed.Subscribe(channel) -} - -func (sim *simulatedP2P) Broadcast(_ context.Context, msg proto.Message) { - sim.mutex.Lock() - defer sim.mutex.Unlock() - - protoType := reflect.TypeOf(msg) - - feed, ok := sim.subsChannels[protoType] - if !ok { - return - } - - feed.Send(p2p.Message{Ctx: sim.ctx, Data: msg}) -} - -func (sim *simulatedP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) error { - sim.mutex.Lock() - defer sim.mutex.Unlock() - - protoType := reflect.TypeOf(msg) - - feed, ok := sim.subsChannels[protoType] - if !ok { - return nil - } - - feed.Send(p2p.Message{Ctx: sim.ctx, Data: msg}) - return nil -} - -func (sim *simulatedP2P) Peers() peer.IDSlice { - return []peer.ID{} -} - -func setupSimBackendAndDB(t *testing.T) (*backend.SimulatedBackend, *db.BeaconDB, []*bls.SecretKey) { - ctx := context.Background() - - bd, err := backend.NewSimulatedBackend() - if err != nil { - t.Fatalf("Could not set up simulated backend %v", err) - } - - privKeys, err := bd.SetupBackend(100) - if err != nil { - t.Fatalf("Could not set up backend %v", err) - } - - beacondb, err := db.SetupDB() - if err != nil { - t.Fatalf("Could not setup beacon db %v", err) - } - - memBlocks := bd.InMemoryBlocks() - if err := beacondb.SaveBlock(memBlocks[0]); err != nil { - t.Fatalf("Could not save block %v", err) - } - if err := beacondb.SaveJustifiedBlock(memBlocks[0]); err != nil { - t.Fatalf("Could not save block %v", err) - } - if err := beacondb.SaveFinalizedBlock(memBlocks[0]); err != nil { - t.Fatalf("Could not save block %v", err) - } - - state := bd.State() - state.LatestBlock = memBlocks[0] - state.LatestEth1Data = &pb.Eth1Data{ - BlockHash32: []byte{}, - } - - if err := beacondb.SaveState(ctx, state); err != nil { - t.Fatalf("Could not save state %v", err) - } - if err := beacondb.SaveJustifiedState(state); err != nil { - t.Fatalf("Could not save state %v", err) - } - if err := beacondb.SaveFinalizedState(state); err != nil { - t.Fatalf("Could not save state %v", err) - } - - if err := beacondb.UpdateChainHead(ctx, memBlocks[0], state); err != nil { - t.Fatalf("Could not update chain head %v", err) - } - - return bd, beacondb, privKeys -} - -func setUpSyncedService(numOfBlocks int, simP2P *simulatedP2P, t *testing.T) (*Service, *db.BeaconDB, [32]byte) { - bd, beacondb, _ := setupSimBackendAndDB(t) - defer bd.Shutdown() - defer db.TeardownDB(bd.DB()) - ctx := context.Background() - - mockPow := &genesisPowChain{ - feed: new(event.Feed), - } - - mockChain := &mockChainService{ - bFeed: new(event.Feed), - sFeed: new(event.Feed), - cFeed: new(event.Feed), - db: bd.DB(), - } - - cfg := &Config{ - ChainService: mockChain, - BeaconDB: beacondb, - OperationService: &mockOperationService{}, - P2P: simP2P, - PowChainService: mockPow, - } - - ss := NewSyncService(context.Background(), cfg) - - go ss.run() - for !ss.Querier.chainStarted { - mockChain.sFeed.Send(time.Now()) - } - - state, err := beacondb.HeadState(ctx) - if err != nil { - t.Fatal(err) - } - inMemoryBlocks := bd.InMemoryBlocks() - genesisBlock := inMemoryBlocks[0] - stateRoot, err := hashutil.HashProto(state) - if err != nil { - t.Fatal(err) - } - parentRoot, err := hashutil.HashBeaconBlock(genesisBlock) - if err != nil { - t.Fatal(err) - } - - for i := 1; i <= numOfBlocks; i++ { - block := &pb.BeaconBlock{ - Slot: params.BeaconConfig().GenesisSlot + uint64(i), - ParentRootHash32: parentRoot[:], - StateRootHash32: stateRoot[:], - } - state, err = mockChain.ApplyBlockStateTransition(ctx, block, state) - if err != nil { - t.Fatal(err) - } - stateRoot, err = hashutil.HashProto(state) - if err != nil { - t.Fatal(err) - } - parentRoot, err = hashutil.HashBeaconBlock(block) - if err := mockChain.CleanupBlockOperations(ctx, block); err != nil { - t.Fatal(err) - } - if err := beacondb.SaveBlock(block); err != nil { - t.Fatal(err) - } - if err := beacondb.UpdateChainHead(ctx, block, state); err != nil { - t.Fatal(err) - } - } - return ss, beacondb, stateRoot -} - -func setUpUnSyncedService(simP2P *simulatedP2P, stateRoot [32]byte, t *testing.T) (*Service, *db.BeaconDB) { - bd, beacondb, _ := setupSimBackendAndDB(t) - defer bd.Shutdown() - defer db.TeardownDB(bd.DB()) - - mockPow := &afterGenesisPowChain{ - feed: new(event.Feed), - } - - mockChain := &mockChainService{ - bFeed: new(event.Feed), - sFeed: new(event.Feed), - cFeed: new(event.Feed), - db: bd.DB(), - } - - cfg := &Config{ - ChainService: mockChain, - BeaconDB: beacondb, - OperationService: &mockOperationService{}, - P2P: simP2P, - PowChainService: mockPow, - } - - ss := NewSyncService(context.Background(), cfg) - - go ss.run() - ss.Querier.chainStarted = true - ss.Querier.atGenesis = false - - for ss.Querier.currentHeadSlot == 0 { - simP2P.Send(simP2P.ctx, &pb.ChainHeadResponse{ - CanonicalSlot: params.BeaconConfig().GenesisSlot + 12, - CanonicalStateRootHash32: stateRoot[:], - }, "") - } - - return ss, beacondb -} - -func TestSyncing_AFullySyncedNode(t *testing.T) { - numOfBlocks := 12 - ctx := context.Background() - newP2P := &simulatedP2P{ - subsChannels: make(map[reflect.Type]*event.Feed), - mutex: new(sync.RWMutex), - ctx: ctx, - } - - // Sets up a synced service which has its head at the current - // numOfBlocks from genesis. The blocks are generated through - // simulated backend. - ss, syncedDB, stateRoot := setUpSyncedService(numOfBlocks, newP2P, t) - defer ss.Stop() - defer db.TeardownDB(syncedDB) - - // Sets up a sync service which has its current head at genesis. - us, unSyncedDB := setUpUnSyncedService(newP2P, stateRoot, t) - defer us.Stop() - defer db.TeardownDB(unSyncedDB) - - us2, unSyncedDB2 := setUpUnSyncedService(newP2P, stateRoot, t) - defer us2.Stop() - defer db.TeardownDB(unSyncedDB2) - - finalized, err := syncedDB.FinalizedState() - if err != nil { - t.Fatal(err) - } - - newP2P.Send(newP2P.ctx, &pb.BeaconStateResponse{ - FinalizedState: finalized, - }, "") - - timeout := time.After(10 * time.Second) - tick := time.Tick(200 * time.Millisecond) -loop: - for { - select { - case <-timeout: - t.Error("Could not sync in time") - break loop - case <-tick: - _, slot1 := us.InitialSync.NodeIsSynced() - _, slot2 := us2.InitialSync.NodeIsSynced() - if slot1 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot || - slot2 == uint64(numOfBlocks)+params.BeaconConfig().GenesisSlot { - break loop - } - } - } -} From 47c499acd3c785e7b1a323d0a7dfa063031bcfd5 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:00:27 -0500 Subject: [PATCH 05/15] req peers tests pass --- beacon-chain/sync/regular_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index f7b90239b461..d8a35768d302 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -48,7 +48,7 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) } func (mp *mockP2P) Peers() peer.IDSlice { - return []peer.ID{} + return make([]peer.ID, 1) } type mockChainService struct { From 2007b694df89cb490abf9041825188731cd059f7 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:07:26 -0500 Subject: [PATCH 06/15] warn level support for peer not having the same protocol --- beacon-chain/sync/querier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 86611dd0f650..3fc27d53e62c 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -164,7 +164,7 @@ func (q *Querier) run() { peers := q.p2p.Peers() for _, p := range peers { if err := q.RequestLatestHead(p); err != nil { - log.Errorf("Could not request head from peer: %v", err) + log.Warnf("Could not request head from peer %d: %v", p, err) } } case msg := <-q.responseBuf: From d49b2af51a16e7f27987b67191ba0d2905546eb9 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:09:01 -0500 Subject: [PATCH 07/15] pretty --- beacon-chain/sync/querier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 3fc27d53e62c..f391d51f70ae 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -164,7 +164,7 @@ func (q *Querier) run() { peers := q.p2p.Peers() for _, p := range peers { if err := q.RequestLatestHead(p); err != nil { - log.Warnf("Could not request head from peer %d: %v", p, err) + log.Warnf("Could not request head from peer %s: %v", p.Pretty(), err) } } case msg := <-q.responseBuf: From 9a9809dc7cede4066b85351cd85442e501638413 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:10:57 -0500 Subject: [PATCH 08/15] best peer --- beacon-chain/sync/querier.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index f391d51f70ae..998bd4d4d873 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -164,7 +164,7 @@ func (q *Querier) run() { peers := q.p2p.Peers() for _, p := range peers { if err := q.RequestLatestHead(p); err != nil { - log.Warnf("Could not request head from peer %s: %v", p.Pretty(), err) + queryLog.Warnf("Could not request head from peer %s: %v", p.Pretty(), err) } } case msg := <-q.responseBuf: @@ -180,6 +180,7 @@ func (q *Querier) run() { if response.CanonicalSlot > q.currentHeadSlot { q.currentHeadSlot = response.CanonicalSlot } + queryLog.Infof("Received chain head from best peer: %s", msg.Peer.Pretty()) queryLog.Infof( "Latest chain head is at slot: %d and state root: %#x", response.CanonicalSlot-params.BeaconConfig().GenesisSlot, response.CanonicalStateRootHash32, From ae4e008d00a187a0d695fe21cd39768d8ea957f3 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:24:12 -0500 Subject: [PATCH 09/15] comments --- beacon-chain/sync/initial-sync/service.go | 2 -- beacon-chain/sync/querier.go | 7 +++---- beacon-chain/sync/regular_sync.go | 1 - shared/p2p/service.go | 2 +- 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index bfe877eb6231..9b387b807990 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -18,9 +18,7 @@ import ( "time" peer "github.com/libp2p/go-libp2p-peer" - "github.com/prysmaticlabs/prysm/shared/hashutil" - "github.com/ethereum/go-ethereum/common" "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 998bd4d4d873..ee9269f5693f 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -6,7 +6,6 @@ import ( "time" peer "github.com/libp2p/go-libp2p-peer" - "github.com/ethereum/go-ethereum/common" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -163,10 +162,10 @@ func (q *Querier) run() { case <-ticker.C: peers := q.p2p.Peers() for _, p := range peers { - if err := q.RequestLatestHead(p); err != nil { - queryLog.Warnf("Could not request head from peer %s: %v", p.Pretty(), err) - } + if err := q.RequestLatestHead(p); err != nil { + queryLog.Warnf("Could not request head from peer %s: %v", p.Pretty(), err) } + } case msg := <-q.responseBuf: response := msg.Data.(*pb.ChainHeadResponse) peers := q.p2p.Peers() diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index bb1af8669a51..3821b04a2801 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -9,7 +9,6 @@ import ( "sync" peer "github.com/libp2p/go-libp2p-peer" - "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" diff --git a/shared/p2p/service.go b/shared/p2p/service.go index 7aae039446a7..d1bcf2a43500 100644 --- a/shared/p2p/service.go +++ b/shared/p2p/service.go @@ -186,7 +186,7 @@ func (s *Server) Status() error { // Peers returns a slice of currently connected peer id's to be used throughout the runtime. func (s *Server) Peers() peer.IDSlice { - return s.host.Peerstore().Peers() + return s.host.Network().Peers() } // RegisterTopic with a message and the adapter stack for the given topic. The From eebfce73783de22885dc79fed415bc315e388c47 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:44:29 -0500 Subject: [PATCH 10/15] req peers done --- beacon-chain/sync/querier.go | 47 ++++++++++---------------- beacon-chain/sync/regular_sync.go | 2 -- beacon-chain/sync/regular_sync_test.go | 4 --- shared/p2p/service.go | 5 --- 4 files changed, 17 insertions(+), 41 deletions(-) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index ee9269f5693f..85d8f7e638e9 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -5,7 +5,7 @@ import ( "math/big" "time" - peer "github.com/libp2p/go-libp2p-peer" + "github.com/libp2p/go-libp2p-peer" "github.com/ethereum/go-ethereum/common" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -153,54 +153,41 @@ func (q *Querier) run() { ticker.Stop() }() - peerResponses := 0 + timeout := time.After(5 * time.Second) for { select { case <-q.ctx.Done(): queryLog.Info("Exiting goroutine") return case <-ticker.C: - peers := q.p2p.Peers() - for _, p := range peers { - if err := q.RequestLatestHead(p); err != nil { - queryLog.Warnf("Could not request head from peer %s: %v", p.Pretty(), err) - } - } - case msg := <-q.responseBuf: - response := msg.Data.(*pb.ChainHeadResponse) - peers := q.p2p.Peers() - peerResponses++ - if peerResponses != len(peers) { - if response.CanonicalSlot > q.currentHeadSlot { - q.currentHeadSlot = response.CanonicalSlot - } - continue - } - if response.CanonicalSlot > q.currentHeadSlot { - q.currentHeadSlot = response.CanonicalSlot - } - queryLog.Infof("Received chain head from best peer: %s", msg.Peer.Pretty()) + q.RequestLatestHead() + case <-timeout: + queryLog.Infof("Peer with highest canonical head: %v", q.bestPeer.Pretty()) queryLog.Infof( "Latest chain head is at slot: %d and state root: %#x", - response.CanonicalSlot-params.BeaconConfig().GenesisSlot, response.CanonicalStateRootHash32, + q.currentHeadSlot-params.BeaconConfig().GenesisSlot, q.currentStateRoot, ) - q.currentHeadSlot = response.CanonicalSlot - q.currentStateRoot = response.CanonicalStateRootHash32 - q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) - q.bestPeer = msg.Peer - ticker.Stop() responseSub.Unsubscribe() q.cancel() + case msg := <-q.responseBuf: + response := msg.Data.(*pb.ChainHeadResponse) + if response.CanonicalSlot > q.currentHeadSlot { + q.currentHeadSlot = response.CanonicalSlot + q.bestPeer = msg.Peer + q.currentHeadSlot = response.CanonicalSlot + q.currentStateRoot = response.CanonicalStateRootHash32 + q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S) + } } } } // RequestLatestHead sends a request for all // the latest chain head slot and state root to a peer. -func (q *Querier) RequestLatestHead(p peer.ID) error { +func (q *Querier) RequestLatestHead() { request := &pb.ChainHeadRequest{} - return q.p2p.Send(context.Background(), request, p) + q.p2p.Broadcast(context.Background(), request) } // IsSynced checks if the node is currently synced with the diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index 2f20b16389f3..b62a188d70d1 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -8,7 +8,6 @@ import ( "runtime/debug" "sync" - peer "github.com/libp2p/go-libp2p-peer" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -49,7 +48,6 @@ type p2pAPI interface { p2p.Broadcaster p2p.Sender p2p.Subscriber - Peers() peer.IDSlice } // RegularSync is the gateway and the bridge between the p2p network and the local beacon chain. diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index af5a9485d0a5..200077a9fe51 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -47,10 +47,6 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) return nil } -func (mp *mockP2P) Peers() peer.IDSlice { - return make([]peer.ID, 1) -} - type mockChainService struct { bFeed *event.Feed sFeed *event.Feed diff --git a/shared/p2p/service.go b/shared/p2p/service.go index d1bcf2a43500..18e2d62fb4d4 100644 --- a/shared/p2p/service.go +++ b/shared/p2p/service.go @@ -184,11 +184,6 @@ func (s *Server) Status() error { return nil } -// Peers returns a slice of currently connected peer id's to be used throughout the runtime. -func (s *Server) Peers() peer.IDSlice { - return s.host.Network().Peers() -} - // RegisterTopic with a message and the adapter stack for the given topic. The // message type provided will be feed selector for emitting messages received // on a given topic. From 9677a569c1cb0ba9670715ad8f954855cebb2140 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:49:03 -0500 Subject: [PATCH 11/15] tests passing --- beacon-chain/sync/initial-sync/service.go | 4 ++-- beacon-chain/sync/querier.go | 2 +- beacon-chain/sync/querier_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 9b387b807990..32822b24a3ff 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -17,14 +17,14 @@ import ( "sync" "time" - peer "github.com/libp2p/go-libp2p-peer" - "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/ethereum/go-ethereum/common" "github.com/gogo/protobuf/proto" + peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/p2p" "github.com/prysmaticlabs/prysm/shared/params" "github.com/sirupsen/logrus" diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 85d8f7e638e9..4a4181dd4f18 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -5,8 +5,8 @@ import ( "math/big" "time" - "github.com/libp2p/go-libp2p-peer" "github.com/ethereum/go-ethereum/common" + peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/beacon-chain/db" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/bytesutil" diff --git a/beacon-chain/sync/querier_test.go b/beacon-chain/sync/querier_test.go index 3dc8d9ba6107..a406c0150afd 100644 --- a/beacon-chain/sync/querier_test.go +++ b/beacon-chain/sync/querier_test.go @@ -146,7 +146,7 @@ func TestQuerier_ChainReqResponse(t *testing.T) { }() response := &pb.ChainHeadResponse{ - CanonicalSlot: 0, + CanonicalSlot: 1, CanonicalStateRootHash32: []byte{'a', 'b'}, } From 513b50938060e1df8319da2a12eecebfc463c423 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 12:55:31 -0500 Subject: [PATCH 12/15] fully functional --- beacon-chain/sync/querier.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 4a4181dd4f18..38c497abe71c 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -172,6 +172,10 @@ func (q *Querier) run() { q.cancel() case msg := <-q.responseBuf: response := msg.Data.(*pb.ChainHeadResponse) + queryLog.WithFields(logrus.Fields{ + "peerID": msg.Peer.Pretty(), + "highestSlot": response.CanonicalSlot - params.BeaconConfig().GenesisSlot, + }).Info("Received chain head from peer") if response.CanonicalSlot > q.currentHeadSlot { q.currentHeadSlot = response.CanonicalSlot q.bestPeer = msg.Peer From d3ef31567be3491d956b0598cd2476128612671f Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 18:06:20 -0500 Subject: [PATCH 13/15] enforce receiving from the best peer --- beacon-chain/sync/initial-sync/service.go | 2 +- beacon-chain/sync/initial-sync/sync_blocks.go | 6 ++++++ beacon-chain/sync/initial-sync/sync_state.go | 5 ++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 32822b24a3ff..9c7e1464877f 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -296,7 +296,7 @@ func (s *InitialSync) run() { }() // We send out a state request to all peers. - if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot, s.bestPeer); err != nil { + if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil { log.Errorf("Could not request state from peer %v", err) } diff --git a/beacon-chain/sync/initial-sync/sync_blocks.go b/beacon-chain/sync/initial-sync/sync_blocks.go index 84b5eeb97c6d..de54f38fc872 100644 --- a/beacon-chain/sync/initial-sync/sync_blocks.go +++ b/beacon-chain/sync/initial-sync/sync_blocks.go @@ -93,6 +93,12 @@ func (s *InitialSync) processBatchedBlocks(msg p2p.Message) { // Do not process empty responses. return } + if msg.Peer != s.bestPeer { + // Only process batch block responses that come from the best peer + // we originally synced with. + log.Debugf("Received batch blocks from a different peer: %s", msg.Peer.Pretty()) + return + } log.Debug("Processing batched block response") for _, block := range batchedBlocks { diff --git a/beacon-chain/sync/initial-sync/sync_state.go b/beacon-chain/sync/initial-sync/sync_state.go index ac05ddc441c4..09f4d395a044 100644 --- a/beacon-chain/sync/initial-sync/sync_state.go +++ b/beacon-chain/sync/initial-sync/sync_state.go @@ -3,7 +3,6 @@ package initialsync import ( "context" - peer "github.com/libp2p/go-libp2p-peer" "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/beacon-chain/core/validators" @@ -97,11 +96,11 @@ func (s *InitialSync) processState(msg p2p.Message) { } // requestStateFromPeer requests for the canonical state, finalized state, and justified state from a peer. -func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte, peerID peer.ID) error { +func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer") defer span.End() stateReq.Inc() return s.p2p.Send(ctx, &pb.BeaconStateRequest{ FinalizedStateRootHash32S: lastFinalizedRoot[:], - }, peerID) + }, s.bestPeer) } From 11ce6adec9c271099a39bf0edf62b4e6fbe19c84 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Thu, 25 Apr 2019 23:09:14 -0500 Subject: [PATCH 14/15] comments --- beacon-chain/sync/initial-sync/sync_blocks.go | 2 +- beacon-chain/sync/querier.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/beacon-chain/sync/initial-sync/sync_blocks.go b/beacon-chain/sync/initial-sync/sync_blocks.go index de54f38fc872..8933bd0520db 100644 --- a/beacon-chain/sync/initial-sync/sync_blocks.go +++ b/beacon-chain/sync/initial-sync/sync_blocks.go @@ -96,7 +96,7 @@ func (s *InitialSync) processBatchedBlocks(msg p2p.Message) { if msg.Peer != s.bestPeer { // Only process batch block responses that come from the best peer // we originally synced with. - log.Debugf("Received batch blocks from a different peer: %s", msg.Peer.Pretty()) + log.WithField("peerID", msg.Peer.Pretty()).Debug("Received batch blocks from a different peer") return } diff --git a/beacon-chain/sync/querier.go b/beacon-chain/sync/querier.go index 38c497abe71c..7cd857a1169a 100644 --- a/beacon-chain/sync/querier.go +++ b/beacon-chain/sync/querier.go @@ -162,7 +162,7 @@ func (q *Querier) run() { case <-ticker.C: q.RequestLatestHead() case <-timeout: - queryLog.Infof("Peer with highest canonical head: %v", q.bestPeer.Pretty()) + queryLog.WithField("peerID", q.bestPeer.Pretty()).Info("Peer with highest canonical head") queryLog.Infof( "Latest chain head is at slot: %d and state root: %#x", q.currentHeadSlot-params.BeaconConfig().GenesisSlot, q.currentStateRoot, @@ -187,7 +187,7 @@ func (q *Querier) run() { } } -// RequestLatestHead sends a request for all +// RequestLatestHead broadcasts a request for // the latest chain head slot and state root to a peer. func (q *Querier) RequestLatestHead() { request := &pb.ChainHeadRequest{} From 75f32a9c94d458bbdf610aadbc60e7fc7014f96c Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Fri, 26 Apr 2019 09:32:53 -0500 Subject: [PATCH 15/15] lint fixes --- beacon-chain/sync/initial-sync/metrics.go | 4 ---- beacon-chain/sync/initial-sync/sync_blocks.go | 12 ------------ beacon-chain/sync/regular_sync_test.go | 1 - 3 files changed, 17 deletions(-) diff --git a/beacon-chain/sync/initial-sync/metrics.go b/beacon-chain/sync/initial-sync/metrics.go index ebea942d8f0d..c3052c93c266 100644 --- a/beacon-chain/sync/initial-sync/metrics.go +++ b/beacon-chain/sync/initial-sync/metrics.go @@ -19,10 +19,6 @@ var ( Name: "initsync_received_blocks", Help: "The number of received blocks", }) - recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{ - Name: "initsync_received_block_announce", - Help: "The number of received block announce", - }) stateReq = promauto.NewCounter(prometheus.CounterOpts{ Name: "initsync_state_req", Help: "The number of sent state requests", diff --git a/beacon-chain/sync/initial-sync/sync_blocks.go b/beacon-chain/sync/initial-sync/sync_blocks.go index 8933bd0520db..1bae9e94ca9e 100644 --- a/beacon-chain/sync/initial-sync/sync_blocks.go +++ b/beacon-chain/sync/initial-sync/sync_blocks.go @@ -14,18 +14,6 @@ import ( "go.opencensus.io/trace" ) -func (s *InitialSync) processBlockAnnounce(msg p2p.Message) { - _, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce") - defer span.End() - data := msg.Data.(*pb.BeaconBlockAnnounce) - recBlockAnnounce.Inc() - - if s.stateReceived && data.SlotNumber > s.highestObservedSlot { - s.requestBatchedBlocks(s.lastRequestedSlot, data.SlotNumber) - s.lastRequestedSlot = data.SlotNumber - } -} - // processBlock is the main method that validates each block which is received // for initial sync. It checks if the blocks are valid and then will continue to // process and save it into the db. diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 200077a9fe51..a7ff031de814 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -48,7 +48,6 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID) } type mockChainService struct { - bFeed *event.Feed sFeed *event.Feed cFeed *event.Feed db *db.BeaconDB