Skip to content

Commit

Permalink
[syncer] Forgetting non-Dogechain protocol peer (#271)
Browse files Browse the repository at this point in the history
# Description

Currently, peers will discover and connect to non-Dogechain peers, which
is frustrating and extremely draining of connection resources.

The PR fixes it by forgetting (disconnect and remove it from store) the
peer which is not supporting Dogechain `syncer` protocol.

Also, we print out the node ID that is currently syncing in
`eth_syncing` endpoint and update its progress when the node status is
updated.

# Changes include

- [x] Bugfix (non-breaking change that solves an issue)
- [x] New feature (non-breaking change that adds functionality)

## Testing

- [x] I have tested this code with the official test suite
- [x] I have tested this code manually

### Manual tests

* Start a new MainNet node from Genesis block.
* Observe its peers protocol list.
* `dogechain peers list | grep "=" | awk '{print $3}' | while read p; do
dogechain peers status --peer-id $p; done`
* Observe its log.
* Observe its syncing status.
* `curl -X POST http://localhost:8545 -d '{"id": 0, "jsonrpc":"2.0",
"method": "eth_syncing", "params": []}'`

PR branch result:
* The connected nodes all support the Dogechain `syncer` protocol.
* There are some `forget peer` in the log.
* Return correct and upgraded progression.

Base branch result:
* Due to unfiltered protocol reply discovery, connected peers contain
some non-Dogechain peers.
* No and never "forget" any peer in the log.
* Print out unobtrusive and sometimes erroneous sync status.

* The connected peers contain some non-Dogechain peers due to unfiltered
protocol reply discovery.
* No and never 'forget' any peer in the log.
* Progress sometimes go wrong.

# Documentation update

Should update official documentation once the version bumped.
  • Loading branch information
DarianShawn authored Dec 12, 2022
1 parent feb1600 commit b192cda
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 19 deletions.
2 changes: 1 addition & 1 deletion archive/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func importBlocks(chain blockchainInterface, blockStream *blockStream, progressi
}

// Create a blockchain subscription for the sync progression and start tracking
progression.StartProgression(firstBlock.Number(), chain.SubscribeEvents())
progression.StartProgression("", firstBlock.Number(), chain.SubscribeEvents())
// Stop monitoring the sync progression upon exit
defer progression.StopProgression()

Expand Down
13 changes: 13 additions & 0 deletions helper/progress/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Progression struct {
// SyncType is indicating the sync method
SyncType ChainSyncType

// SyncingPeer is current syncing peer id
SyncingPeer string

// StartingBlock is the initial block that the node is starting
// the sync from. It is reset after every sync batch
StartingBlock uint64
Expand Down Expand Up @@ -54,15 +57,25 @@ func NewProgressionWrapper(syncType ChainSyncType) *ProgressionWrapper {

// startProgression initializes the progression tracking
func (pw *ProgressionWrapper) StartProgression(
syncingPeer string,
startingBlock uint64,
subscription blockchain.Subscription,
) {
pw.lock.Lock()
defer pw.lock.Unlock()

// set current block
var current uint64

if startingBlock > 0 {
current = startingBlock - 1
}

pw.progression = &Progression{
SyncType: pw.syncType,
SyncingPeer: syncingPeer,
StartingBlock: startingBlock,
CurrentBlock: current,
}

go pw.RunUpdateLoop(subscription)
Expand Down
1 change: 1 addition & 0 deletions jsonrpc/eth_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (e *Eth) Syncing() (interface{}, error) {
// Node is bulk syncing, return the status
return progression{
Type: string(syncProgression.SyncType),
SyncingPeer: syncProgression.SyncingPeer,
StartingBlock: hex.EncodeUint64(syncProgression.StartingBlock),
CurrentBlock: hex.EncodeUint64(syncProgression.CurrentBlock),
HighestBlock: hex.EncodeUint64(syncProgression.HighestBlock),
Expand Down
1 change: 1 addition & 0 deletions jsonrpc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ type txnArgs struct {

type progression struct {
Type string `json:"type"`
SyncingPeer string `json:"syncingPeer"`
StartingBlock string `json:"startingBlock"`
CurrentBlock string `json:"currentBlock"`
HighestBlock string `json:"highestBlock"`
Expand Down
36 changes: 31 additions & 5 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,40 @@ func (s *Server) updateBootnodeConnCount(peerID peer.ID, delta int64) {
s.bootnodes.increaseBootnodeConnCount(delta)
}

// ForgetPeer disconnects, remove and forget peer to prevent broadcast discovery to other peers
//
// Cauction: take care of using this to ignore peer from store, which may break peer discovery
func (s *Server) ForgetPeer(peer peer.ID, reason string) {
s.logger.Warn("forget peer", "id", peer, "reason", reason)

s.DisconnectFromPeer(peer, reason)
s.removePeer(peer)
s.forgetPeer(peer)
}

func (s *Server) forgetPeer(peer peer.ID) {
p := s.GetPeerInfo(peer)
if p == nil || len(p.Addrs) == 0 { // already removed?
s.logger.Info("peer already removed from store", "id", peer)

return
}

s.logger.Info("remove peer from store", "id", peer)

s.RemoveFromPeerStore(p)
}

// DisconnectFromPeer disconnects the networking server from the specified peer
func (s *Server) DisconnectFromPeer(peer peer.ID, reason string) {
if s.host.Network().Connectedness(peer) == network.Connected {
s.logger.Info(fmt.Sprintf("Closing connection to peer [%s] for reason [%s]", peer.String(), reason))
if !s.IsConnected(peer) {
return
}

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
s.logger.Error(fmt.Sprintf("Unable to gracefully close peer connection, %v", closeErr))
}
s.logger.Info("closing connection to peer", "id", peer, "reason", reason)

if closeErr := s.host.Network().ClosePeer(peer); closeErr != nil {
s.logger.Error("unable to gracefully close peer connection", "err", closeErr)
}
}

Expand Down
2 changes: 2 additions & 0 deletions protocol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ func (client *syncPeerClient) newSyncPeerClient(peerID peer.ID) (proto.V1Client,
// create new connection
conn, err = client.network.NewProtoConnection(_syncerV1, peerID)
if err != nil {
client.network.ForgetPeer(peerID, "not support syncer v1 protocol")

return nil, fmt.Errorf("failed to open a stream, err %w", err)
}

Expand Down
60 changes: 60 additions & 0 deletions protocol/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,63 @@ func Test_syncPeerClient_GetBlocks(t *testing.T) {

assert.Equal(t, expected, syncedBlocks)
}

// setupIncompatibleGRPCServer setups an incompatible protocol GRPC server
func (s *syncPeerService) setupIncompatibleGRPCServer() {
s.stream = grpc.NewGrpcStream()

proto.RegisterV1Server(s.stream.GrpcServer(), s)
s.stream.Serve()
s.network.RegisterProtocol("/fake-syncer/1.0", s.stream)
}

func createNonSyncerService(t *testing.T, chain Blockchain) (*syncPeerService, *network.Server) {
t.Helper()

srv := newTestNetwork(t)

service := &syncPeerService{
blockchain: chain,
network: srv,
}

service.setupIncompatibleGRPCServer()

return service, srv
}

func Test_newSyncPeerClient_forgetNonProtocolPeer(t *testing.T) {
t.Parallel()

clientSrv := newTestNetwork(t)
client := newTestSyncPeerClient(clientSrv, nil)

_, peerSrv := createNonSyncerService(t, &mockBlockchain{
headerHandler: newSimpleHeaderHandler(10),
})
srvID := peerSrv.AddrInfo().ID

t.Cleanup(func() {
client.CloseStream(srvID)
client.Close()
peerSrv.Close()
})

err := network.JoinAndWait(
clientSrv,
peerSrv,
network.DefaultBufferTimeout,
network.DefaultJoinTimeout,
)

assert.NoError(t, err)

_, err = client.GetPeerStatus(srvID)
assert.Error(t, err)

// client should disconnect with peer do not support syncer protocol
assert.False(t, client.network.IsConnected(srvID))
// client should be forget
peers := client.network.Peers()
assert.Equal(t, 0, len(peers))
}
4 changes: 3 additions & 1 deletion protocol/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ type Network interface {
SaveProtocolStream(protocol string, stream *rawGrpc.ClientConn, peerID peer.ID)
// CloseProtocolStream closes stream
CloseProtocolStream(protocol string, peerID peer.ID) error
// ForgetPeer disconnects, remove and forget peer to prevent broadcast discovery to other peers
ForgetPeer(peer peer.ID, reason string)
}

type Progression interface {
// StartProgression starts progression
StartProgression(startingBlock uint64, subscription blockchain.Subscription)
StartProgression(syncingPeer string, startingBlock uint64, subscription blockchain.Subscription)
// UpdateHighestProgression updates highest block number
UpdateHighestProgression(highestBlock uint64)
// GetProgression returns Progression
Expand Down
6 changes: 6 additions & 0 deletions protocol/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func NewPeerMap(peers []*NoForkPeer) *PeerMap {
return peerMap
}

func (m *PeerMap) Exists(peerID peer.ID) bool {
_, exists := m.Load(peerID.String())

return exists
}

func (m *PeerMap) Put(peers ...*NoForkPeer) {
for _, peer := range peers {
m.Store(peer.ID.String(), peer)
Expand Down
34 changes: 23 additions & 11 deletions protocol/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ type noForkSyncer struct {

// Channel to notify Sync that a new status arrived
newStatusCh chan struct{}
// syncing state
syncing *atomic.Bool
syncingPeer string

// stop chan
stopCh chan struct{}
Expand Down Expand Up @@ -309,8 +311,11 @@ func (s *noForkSyncer) syncWithSkipList(
return
}

// set up a peer to receive its status updates for progress updates
s.syncingPeer = bestPeer.ID.String()

// use subscription for updating progression
s.syncProgression.StartProgression(localLatest, s.blockchain.SubscribeEvents())
s.syncProgression.StartProgression(s.syncingPeer, localLatest, s.blockchain.SubscribeEvents())
s.syncProgression.UpdateHighestProgression(bestPeer.Number)

// fetch block from the peer
Expand Down Expand Up @@ -490,29 +495,36 @@ func (s *noForkSyncer) startPeerConnectionEventProcess() {

// initNewPeerStatus fetches status of the peer and put to peer map
func (s *noForkSyncer) initNewPeerStatus(peerID peer.ID) {
s.logger.Info("peer connected", "id", peerID)

status, err := s.syncPeerClient.GetPeerStatus(peerID)
if err != nil {
s.logger.Warn("failed to get peer status, skip", "id", peerID, "err", err)

return
status = &NoForkPeer{
ID: peerID,
}
}

// update its status
s.putToPeerMap(status)
}

// putToPeerMap puts given status to peer map
func (s *noForkSyncer) putToPeerMap(status *NoForkPeer) {
// update progression if OK
if p := s.syncProgression; p != nil && status != nil {
p.UpdateHighestProgression(status.Number)
if status == nil {
// it should not be
return
}

if status != nil {
if _, exists := s.peerMap.Load(status.ID); !exists {
s.logger.Info("new connected peer", "id", status.ID, "number", status.Number)
} else {
s.logger.Debug("connected peer update status", "id", status.ID, "number", status.Number)
}
if !s.peerMap.Exists(status.ID) {
s.logger.Info("new connected peer", "id", status.ID, "number", status.Number)
}

// update progression if needed
if status.ID.String() == s.syncingPeer && status.Number > 0 {
s.logger.Debug("connected peer update status", "id", status.ID, "number", status.Number)
s.syncProgression.UpdateHighestProgression(status.Number)
}

s.peerMap.Put(status)
Expand Down
2 changes: 1 addition & 1 deletion protocol/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type mockProgression struct {
highestBlock uint64
}

func (m *mockProgression) StartProgression(startingBlock uint64, subscription blockchain.Subscription) {
func (m *mockProgression) StartProgression(syncingPeer string, startingBlock uint64, subscription blockchain.Subscription) {
m.startingBlock = startingBlock
}

Expand Down

0 comments on commit b192cda

Please sign in to comment.