diff --git a/catchup/networkFetcher.go b/catchup/networkFetcher.go
new file mode 100644
index 0000000000..d82395e8d3
--- /dev/null
+++ b/catchup/networkFetcher.go
@@ -0,0 +1,134 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see .
+
+package catchup
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/algorand/go-algorand/agreement"
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/network"
+)
+
+// NetworkFetcher is the struct used to export fetchBlock function from universalFetcher
+type NetworkFetcher struct {
+ log logging.Logger
+ cfg config.Local
+ auth BlockAuthenticator
+ peerSelector *peerSelector
+ fetcher *universalBlockFetcher
+}
+
+// MakeNetworkFetcher initializes a NetworkFetcher service
+func MakeNetworkFetcher(log logging.Logger, net network.GossipNode, cfg config.Local, auth BlockAuthenticator, pipelineFetch bool) *NetworkFetcher {
+ netFetcher := &NetworkFetcher{
+ log: log,
+ cfg: cfg,
+ auth: auth,
+ peerSelector: createPeerSelector(net, cfg, pipelineFetch),
+ fetcher: makeUniversalBlockFetcher(log, net, cfg),
+ }
+ return netFetcher
+}
+
+func (netFetcher *NetworkFetcher) getHTTPPeer() (network.HTTPPeer, *peerSelectorPeer, error) {
+ for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ {
+ psp, err := netFetcher.peerSelector.getNextPeer()
+ if err != nil {
+ if err != errPeerSelectorNoPeerPoolsAvailable {
+ err = fmt.Errorf("FetchBlock: unable to obtain a list of peers to download the block from : %w", err)
+ return nil, nil, err
+ }
+ // this is a possible on startup, since the network package might have yet to retrieve the list of peers.
+ netFetcher.log.Infof("FetchBlock: unable to obtain a list of peers to download the block from; will retry shortly.")
+ time.Sleep(noPeersAvailableSleepInterval)
+ continue
+ }
+ peer := psp.Peer
+ httpPeer, ok := peer.(network.HTTPPeer)
+ if ok {
+ return httpPeer, psp, nil
+ }
+ netFetcher.log.Warnf("FetchBlock: non-HTTP peer was provided by the peer selector")
+ netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
+ }
+ return nil, nil, errors.New("FetchBlock: recurring non-HTTP peer was provided by the peer selector")
+}
+
+// FetchBlock function given a round number returns a block from a http peer
+func (netFetcher *NetworkFetcher) FetchBlock(ctx context.Context, round basics.Round) (*bookkeeping.Block,
+ *agreement.Certificate, time.Duration, error) {
+ // internal retry attempt to fetch the block
+ for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ {
+ httpPeer, psp, err := netFetcher.getHTTPPeer()
+ if err != nil {
+ return nil, nil, time.Duration(0), err
+ }
+
+ blk, cert, downloadDuration, err := netFetcher.fetcher.fetchBlock(ctx, round, httpPeer)
+ if err != nil {
+ if ctx.Err() != nil {
+ // caller of the function decided to cancel the download
+ return nil, nil, time.Duration(0), err
+ }
+ netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v",
+ round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
+ netFetcher.peerSelector.rankPeer(psp, peerRankDownloadFailed)
+ continue // retry the fetch
+ }
+
+ // Check that the block's contents match the block header
+ if !blk.ContentsMatchHeader() && blk.Round() > 0 {
+ netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
+ // Check if this mismatch is due to an unsupported protocol version
+ if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok {
+ netFetcher.log.Errorf("FetchBlock: downloaded block(%v) unsupported protocol version detected: '%v'",
+ round, blk.BlockHeader.CurrentProtocol)
+ }
+ netFetcher.log.Warnf("FetchBlock: downloaded block(%v) contents do not match header", round)
+ netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v",
+ round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
+ continue // retry the fetch
+ }
+
+ // Authenticate the block. for correct execution, caller should call FetchBlock only when the lookback block is available
+ if netFetcher.cfg.CatchupVerifyCertificate() {
+ err = netFetcher.auth.Authenticate(blk, cert)
+ if err != nil {
+ netFetcher.log.Warnf("FetchBlock: cert authenticatation failed for block %d on attempt %d out of %d. %v",
+ round, retryCount+1, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
+ netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
+ continue // retry the fetch
+ }
+ }
+
+ // upon successful download rank the peer according to the download speed
+ peerRank := netFetcher.peerSelector.peerDownloadDurationToRank(psp, downloadDuration)
+ netFetcher.peerSelector.rankPeer(psp, peerRank)
+ return blk, cert, downloadDuration, err
+
+ }
+ err := fmt.Errorf("FetchBlock failed after multiple blocks download attempts: %v unsuccessful attempts",
+ netFetcher.cfg.CatchupBlockDownloadRetryAttempts)
+ return nil, nil, time.Duration(0), err
+}
diff --git a/catchup/networkFetcher_test.go b/catchup/networkFetcher_test.go
new file mode 100644
index 0000000000..7c6a2c885b
--- /dev/null
+++ b/catchup/networkFetcher_test.go
@@ -0,0 +1,190 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see .
+
+package catchup
+
+import (
+ "context"
+ "sync"
+ "testing"
+
+ "github.com/algorand/go-algorand/config"
+ "github.com/algorand/go-algorand/data/bookkeeping"
+ "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/rpcs"
+ "github.com/algorand/go-algorand/test/partitiontest"
+ "github.com/stretchr/testify/require"
+)
+
+func TestFetchBlock(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
+ if err != nil {
+ t.Fatal(err)
+ return
+ }
+
+ blockServiceConfig := config.GetDefaultLocal()
+ blockServiceConfig.EnableBlockService = true
+ blockServiceConfig.EnableBlockServiceFallbackToArchiver = false
+
+ net := &httpTestPeerSource{}
+ ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")
+
+ node := basicRPCNode{}
+ node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
+ node.start()
+ defer node.stop()
+ rootURL := node.rootURL()
+
+ net.addPeer(rootURL)
+
+ // Disable block authentication
+ cfg := config.GetDefaultLocal()
+ cfg.CatchupBlockValidateMode = 1
+ fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)
+
+ block, _, duration, err := fetcher.FetchBlock(context.Background(), next)
+
+ require.NoError(t, err)
+ require.Equal(t, &b, block)
+ require.GreaterOrEqual(t, int64(duration), int64(0))
+
+ block, cert, duration, err := fetcher.FetchBlock(context.Background(), next+1)
+
+ require.Error(t, errNoBlockForRound, err)
+ require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
+ require.Nil(t, block)
+ require.Nil(t, cert)
+ require.Equal(t, int64(duration), int64(0))
+}
+
+func TestConcurrentAttemptsToFetchBlockSuccess(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
+ if err != nil {
+ t.Fatal(err)
+ return
+ }
+
+ blockServiceConfig := config.GetDefaultLocal()
+ blockServiceConfig.EnableBlockService = true
+ blockServiceConfig.EnableBlockServiceFallbackToArchiver = false
+
+ net := &httpTestPeerSource{}
+ ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")
+
+ node := basicRPCNode{}
+ node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
+ node.start()
+ defer node.stop()
+ rootURL := node.rootURL()
+
+ net.addPeer(rootURL)
+
+ // Disable block authentication
+ cfg := config.GetDefaultLocal()
+ cfg.CatchupBlockValidateMode = 1
+ fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)
+
+ // start is used to synchronize concurrent fetchBlock attempts
+ // parallelRequests represents number of concurrent attempts
+ start := make(chan struct{})
+ parallelRequests := int(cfg.CatchupParallelBlocks)
+ var wg sync.WaitGroup
+ wg.Add(parallelRequests)
+ for i := 0; i < parallelRequests; i++ {
+ go func() {
+ <-start
+ block, _, duration, err := fetcher.FetchBlock(context.Background(), next)
+ require.NoError(t, err)
+ require.Equal(t, &b, block)
+ require.GreaterOrEqual(t, int64(duration), int64(0))
+ wg.Done()
+ }()
+ }
+ close(start)
+ wg.Wait()
+}
+
+func TestHTTPPeerNotAvailable(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ net := &httpTestPeerSource{}
+
+ // Disable block authentication
+ cfg := config.GetDefaultLocal()
+ cfg.CatchupBlockValidateMode = 1
+ cfg.CatchupBlockDownloadRetryAttempts = 1
+
+ fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)
+
+ _, _, _, err := fetcher.FetchBlock(context.Background(), 1)
+ require.Contains(t, err.Error(), "recurring non-HTTP peer was provided by the peer selector")
+}
+
+func TestFetchBlockFailed(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ net := &httpTestPeerSource{}
+ wsPeer := makeTestUnicastPeer(net, t)
+ net.addPeer(wsPeer.GetAddress())
+
+ // Disable block authentication
+ cfg := config.GetDefaultLocal()
+ cfg.CatchupBlockValidateMode = 1
+ cfg.CatchupBlockDownloadRetryAttempts = 1
+
+ fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, nil, false)
+
+ _, _, _, err := fetcher.FetchBlock(context.Background(), 1)
+ require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
+}
+
+func TestFetchBlockAuthenticationFailed(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ ledger, next, _, err := buildTestLedger(t, bookkeeping.Block{})
+ if err != nil {
+ t.Fatal(err)
+ return
+ }
+
+ blockServiceConfig := config.GetDefaultLocal()
+ blockServiceConfig.EnableBlockService = true
+ blockServiceConfig.EnableBlockServiceFallbackToArchiver = false
+
+ net := &httpTestPeerSource{}
+ ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")
+
+ node := basicRPCNode{}
+ node.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
+ node.start()
+ defer node.stop()
+ rootURL := node.rootURL()
+
+ net.addPeer(rootURL)
+
+ cfg := config.GetDefaultLocal()
+ cfg.CatchupBlockDownloadRetryAttempts = 1
+
+ fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, &mockedAuthenticator{errorRound: int(next)}, false)
+
+ _, _, _, err = fetcher.FetchBlock(context.Background(), next)
+ require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
+}
diff --git a/catchup/service.go b/catchup/service.go
index adc313db64..1ebaf0fd3b 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -425,7 +425,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
close(completed)
}()
- peerSelector := s.createPeerSelector(true)
+ peerSelector := createPeerSelector(s.net, s.cfg, true)
if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable {
s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
@@ -653,7 +653,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
}
blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
- peerSelector := s.createPeerSelector(false)
+ peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
psp, getPeerErr := peerSelector.getNextPeer()
if getPeerErr != nil {
@@ -755,11 +755,11 @@ func (s *Service) handleUnsupportedRound(nextUnsupportedRound basics.Round) {
}
}
-func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
+func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch bool) *peerSelector {
var peerClasses []peerClass
- if s.cfg.EnableCatchupFromArchiveServers {
+ if cfg.EnableCatchupFromArchiveServers {
if pipelineFetch {
- if s.cfg.NetAddress != "" { // Relay node
+ if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers},
@@ -774,7 +774,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
} else {
- if s.cfg.NetAddress != "" { // Relay node
+ if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
@@ -791,7 +791,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
} else {
if pipelineFetch {
- if s.cfg.NetAddress != "" { // Relay node
+ if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
@@ -804,7 +804,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
} else {
- if s.cfg.NetAddress != "" { // Relay node
+ if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
@@ -818,5 +818,5 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
}
- return makePeerSelector(s.net, peerClasses)
+ return makePeerSelector(net, peerClasses)
}
diff --git a/catchup/service_test.go b/catchup/service_test.go
index 676a283bad..f364b7a465 100644
--- a/catchup/service_test.go
+++ b/catchup/service_test.go
@@ -834,7 +834,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.NetAddress = "someAddress"
s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps := s.createPeerSelector(true)
+ ps := createPeerSelector(s.net, s.cfg, true)
require.Equal(t, 4, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
@@ -850,7 +850,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = true
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(true)
+ ps = createPeerSelector(s.net, s.cfg, true)
require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
@@ -864,7 +864,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = true
cfg.NetAddress = "someAddress"
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(false)
+ ps = createPeerSelector(s.net, s.cfg, false)
require.Equal(t, 4, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
@@ -881,7 +881,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = true
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(false)
+ ps = createPeerSelector(s.net, s.cfg, false)
require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
@@ -896,7 +896,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = "someAddress"
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(true)
+ ps = createPeerSelector(s.net, s.cfg, true)
require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
@@ -911,7 +911,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(true)
+ ps = createPeerSelector(s.net, s.cfg, true)
require.Equal(t, 2, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
@@ -924,7 +924,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = "someAddress"
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(false)
+ ps = createPeerSelector(s.net, s.cfg, false)
require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
@@ -939,7 +939,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
- ps = s.createPeerSelector(false)
+ ps = createPeerSelector(s.net, s.cfg, false)
require.Equal(t, 2, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)