Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Catchup: NetworkFetcher service to export fetchBlock #4388

Merged
merged 14 commits into from
Aug 18, 2022
Merged
112 changes: 112 additions & 0 deletions catchup/networkFetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"context"
"errors"
"fmt"
"time"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
)

// NetworkFetcher is the struct used to export fetchBlock function from universalFetcher
type NetworkFetcher struct {
log logging.Logger
net network.GossipNode
cfg config.Local
peerSelector *peerSelector
}

// MakeNetworkFetcher initializes a NetworkFetcher service
func MakeNetworkFetcher(log logging.Logger, net network.GossipNode, cfg config.Local, pipelineFetch bool) *NetworkFetcher {
netFetcher := &NetworkFetcher{
log: log,
net: net,
cfg: cfg,
peerSelector: createPeerSelector(net, cfg, pipelineFetch),
}
return netFetcher
}

func (netFetcher *NetworkFetcher) getHTTPPeer(count int) (network.HTTPPeer, *peerSelectorPeer, int, error) {
for retryCount := count; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ {
psp, err := netFetcher.peerSelector.getNextPeer()
if err != nil {
if err != errPeerSelectorNoPeerPoolsAvailable {
err = fmt.Errorf("FetchBlock: unable to obtain a list of peers to download the block from : %w", err)
return nil, nil, retryCount, err
}
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
netFetcher.log.Infof("FetchBlock: unable to obtain a list of peers to download the block from; will retry shortly.")
time.Sleep(noPeersAvailableSleepInterval)
} else {
peer := psp.Peer
httpPeer, ok := peer.(network.HTTPPeer)
if ok {
return httpPeer, psp, retryCount, nil
}
netFetcher.log.Warnf("FetchBlock: non-HTTP peer was provided by the peer selector")
netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
}
}
return nil, nil, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, errors.New("FetchBlock: recurring non-HTTP peer was provided by the peer selector")
}

// FetchBlock function given a round number returns a block from a http peer
func (netFetcher *NetworkFetcher) FetchBlock(ctx context.Context, round basics.Round) (*bookkeeping.Block,
*agreement.Certificate, time.Duration, error) {
// internal retry attempt to fetch the block
for retryCount := 0; retryCount < netFetcher.cfg.CatchupBlockDownloadRetryAttempts; retryCount++ {
// keep retrying until a valid http peer is selected by the peerSelector
httpPeer, psp, retryCount, err := netFetcher.getHTTPPeer(retryCount)
algonautshant marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, nil, time.Duration(0), err
}
fetcher := makeUniversalBlockFetcher(netFetcher.log, netFetcher.net, netFetcher.cfg)
blk, cert, downloadDuration, err := fetcher.fetchBlock(ctx, round, httpPeer)
if err != nil {
if ctx.Err() != nil {
// caller of the function decided to cancel the download
return nil, nil, time.Duration(0), err
}
netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v", round, retryCount, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
netFetcher.peerSelector.rankPeer(psp, peerRankDownloadFailed)
} else if !blk.ContentsMatchHeader() && blk.Round() > 0 {
netFetcher.peerSelector.rankPeer(psp, peerRankInvalidDownload)
// Check if this mismatch is due to an unsupported protocol version
if _, ok := config.Consensus[blk.BlockHeader.CurrentProtocol]; !ok {
netFetcher.log.Errorf("FetchBlock: downloaded block(%v) unsupported protocol version detected: '%v'", round, blk.BlockHeader.CurrentProtocol)
}
netFetcher.log.Warnf("FetchBlock: downloaded block(%v) contents do not match header", round)
netFetcher.log.Infof("FetchBlock: failed to download block %d on attempt %d out of %d. %v", round, retryCount, netFetcher.cfg.CatchupBlockDownloadRetryAttempts, err)
} else {
// upon successful download rank the peer according to the download speed
peerRank := netFetcher.peerSelector.peerDownloadDurationToRank(psp, downloadDuration)
netFetcher.peerSelector.rankPeer(psp, peerRank)
return blk, cert, downloadDuration, err
}
}
err := fmt.Errorf("FetchBlock failed after multiple blocks download attempts: %v unsuccessful attempts", netFetcher.cfg.CatchupBlockDownloadRetryAttempts)
return nil, nil, time.Duration(0), err
}
77 changes: 77 additions & 0 deletions catchup/networkFetcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"context"
"testing"
"time"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/stretchr/testify/require"
)

func TestFetchBlock(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()

ledger, next, b, err := buildTestLedger(t, bookkeeping.Block{})
if err != nil {
t.Fatal(err)
return
}

blockServiceConfig := config.GetDefaultLocal()
blockServiceConfig.EnableBlockService = true
blockServiceConfig.EnableBlockServiceFallbackToArchiver = false

net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()

net.addPeer(rootURL)

fetcher := MakeNetworkFetcher(logging.TestingLog(t), net, cfg, false)

var block *bookkeeping.Block
var cert *agreement.Certificate
var duration time.Duration
block, _, duration, err = fetcher.FetchBlock(context.Background(), next)

require.NoError(t, err)
require.Equal(t, &b, block)
require.GreaterOrEqual(t, int64(duration), int64(0))

block, _, duration, err = fetcher.FetchBlock(context.Background(), next+1)

require.Error(t, errNoBlockForRound, err)
require.Contains(t, err.Error(), "FetchBlock failed after multiple blocks download attempts")
require.Nil(t, block)
require.Nil(t, cert)
require.Equal(t, int64(duration), int64(0))
}
18 changes: 9 additions & 9 deletions catchup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
close(completed)
}()

peerSelector := s.createPeerSelector(true)
peerSelector := createPeerSelector(s.net, s.cfg, true)

if _, err := peerSelector.getNextPeer(); err == errPeerSelectorNoPeerPoolsAvailable {
s.log.Debugf("pipelinedFetch: was unable to obtain a peer to retrieve the block from")
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s *Service) fetchRound(cert agreement.Certificate, verifier *agreement.Asy
}

blockHash := bookkeeping.BlockHash(cert.Proposal.BlockDigest) // semantic digest (i.e., hash of the block header), not byte-for-byte digest
peerSelector := s.createPeerSelector(false)
peerSelector := createPeerSelector(s.net, s.cfg, false)
for s.ledger.LastRound() < cert.Round {
psp, getPeerErr := peerSelector.getNextPeer()
if getPeerErr != nil {
Expand Down Expand Up @@ -755,11 +755,11 @@ func (s *Service) handleUnsupportedRound(nextUnsupportedRound basics.Round) {
}
}

func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
func createPeerSelector(net network.GossipNode, cfg config.Local, pipelineFetch bool) *peerSelector {
var peerClasses []peerClass
if s.cfg.EnableCatchupFromArchiveServers {
if cfg.EnableCatchupFromArchiveServers {
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookArchivers},
Expand All @@ -774,7 +774,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
} else {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
Expand All @@ -791,7 +791,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
} else {
if pipelineFetch {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersPhonebookRelays},
Expand All @@ -804,7 +804,7 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
} else {
if s.cfg.NetAddress != "" { // Relay node
if cfg.NetAddress != "" { // Relay node
peerClasses = []peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersConnectedOut},
{initialRank: peerRankInitialSecondPriority, peerClass: network.PeersConnectedIn},
Expand All @@ -818,5 +818,5 @@ func (s *Service) createPeerSelector(pipelineFetch bool) *peerSelector {
}
}
}
return makePeerSelector(s.net, peerClasses)
return makePeerSelector(net, peerClasses)
}
16 changes: 8 additions & 8 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func TestCreatePeerSelector(t *testing.T) {

cfg.NetAddress = "someAddress"
s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps := s.createPeerSelector(true)
ps := createPeerSelector(s.net, s.cfg, true)
require.Equal(t, 4, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
Expand All @@ -850,7 +850,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = true
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(true)
ps = createPeerSelector(s.net, s.cfg, true)
require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
require.Equal(t, peerRankInitialSecondPriority, ps.peerClasses[1].initialRank)
Expand All @@ -864,7 +864,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = true
cfg.NetAddress = "someAddress"
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(false)
ps = createPeerSelector(s.net, s.cfg, false)

require.Equal(t, 4, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
Expand All @@ -881,7 +881,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = true
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(false)
ps = createPeerSelector(s.net, s.cfg, false)

require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
Expand All @@ -896,7 +896,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = "someAddress"
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(true)
ps = createPeerSelector(s.net, s.cfg, true)

require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
Expand All @@ -911,7 +911,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(true)
ps = createPeerSelector(s.net, s.cfg, true)

require.Equal(t, 2, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
Expand All @@ -924,7 +924,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = "someAddress"
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(false)
ps = createPeerSelector(s.net, s.cfg, false)

require.Equal(t, 3, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
Expand All @@ -939,7 +939,7 @@ func TestCreatePeerSelector(t *testing.T) {
cfg.EnableCatchupFromArchiveServers = false
cfg.NetAddress = ""
s = MakeService(logging.Base(), cfg, &httpTestPeerSource{}, new(mockedLedger), &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
ps = s.createPeerSelector(false)
ps = createPeerSelector(s.net, s.cfg, false)

require.Equal(t, 2, len(ps.peerClasses))
require.Equal(t, peerRankInitialFirstPriority, ps.peerClasses[0].initialRank)
Expand Down