Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only Sync With the Peer With the Highest Observed Slot #2280

Merged
merged 22 commits into from
Apr 26, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand All @@ -39,11 +40,9 @@ go_test(
"receive_block_test.go",
"regular_sync_test.go",
"service_test.go",
"simulated_sync_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/chaintest/backend:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/internal:go_default_library",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//shared/params:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_libp2p_go_libp2p_peer//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand Down
32 changes: 10 additions & 22 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ import (
"sync"
"time"

"github.com/prysmaticlabs/prysm/shared/hashutil"

"github.com/ethereum/go-ethereum/common"
"github.com/gogo/protobuf/proto"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/p2p"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -110,6 +110,7 @@ type InitialSync struct {
finalizedStateRoot [32]byte
mutex *sync.Mutex
nodeIsSynced bool
bestPeer peer.ID
}

// NewInitialSyncService constructs a new InitialSyncService.
Expand Down Expand Up @@ -155,7 +156,6 @@ func (s *InitialSync) Start() {
}
s.currentSlot = cHead.Slot
go s.run()
go s.listenForNewBlocks()
go s.checkInMemoryBlocks()
}

Expand All @@ -181,6 +181,11 @@ func (s *InitialSync) InitializeFinalizedStateRoot(root [32]byte) {
s.finalizedStateRoot = root
}

// InitializeBestPeer sets the peer ID of the highest observed peer.
func (s *InitialSync) InitializeBestPeer(p peer.ID) {
s.bestPeer = p
}

// HighestObservedSlot returns the highest observed slot.
func (s *InitialSync) HighestObservedSlot() uint64 {
return s.highestObservedSlot
Expand Down Expand Up @@ -274,24 +279,6 @@ func (s *InitialSync) checkInMemoryBlocks() {
}
}

// listenForNewBlocks listens for block announcements beyond the canonical head slot that may
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer relevant as this is handled by regular sync

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still should have it, otherwise the node will always have to play catch up whenever it starts regular sync

// be received during initial sync - we must process these blocks to catch up with peers.
func (s *InitialSync) listenForNewBlocks() {
blockAnnounceSub := s.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, s.blockAnnounceBuf)
defer func() {
blockAnnounceSub.Unsubscribe()
close(s.blockAnnounceBuf)
}()
for {
select {
case <-s.ctx.Done():
return
case msg := <-s.blockAnnounceBuf:
safelyHandleMessage(s.processBlockAnnounce, msg)
}
}
}

// run is the main goroutine for the initial sync service.
// delayChan is explicitly passed into this function to facilitate tests that don't require a timeout.
// It is assumed that the goroutine `run` is only called once per instance.
Expand All @@ -308,7 +295,8 @@ func (s *InitialSync) run() {
close(s.stateBuf)
}()

if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot); err != nil {
// We send out a state request to all peers.
if err := s.requestStateFromPeer(s.ctx, s.finalizedStateRoot, s.bestPeer); err != nil {
log.Errorf("Could not request state from peer %v", err)
}

Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/initial-sync/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ func TestSavingBlock_InSync(t *testing.T) {

go func() {
ss.run()
ss.listenForNewBlocks()
exitRoutine <- true
}()

Expand Down
5 changes: 3 additions & 2 deletions beacon-chain/sync/initial-sync/sync_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package initialsync
import (
"context"

peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/shared/hashutil"

"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
Expand Down Expand Up @@ -96,11 +97,11 @@ func (s *InitialSync) processState(msg p2p.Message) {
}

// requestStateFromPeer requests for the canonical state, finalized state, and justified state from a peer.
func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte) error {
func (s *InitialSync) requestStateFromPeer(ctx context.Context, lastFinalizedRoot [32]byte, peerID peer.ID) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should just use s.bestPeer here, since we are only going to sync with one peer

ctx, span := trace.StartSpan(ctx, "beacon-chain.sync.initial-sync.requestStateFromPeer")
defer span.End()
stateReq.Inc()
return s.p2p.Send(ctx, &pb.BeaconStateRequest{
FinalizedStateRootHash32S: lastFinalizedRoot[:],
}, p2p.AnyPeer)
}, peerID)
}
32 changes: 21 additions & 11 deletions beacon-chain/sync/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
Expand Down Expand Up @@ -57,6 +58,7 @@ type Querier struct {
powchain powChainService
chainStarted bool
atGenesis bool
bestPeer peer.ID
}

// NewQuerierService constructs a new Sync Querier Service.
Expand Down Expand Up @@ -151,34 +153,42 @@ func (q *Querier) run() {
ticker.Stop()
}()

q.RequestLatestHead()

timeout := time.After(5 * time.Second)
for {
select {
case <-q.ctx.Done():
queryLog.Info("Exiting goroutine")
return
case <-ticker.C:
q.RequestLatestHead()
case msg := <-q.responseBuf:
response := msg.Data.(*pb.ChainHeadResponse)
case <-timeout:
queryLog.Infof("Peer with highest canonical head: %v", q.bestPeer.Pretty())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should conform to WithFields logging standard, please

queryLog.Infof(
"Latest chain head is at slot: %d and state root: %#x",
response.CanonicalSlot-params.BeaconConfig().GenesisSlot, response.CanonicalStateRootHash32,
q.currentHeadSlot-params.BeaconConfig().GenesisSlot, q.currentStateRoot,
)
q.currentHeadSlot = response.CanonicalSlot
q.currentStateRoot = response.CanonicalStateRootHash32
q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S)

ticker.Stop()
responseSub.Unsubscribe()
q.cancel()
case msg := <-q.responseBuf:
response := msg.Data.(*pb.ChainHeadResponse)
queryLog.WithFields(logrus.Fields{
"peerID": msg.Peer.Pretty(),
"highestSlot": response.CanonicalSlot - params.BeaconConfig().GenesisSlot,
}).Info("Received chain head from peer")
if response.CanonicalSlot > q.currentHeadSlot {
q.currentHeadSlot = response.CanonicalSlot
q.bestPeer = msg.Peer
q.currentHeadSlot = response.CanonicalSlot
q.currentStateRoot = response.CanonicalStateRootHash32
q.currentFinalizedStateRoot = bytesutil.ToBytes32(response.FinalizedStateRootHash32S)
}
}
}
}

// RequestLatestHead broadcasts out a request for all
// the latest chain heads from the node's peers.
// RequestLatestHead sends a request for all
// the latest chain head slot and state root to a peer.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to change this comment back? It’s still broadcasting

func (q *Querier) RequestLatestHead() {
request := &pb.ChainHeadRequest{}
q.p2p.Broadcast(context.Background(), request)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestQuerier_ChainReqResponse(t *testing.T) {
}()

response := &pb.ChainHeadResponse{
CanonicalSlot: 0,
CanonicalSlot: 1,
CanonicalStateRootHash32: []byte{'a', 'b'},
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (ss *Service) run() {

// Sets the highest observed slot from querier.
ss.InitialSync.InitializeObservedSlot(ss.Querier.currentHeadSlot)
ss.InitialSync.InitializeBestPeer(ss.Querier.bestPeer)
ss.InitialSync.InitializeObservedStateRoot(bytesutil.ToBytes32(ss.Querier.currentStateRoot))
// Sets the state root of the highest observed slot.
ss.InitialSync.InitializeFinalizedStateRoot(ss.Querier.currentFinalizedStateRoot)
Expand Down
Loading