Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[syncer] Forgetting non-Dogechain protocol peer #271

Merged
merged 11 commits into from
Dec 12, 2022
2 changes: 1 addition & 1 deletion archive/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream, progressi
}

// Create a blockchain subscription for the sync progression and start tracking
progression.StartProgression(firstBlock.Number(), chain.SubscribeEvents())
progression.StartProgression("", firstBlock.Number(), chain.SubscribeEvents())
// Stop monitoring the sync progression upon exit
defer progression.StopProgression()

Expand Down
13 changes: 13 additions & 0 deletions helper/progress/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Progression struct {
// SyncType is indicating the sync method
SyncType ChainSyncType

// SyncingPeer is current syncing peer id
SyncingPeer string

// StartingBlock is the initial block that the node is starting
// the sync from. It is reset after every sync batch
StartingBlock uint64
Expand Down Expand Up @@ -54,15 +57,25 @@ func NewProgressionWrapper(syncType ChainSyncType) *ProgressionWrapper {

// startProgression initializes the progression tracking
func (pw *ProgressionWrapper) StartProgression(
syncingPeer string,
startingBlock uint64,
subscription blockchain.Subscription,
) {
pw.lock.Lock()
defer pw.lock.Unlock()

// set current block
var current uint64

if startingBlock > 0 {
current = startingBlock - 1
}

pw.progression = &Progression{
SyncType: pw.syncType,
SyncingPeer: syncingPeer,
StartingBlock: startingBlock,
CurrentBlock: current,
}

go pw.RunUpdateLoop(subscription)
Expand Down
1 change: 1 addition & 0 deletions jsonrpc/eth_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (e *Eth) Syncing() (interface{}, error) {
// Node is bulk syncing, return the status
return progression{
Type: string(syncProgression.SyncType),
SyncingPeer: syncProgression.SyncingPeer,
StartingBlock: hex.EncodeUint64(syncProgression.StartingBlock),
CurrentBlock: hex.EncodeUint64(syncProgression.CurrentBlock),
HighestBlock: hex.EncodeUint64(syncProgression.HighestBlock),
Expand Down
1 change: 1 addition & 0 deletions jsonrpc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ type txnArgs struct {

type progression struct {
Type string `json:"type"`
SyncingPeer string `json:"syncingPeer"`
StartingBlock string `json:"startingBlock"`
CurrentBlock string `json:"currentBlock"`
HighestBlock string `json:"highestBlock"`
Expand Down
36 changes: 31 additions & 5 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,40 @@ func (s *Server) updateBootnodeConnCount(peerID peer.ID, delta int64) {
s.bootnodes.increaseBootnodeConnCount(delta)
}

// ForgetPeer disconnects, remove and forget peer to prevent broadcast discovery to other peers
//
// Cauction: take care of using this to ignore peer from store, which may break peer discovery
func (s *Server) ForgetPeer(peer peer.ID, reason string) {
s.logger.Warn("forget peer", "id", peer, "reason", reason)

s.DisconnectFromPeer(peer, reason)
s.removePeer(peer)
s.forgetPeer(peer)
}

func (s *Server) forgetPeer(peer peer.ID) {
p := s.GetPeerInfo(peer)
if p == nil || len(p.Addrs) == 0 { // already removed?
s.logger.Info("peer already removed from store", "id", peer)

return
}

s.logger.Info("remove peer from store", "id", peer)

s.RemoveFromPeerStore(p)
}

// DisconnectFromPeer disconnects the networking server from the specified peer
func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) {
if s.host.Network().Connectedness(peer) == network.Connected {
s.logger.Info(fmt.Sprintf("Closing connection to peer [%s] for reason [%s]", peer.String(), reason))
if !s.IsConnected(peer) {
return
}

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
s.logger.Error(fmt.Sprintf("Unable to gracefully close peer connection, %v", closeErr))
}
s.logger.Info("closing connection to peer", "id", peer, "reason", reason)

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
s.logger.Error("unable to gracefully close peer connection", "err", closeErr)
}
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ func (client *syncPeerClient) newSyncPeerClient(peerID peer.ID) (proto.V1Client,
// create new connection
conn, err = client.network.NewProtoConnection(_syncerV1, peerID)
if err != nil {
client.network.ForgetPeer(peerID, "not support syncer v1 protocol")

return nil, fmt.Errorf("failed to open a stream, err %w", err)
}

Expand Down
60 changes: 60 additions & 0 deletions protocol/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,63 @@ func Test_syncPeerClient_GetBlocks(t *testing.T) {

assert.Equal(t, expected, syncedBlocks)
}

// setupIncompatibleGRPCServer setups an incompatible protocol GRPC server
func (s *syncPeerService) setupIncompatibleGRPCServer() {
s.stream = grpc.NewGrpcStream()

proto.RegisterV1Server(s.stream.GrpcServer(), s)
s.stream.Serve()
s.network.RegisterProtocol("/fake-syncer/1.0", s.stream)
}

func createNonSyncerService(t *testing.T, chain Blockchain) (*syncPeerService, *network.Server) {
t.Helper()

srv := newTestNetwork(t)

service := &syncPeerService{
blockchain: chain,
network: srv,
}

service.setupIncompatibleGRPCServer()

return service, srv
}

func Test_newSyncPeerClient_forgetNonProtocolPeer(t *testing.T) {
t.Parallel()

clientSrv := newTestNetwork(t)
client := newTestSyncPeerClient(clientSrv, nil)

_, peerSrv := createNonSyncerService(t, &mockBlockchain{
headerHandler: newSimpleHeaderHandler(10),
})
srvID := peerSrv.AddrInfo().ID

t.Cleanup(func() {
client.CloseStream(srvID)
client.Close()
peerSrv.Close()
})

err := network.JoinAndWait(
clientSrv,
peerSrv,
network.DefaultBufferTimeout,
network.DefaultJoinTimeout,
)

assert.NoError(t, err)

_, err = client.GetPeerStatus(srvID)
assert.Error(t, err)

// client should disconnect with peer do not support syncer protocol
assert.False(t, client.network.IsConnected(srvID))
// client should be forget
peers := client.network.Peers()
assert.Equal(t, 0, len(peers))
}
4 changes: 3 additions & 1 deletion protocol/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ type Network interface {
SaveProtocolStream(protocol string, stream *rawGrpc.ClientConn, peerID peer.ID)
// CloseProtocolStream closes stream
CloseProtocolStream(protocol string, peerID peer.ID) error
// ForgetPeer disconnects, remove and forget peer to prevent broadcast discovery to other peers
ForgetPeer(peer peer.ID, reason string)
}

type Progression interface {
// StartProgression starts progression
StartProgression(startingBlock uint64, subscription blockchain.Subscription)
StartProgression(syncingPeer string, startingBlock uint64, subscription blockchain.Subscription)
// UpdateHighestProgression updates highest block number
UpdateHighestProgression(highestBlock uint64)
// GetProgression returns Progression
Expand Down
6 changes: 6 additions & 0 deletions protocol/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func NewPeerMap(peers []*NoForkPeer) *PeerMap {
return peerMap
}

func (m *PeerMap) Exists(peerID peer.ID) bool {
_, exists := m.Load(peerID.String())

return exists
}

func (m *PeerMap) Put(peers ...*NoForkPeer) {
for _, peer := range peers {
m.Store(peer.ID.String(), peer)
Expand Down
34 changes: 23 additions & 11 deletions protocol/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ type noForkSyncer struct {

// Channel to notify Sync that a new status arrived
newStatusCh chan struct{}
// syncing state
syncing *atomic.Bool
syncingPeer string

// stop chan
stopCh chan struct{}
Expand Down Expand Up @@ -309,8 +311,11 @@ func (s *noForkSyncer) syncWithSkipList(
return
}

// set up a peer to receive its status updates for progress updates
s.syncingPeer = bestPeer.ID.String()

// use subscription for updating progression
s.syncProgression.StartProgression(localLatest, s.blockchain.SubscribeEvents())
s.syncProgression.StartProgression(s.syncingPeer, localLatest, s.blockchain.SubscribeEvents())
s.syncProgression.UpdateHighestProgression(bestPeer.Number)

// fetch block from the peer
Expand Down Expand Up @@ -490,29 +495,36 @@ func (s *noForkSyncer) startPeerConnectionEventProcess() {

// initNewPeerStatus fetches status of the peer and put to peer map
func (s *noForkSyncer) initNewPeerStatus(peerID peer.ID) {
s.logger.Info("peer connected", "id", peerID)

status, err := s.syncPeerClient.GetPeerStatus(peerID)
if err != nil {
s.logger.Warn("failed to get peer status, skip", "id", peerID, "err", err)

return
status = &NoForkPeer{
ID: peerID,
}
}

// update its status
s.putToPeerMap(status)
}

// putToPeerMap puts given status to peer map
func (s *noForkSyncer) putToPeerMap(status *NoForkPeer) {
// update progression if OK
if p := s.syncProgression; p != nil && status != nil {
p.UpdateHighestProgression(status.Number)
if status == nil {
// it should not be
return
}

if status != nil {
if _, exists := s.peerMap.Load(status.ID); !exists {
s.logger.Info("new connected peer", "id", status.ID, "number", status.Number)
} else {
s.logger.Debug("connected peer update status", "id", status.ID, "number", status.Number)
}
if !s.peerMap.Exists(status.ID) {
s.logger.Info("new connected peer", "id", status.ID, "number", status.Number)
}

// update progression if needed
if status.ID.String() == s.syncingPeer && status.Number > 0 {
s.logger.Debug("connected peer update status", "id", status.ID, "number", status.Number)
s.syncProgression.UpdateHighestProgression(status.Number)
}

s.peerMap.Put(status)
Expand Down
2 changes: 1 addition & 1 deletion protocol/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type mockProgression struct {
highestBlock uint64
}

func (m *mockProgression) StartProgression(startingBlock uint64, subscription blockchain.Subscription) {
func (m *mockProgression) StartProgression(syncingPeer string, startingBlock uint64, subscription blockchain.Subscription) {
m.startingBlock = startingBlock
}

Expand Down