Skip to content

Commit

Permalink
Catchup: NetworkFetcher service to export fetchBlock (#4388)
Browse files Browse the repository at this point in the history
  • Loading branch information
algoganesh authored Aug 18, 2022
1 parent e53605d commit a29e35a
Show file tree
Hide file tree
Showing 4 changed files with 341 additions and 17 deletions.
134 changes: 134 additions & 0 deletions catchup/networkFetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"context"
"errors"
"fmt"
"time"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
)

// NetworkFetcher is the struct used to export fetchBlock function from universalFetcher
type NetworkFetcher struct {
log logging.Logger
cfg config.Local
auth BlockAuthenticator
peerSelector *peerSelector
fetcher *universalBlockFetcher
}

// MakeNetworkFetcher initializes a NetworkFetcher service
func MakeNetworkFetcher(log logging.Logger, net network.GossipNode, cfg config.Local, auth BlockAuthenticator, pipelineFetch bool) *NetworkFetcher {
netFetcher := &NetworkFetcher{
log: log,
cfg: cfg,
auth: auth,
peerSelector: createPeerSelector(net, cfg, pipelineFetch),
fetcher: makeUniversalBlockFetcher(log, net, cfg),
}
return netFetcher
}

func (netFetcher *NetworkFetcher) getHTTPPeer() (network.HTTPPeer, *peerSelectorPeer, error) {
for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ {
psp, err := netFetcher.peerSelector.getNextPeer()
if err != nil {
if err != errPeerSelectorNoPeerPoolsAvailable {
err = fmt.Errorf("FetchBlock: unable to obtain a list of peers to download the block from : %w", err)
return nil, nil, err
}
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
netFetcher.log.Infof("FetchBlock: unable to obtain a list of peers to download the block from; will retry shortly.")
time.Sleep(noPeersAvailableSleepInterval)
continue
}
peer := psp.Peer
httpPeer, ok := peer.(network.HTTPPeer)
if ok {
return httpPeer, psp, nil
}
netFetcher.log.Warnf("FetchBlock: non-HTTP peer was provided by the peer selector")
netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
}
return nil, nil, errors.New("FetchBlock: recurring non-HTTP peer was provided by the peer selector")
}

// FetchBlock function given a round number returns a block from a http peer
func (netFetcher *NetworkFetcher) FetchBlock(ctx context.Context, round basics.Round) (*bookkeeping.Block,
*agreement.Certificate, time.Duration, error) {
// internal retry attempt to fetch the block
for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ {
httpPeer, psp, err := netFetcher.getHTTPPeer()
if err != nil {
return nil, nil, time.Duration(0), err
}

blk, cert, downloadDuration, err := netFetcher.fetcher.fetchBlock(ctx, round, httpPeer)
if err != nil {
if ctx.Err() != nil {
// caller of the function decided to cancel the download
return nil, nil, time.Duration(0), err
}
netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v",
round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
netFetcher.peerSelector.rankPeer(psp, peerRankDownloadFailed)
continue // retry the fetch
}

// Check that the block's contents match the block header
if !blk.ContentsMatchHeader() && blk.Round() > 0 {
netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
// Check if this mismatch is due to an unsupported protocol version
if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok {
netFetcher.log.Errorf("FetchBlock: downloaded block(%v) unsupported protocol version detected: '%v'",
round, blk.BlockHeader.CurrentProtocol)
}
netFetcher.log.Warnf("FetchBlock: downloaded block(%v) contents do not match header", round)
netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v",
round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
continue // retry the fetch
}

// Authenticate the block. for correct execution, caller should call FetchBlock only when the lookback block is available
if netFetcher.cfg.CatchupVerifyCertificate() {
err = netFetcher.auth.Authenticate(blk, cert)
if err != nil {
netFetcher.log.Warnf("FetchBlock: cert authenticatation failed for block %d on attempt %d out of %d. %v",
round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
continue // retry the fetch
}
}

// upon successful download rank the peer according to the download speed
peerRank := netFetcher.peerSelector.peerDownloadDurationToRank(psp, downloadDuration)
netFetcher.peerSelector.rankPeer(psp, peerRank)
return blk, cert, downloadDuration, err

}
err := fmt.Errorf("FetchBlock failed after multiple blocks download attempts: %v unsuccessful attempts",
netFetcher.cfg.CatchupBlockDownloadRetryAttempts)
return nil, nil, time.Duration(0), err
}
190 changes: 190 additions & 0 deletions catchup/networkFetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"context"
"sync"
"testing"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

func TestFetchBlock(t *testing.T) {
partitiontest.PartitionTest(t)

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}

blockServiceConfig := config.GetDefaultLocal()
blockServiceConfig.EnableBlockService = true
blockServiceConfig.EnableBlockServiceFallbackToArchiver = false

net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

node := basicRPCNode{}
node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
node.start()
defer node.stop()
rootURL := node.rootURL()

net.addPeer(rootURL)

// Disable block authentication
cfg := config.GetDefaultLocal()
cfg.CatchupBlockValidateMode = 1
fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)

block, _, duration, err := fetcher.FetchBlock(context.Background(), next)

require.NoError(t, err)
require.Equal(t, &b, block)
require.GreaterOrEqual(t, int64(duration), int64(0))

block, cert, duration, err := fetcher.FetchBlock(context.Background(), next+1)

require.Error(t, errNoBlockForRound, err)
require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
require.Nil(t, block)
require.Nil(t, cert)
require.Equal(t, int64(duration), int64(0))
}

func TestConcurrentAttemptsToFetchBlockSuccess(t *testing.T) {
partitiontest.PartitionTest(t)

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}

blockServiceConfig := config.GetDefaultLocal()
blockServiceConfig.EnableBlockService = true
blockServiceConfig.EnableBlockServiceFallbackToArchiver = false

net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

node := basicRPCNode{}
node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
node.start()
defer node.stop()
rootURL := node.rootURL()

net.addPeer(rootURL)

// Disable block authentication
cfg := config.GetDefaultLocal()
cfg.CatchupBlockValidateMode = 1
fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)

// start is used to synchronize concurrent fetchBlock attempts
// parallelRequests represents number of concurrent attempts
start := make(chan struct{})
parallelRequests := int(cfg.CatchupParallelBlocks)
var wg sync.WaitGroup
wg.Add(parallelRequests)
for i := 0; i < parallelRequests; i++ {
go func() {
<-start
block, _, duration, err := fetcher.FetchBlock(context.Background(), next)
require.NoError(t, err)
require.Equal(t, &b, block)
require.GreaterOrEqual(t, int64(duration), int64(0))
wg.Done()
}()
}
close(start)
wg.Wait()
}

func TestHTTPPeerNotAvailable(t *testing.T) {
partitiontest.PartitionTest(t)

net := &httpTestPeerSource{}

// Disable block authentication
cfg := config.GetDefaultLocal()
cfg.CatchupBlockValidateMode = 1
cfg.CatchupBlockDownloadRetryAttempts = 1

fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)

_, _, _, err := fetcher.FetchBlock(context.Background(), 1)
require.Contains(t, err.Error(), "recurring non-HTTP peer was provided by the peer selector")
}

func TestFetchBlockFailed(t *testing.T) {
partitiontest.PartitionTest(t)

net := &httpTestPeerSource{}
wsPeer := makeTestUnicastPeer(net, t)
net.addPeer(wsPeer.GetAddress())

// Disable block authentication
cfg := config.GetDefaultLocal()
cfg.CatchupBlockValidateMode = 1
cfg.CatchupBlockDownloadRetryAttempts = 1

fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)

_, _, _, err := fetcher.FetchBlock(context.Background(), 1)
require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
}

func TestFetchBlockAuthenticationFailed(t *testing.T) {
partitiontest.PartitionTest(t)

ledger, next, _, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}

blockServiceConfig := config.GetDefaultLocal()
blockServiceConfig.EnableBlockService = true
blockServiceConfig.EnableBlockServiceFallbackToArchiver = false

net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

node := basicRPCNode{}
node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
node.start()
defer node.stop()
rootURL := node.rootURL()

net.addPeer(rootURL)

cfg := config.GetDefaultLocal()
cfg.CatchupBlockDownloadRetryAttempts = 1

fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, &mockedAuthenticator{errorRound: int(next)}, false)

_, _, _, err = fetcher.FetchBlock(context.Background(), next)
require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
}
18 changes: 9 additions & 9 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
close(completed)
}()

peerSelector := s.createPeerSelector(true)
peerSelector := createPeerSelector(s.net, s.cfg, true)

if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable {
s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := s.createPeerSelector(false)
peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
psp, getPeerErr := peerSelector.getNextPeer()
if getPeerErr != nil {
Expand Down Expand Up @@ -755,11 +755,11 @@ func (s *Service) handleUnsupportedRound(nextUnsupportedRound basics.Round) {
}
}

func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch bool) *peerSelector {
var peerClasses []peerClass
if s.cfg.EnableCatchupFromArchiveServers {
if cfg.EnableCatchupFromArchiveServers {
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers},
Expand All @@ -774,7 +774,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
} else {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
Expand All @@ -791,7 +791,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
} else {
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
Expand All @@ -804,7 +804,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
} else {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
Expand All @@ -818,5 +818,5 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
}
return makePeerSelector(s.net, peerClasses)
return makePeerSelector(net, peerClasses)
}
Loading

0 comments on commit a29e35a

Please sign in to comment.