Skip to content

Commit

Permalink
Only Sync With the Peer With the Highest Observed Slot (#2280)
Browse files Browse the repository at this point in the history
* only accept the highest finalized slot from peers

* use best peer for state requests

* peer id

* rem old tests

* req peers tests pass

* warn level support for peer not having the same protocol

* pretty

* best peer

* comments

* req peers done

* tests passing

* fully functional

* enforce receiving from the best peer

* comments

* lint fixes
  • Loading branch information
rauljordan authored Apr 26, 2019
1 parent a170c69 commit 81c8b13
Show file tree
Hide file tree
Showing 13 changed files with 42 additions and 345 deletions.
3 changes: 1 addition & 2 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand All @@ -39,11 +40,9 @@ go_test(
"receive_block_test.go",
"regular_sync_test.go",
"service_test.go",
"simulated_sync_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/chaintest/backend:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/internal:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand Down
4 changes: 0 additions & 4 deletions beacon-chain/sync/initial-sync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ var (
Name: "initsync_received_blocks",
Help: "The number of received blocks",
})
recBlockAnnounce = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_received_block_announce",
Help: "The number of received block announce",
})
stateReq = promauto.NewCounter(prometheus.CounterOpts{
Name: "initsync_state_req",
Help: "The number of sent state requests",
Expand Down
30 changes: 9 additions & 21 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
"sync"
"time"

"github.com/prysmaticlabs/prysm/shared/hashutil"

"github.com/ethereum/go-ethereum/common"
"github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -110,6 +110,7 @@ type InitialSync struct {
finalizedStateRoot [32]byte
mutex *sync.Mutex
nodeIsSynced bool
bestPeer peer.ID
}

// NewInitialSyncService constructs a new InitialSyncService.
Expand Down Expand Up @@ -155,7 +156,6 @@ func (s *InitialSync) Start() {
}
s.currentSlot = cHead.Slot
go s.run()
go s.listenForNewBlocks()
go s.checkInMemoryBlocks()
}

Expand All @@ -181,6 +181,11 @@ func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) {
s.finalizedStateRoot = root
}

// InitializeBestPeer sets the peer ID of the highest observed peer.
func (s *InitialSync) InitializeBestPeer(p peer.ID) {
s.bestPeer = p
}

// HighestObservedSlot returns the highest observed slot.
func (s *InitialSync) HighestObservedSlot() uint64 {
return s.highestObservedSlot
Expand Down Expand Up @@ -274,24 +279,6 @@ func (s *InitialSync) checkInMemoryBlocks() {
}
}

// listenForNewBlocks listens for block announcements beyond the canonical head slot that may
// be received during initial sync - we must process these blocks to catch up with peers.
func (s *InitialSync) listenForNewBlocks() {
blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf)
defer func() {
blockAnnounceSub.Unsubscribe()
close(s.blockAnnounceBuf)
}()
for {
select {
case <-s.ctx.Done():
return
case msg := <-s.blockAnnounceBuf:
safelyHandleMessage(s.processBlockAnnounce, msg)
}
}
}

// run is the main goroutine for the initial sync service.
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout.
// It is assumed that the goroutine `run` is only called once per instance.
Expand All @@ -308,6 +295,7 @@ func (s *InitialSync) run() {
close(s.stateBuf)
}()

// We send out a state request to all peers.
if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil {
log.Errorf("Could not request state from peer %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/initial-sync/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func TestSavingBlock_InSync(t *testing.T) {

go func() {
ss.run()
ss.listenForNewBlocks()
exitRoutine <- true
}()

Expand Down
18 changes: 6 additions & 12 deletions beacon-chain/sync/initial-sync/sync_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,6 @@ import (
"go.opencensus.io/trace"
)

func (s *InitialSync) processBlockAnnounce(msg p2p.Message) {
_, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.initial-sync.processBlockAnnounce")
defer span.End()
data := msg.Data.(*pb.BeaconBlockAnnounce)
recBlockAnnounce.Inc()

if s.stateReceived && data.SlotNumber > s.highestObservedSlot {
s.requestBatchedBlocks(s.lastRequestedSlot, data.SlotNumber)
s.lastRequestedSlot = data.SlotNumber
}
}

// processBlock is the main method that validates each block which is received
// for initial sync. It checks if the blocks are valid and then will continue to
// process and save it into the db.
Expand Down Expand Up @@ -93,6 +81,12 @@ func (s *InitialSync) processBatchedBlocks(msg p2p.Message) {
// Do not process empty responses.
return
}
if msg.Peer != s.bestPeer {
// Only process batch block responses that come from the best peer
// we originally synced with.
log.WithField("peerID", msg.Peer.Pretty()).Debug("Received batch blocks from a different peer")
return
}

log.Debug("Processing batched block response")
for _, block := range batchedBlocks {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/sync_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,5 @@ func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoo
stateReq.Inc()
return s.p2p.Send(ctx, &pb.BeaconStateRequest{
FinalizedStateRootHash32S: lastFinalizedRoot[:],
}, p2p.AnyPeer)
}, s.bestPeer)
}
32 changes: 21 additions & 11 deletions beacon-chain/sync/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Querier struct {
powchain powChainService
chainStarted bool
atGenesis bool
bestPeer peer.ID
}

// NewQuerierService constructs a new Sync Querier Service.
Expand Down Expand Up @@ -151,34 +153,42 @@ func (q *Querier) run() {
ticker.Stop()
}()

q.RequestLatestHead()

timeout := time.After(5 * time.Second)
for {
select {
case <-q.ctx.Done():
queryLog.Info("Exiting goroutine")
return
case <-ticker.C:
q.RequestLatestHead()
case msg := <-q.responseBuf:
response := msg.Data.(*pb.ChainHeadResponse)
case <-timeout:
queryLog.WithField("peerID", q.bestPeer.Pretty()).Info("Peer with highest canonical head")
queryLog.Infof(
"Latest chain head is at slot: %d and state root: %#x",
response.CanonicalSlot-params.BeaconConfig().GenesisSlot, response.CanonicalStateRootHash32,
q.currentHeadSlot-params.BeaconConfig().GenesisSlot, q.currentStateRoot,
)
q.currentHeadSlot = response.CanonicalSlot
q.currentStateRoot = response.CanonicalStateRootHash32
q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S)

ticker.Stop()
responseSub.Unsubscribe()
q.cancel()
case msg := <-q.responseBuf:
response := msg.Data.(*pb.ChainHeadResponse)
queryLog.WithFields(logrus.Fields{
"peerID": msg.Peer.Pretty(),
"highestSlot": response.CanonicalSlot - params.BeaconConfig().GenesisSlot,
}).Info("Received chain head from peer")
if response.CanonicalSlot > q.currentHeadSlot {
q.currentHeadSlot = response.CanonicalSlot
q.bestPeer = msg.Peer
q.currentHeadSlot = response.CanonicalSlot
q.currentStateRoot = response.CanonicalStateRootHash32
q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S)
}
}
}
}

// RequestLatestHead broadcasts out a request for all
// the latest chain heads from the node's peers.
// RequestLatestHead broadcasts a request for
// the latest chain head slot and state root to a peer.
func (q *Querier) RequestLatestHead() {
request := &pb.ChainHeadRequest{}
q.p2p.Broadcast(context.Background(), request)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestQuerier_ChainReqResponse(t *testing.T) {
}()

response := &pb.ChainHeadResponse{
CanonicalSlot: 0,
CanonicalSlot: 1,
CanonicalStateRootHash32: []byte{'a', 'b'},
}

Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/regular_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (mp *mockP2P) Send(ctx context.Context, msg proto.Message, peerID peer.ID)
}

type mockChainService struct {
bFeed *event.Feed
sFeed *event.Feed
cFeed *event.Feed
db *db.BeaconDB
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (ss *Service) run() {

// Sets the highest observed slot from querier.
ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot)
ss.InitialSync.InitializeBestPeer(ss.Querier.bestPeer)
ss.InitialSync.InitializeObservedStateRoot(bytesutil.ToBytes32(ss.Querier.currentStateRoot))
// Sets the state root of the highest observed slot.
ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot)
Expand Down
Loading

0 comments on commit 81c8b13

Please sign in to comment.