Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JekaMas committed May 17, 2022
1 parent bae9c44 commit 3dd7f59
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 72 deletions.
17 changes: 11 additions & 6 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,11 @@ func (d *Downloader) LegacySync(id string, head common.Hash, td, ttd *big.Int, m
case nil, errBusy, errCanceled:
return err
}
if errors.Is(err, whitelist.ErrCheckpointMismatch) {
// TODO: what better can be done here?
log.Warn("Mismatch in last checkpointed block", "peer", id, "err", err)
}

if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) ||
errors.Is(err, whitelist.ErrCheckpointMismatch) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
Expand All @@ -359,10 +357,17 @@ func (d *Downloader) LegacySync(id string, head common.Hash, td, ttd *big.Int, m
}
return err
}

if errors.Is(err, ErrMergeTransition) {
return err // This is an expected fault, don't keep printing it in a spin-loop
}
log.Warn("Synchronisation failed, retrying", "err", err)

if errors.Is(err, whitelist.ErrNoRemoteCheckoint) {
log.Warn("Doesn't have remote checkpoint yet", "peer", id, "err", err)
}

log.Warn("Synchronisation failed, retrying", "peer", id, "err", err)

return err
}

Expand Down
173 changes: 141 additions & 32 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func newTester() *downloadTester {
return tester
}

// setWhitelist swaps the downloader's chain validator so tests can
// inject a fake whitelist implementation (see whitelistFake below).
func (dl *downloadTester) setWhitelist(w ChainValidator) {
	dl.downloader.ChainValidator = w
}

// terminate aborts any operations on the embedded downloader and releases all
// held resources.
func (dl *downloadTester) terminate() {
Expand Down Expand Up @@ -158,7 +162,7 @@ func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
}

func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
var headers = make([]*types.Header, len(rlpdata))
headers := make([]*types.Header, len(rlpdata))
for i, data := range rlpdata {
var h types.Header
if err := rlp.DecodeBytes(data, &h); err != nil {
Expand Down Expand Up @@ -620,9 +624,11 @@ func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) {
// TestBoundedHeavyForkedSync66Full runs the bounded heavy-forked sync
// scenario over the eth/66 protocol in full-sync mode.
func TestBoundedHeavyForkedSync66Full(t *testing.T) {
	testBoundedHeavyForkedSync(t, eth.ETH66, FullSync)
}

// TestBoundedHeavyForkedSync66Snap runs the bounded heavy-forked sync
// scenario over the eth/66 protocol in snap-sync mode.
func TestBoundedHeavyForkedSync66Snap(t *testing.T) {
	testBoundedHeavyForkedSync(t, eth.ETH66, SnapSync)
}

// TestBoundedHeavyForkedSync66Light runs the bounded heavy-forked sync
// scenario over the eth/66 protocol in light-sync mode.
func TestBoundedHeavyForkedSync66Light(t *testing.T) {
	testBoundedHeavyForkedSync(t, eth.ETH66, LightSync)
}
Expand Down Expand Up @@ -714,7 +720,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {

// Create peers of every type
tester.newPeer("peer 66", eth.ETH66, chain.blocks[1:])
//tester.newPeer("peer 65", eth.ETH67, chain.blocks[1:)
// tester.newPeer("peer 65", eth.ETH67, chain.blocks[1:)

// Synchronise with the requested peer and make sure all blocks were retrieved
if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil {
Expand Down Expand Up @@ -916,9 +922,11 @@ func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
// TestHighTDStarvationAttack66Full runs the high-TD starvation attack
// scenario over the eth/66 protocol in full-sync mode.
func TestHighTDStarvationAttack66Full(t *testing.T) {
	testHighTDStarvationAttack(t, eth.ETH66, FullSync)
}

// TestHighTDStarvationAttack66Snap runs the high-TD starvation attack
// scenario over the eth/66 protocol in snap-sync mode.
func TestHighTDStarvationAttack66Snap(t *testing.T) {
	testHighTDStarvationAttack(t, eth.ETH66, SnapSync)
}

// TestHighTDStarvationAttack66Light runs the high-TD starvation attack
// scenario over the eth/66 protocol in light-sync mode.
func TestHighTDStarvationAttack66Light(t *testing.T) {
	testHighTDStarvationAttack(t, eth.ETH66, LightSync)
}
Expand Down Expand Up @@ -1271,36 +1279,45 @@ func TestRemoteHeaderRequestSpan(t *testing.T) {
expected []int
}{
// Remote is way higher. We should ask for the remote head and go backwards
{1500, 1000,
{
1500, 1000,
[]int{1323, 1339, 1355, 1371, 1387, 1403, 1419, 1435, 1451, 1467, 1483, 1499},
},
{15000, 13006,
{
15000, 13006,
[]int{14823, 14839, 14855, 14871, 14887, 14903, 14919, 14935, 14951, 14967, 14983, 14999},
},
// Remote is pretty close to us. We don't have to fetch as many
{1200, 1150,
{
1200, 1150,
[]int{1149, 1154, 1159, 1164, 1169, 1174, 1179, 1184, 1189, 1194, 1199},
},
// Remote is equal to us (so on a fork with higher td)
// We should get the closest couple of ancestors
{1500, 1500,
{
1500, 1500,
[]int{1497, 1499},
},
// We're higher than the remote! Odd
{1000, 1500,
{
1000, 1500,
[]int{997, 999},
},
// Check some weird edgecases that it behaves somewhat rationally
{0, 1500,
{
0, 1500,
[]int{0, 2},
},
{6000000, 0,
{
6000000, 0,
[]int{5999823, 5999839, 5999855, 5999871, 5999887, 5999903, 5999919, 5999935, 5999951, 5999967, 5999983, 5999999},
},
{0, 0,
{
0, 0,
[]int{0, 2},
},
}

reqs := func(from, count, span int) []int {
var r []int
num := from
Expand All @@ -1310,32 +1327,38 @@ func TestRemoteHeaderRequestSpan(t *testing.T) {
}
return r
}

for i, tt := range testCases {
from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
data := reqs(int(from), count, span)
i := i
tt := tt

if max != uint64(data[len(data)-1]) {
t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
}
failed := false
if len(data) != len(tt.expected) {
failed = true
t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
} else {
for j, n := range data {
if n != tt.expected[j] {
failed = true
break
t.Run("", func(t *testing.T) {
from, count, span, max := calculateRequestSpan(tt.remoteHeight, tt.localHeight)
data := reqs(int(from), count, span)

if max != uint64(data[len(data)-1]) {
t.Errorf("test %d: wrong last value %d != %d", i, data[len(data)-1], max)
}
failed := false
if len(data) != len(tt.expected) {
failed = true
t.Errorf("test %d: length wrong, expected %d got %d", i, len(tt.expected), len(data))
} else {
for j, n := range data {
if n != tt.expected[j] {
failed = true
break
}
}
}
}
if failed {
res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
t.Logf("got: %v\n", res)
t.Logf("exp: %v\n", exp)
t.Errorf("test %d: wrong values", i)
}
if failed {
res := strings.Replace(fmt.Sprint(data), " ", ",", -1)
exp := strings.Replace(fmt.Sprint(tt.expected), " ", ",", -1)
t.Logf("got: %v\n", res)
t.Logf("exp: %v\n", exp)
t.Errorf("test %d: wrong values", i)
}
})
}
}

Expand Down Expand Up @@ -1371,3 +1394,89 @@ func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) {
assertOwnChain(t, tester, len(chain.blocks))
}
}

// whitelistFake is a test stub for the downloader's ChainValidator
// interface: it answers every IsValidChain call with a canned
// (result, error) pair, regardless of the chain it is shown.
type whitelistFake struct {
	// err is returned verbatim by IsValidChain.
	err error
	// res is returned verbatim by IsValidChain.
	res bool
}

// newWhitelistFake constructs a stub validator that always answers
// IsValidChain with the given result and error.
//
// Named fields are used here because the parameter order (res, err) is
// the reverse of the struct field order; a positional literal would
// silently mis-assign if the fields were ever reordered.
func newWhitelistFake(res bool, err error) *whitelistFake {
	return &whitelistFake{err: err, res: res}
}

// IsValidChain ignores its arguments and returns the canned response
// configured at construction time.
func (w *whitelistFake) IsValidChain(remoteHeader *types.Header, fetchHeadersByNumber func(number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error)) (bool, error) {
	return w.res, w.err
}

// ProcessCheckpoint is a no-op on the fake.
func (w *whitelistFake) ProcessCheckpoint(endBlockNum uint64, endBlockHash common.Hash) {}

// TestFakedSyncProgress66Whitelist checks that a sync against a peer whose
// chain is rejected by the whitelist validator (via whitelistFake returning
// whitelist.ErrCheckpointMismatch) fails, and that a subsequent sync with a
// valid peer reduces the reported highest block to the true chain height.
func TestFakedSyncProgress66Whitelist(t *testing.T) {
	protocol := uint(eth.ETH66)
	mode := FullSync
	tester := newTester()
	defer tester.terminate()

	chain := testChainBase.shorten(blockCacheMaxItems - 15)

	// Set a sync init hook to catch progress changes. The hook blocks the
	// sync goroutine on `progress` so the test can inspect intermediate state.
	starting := make(chan struct{})
	progress := make(chan struct{})
	tester.downloader.syncInitHook = func(_, _ uint64) {
		starting <- struct{}{}
		<-progress
	}
	checkProgress(t, tester.downloader, "pristine", ethereum.SyncProgress{})

	// Create and sync with an attacker that promises a higher chain than available.
	attacker := tester.newPeer("attack", protocol, chain.blocks[1:])
	numMissing := 5
	for i := len(chain.blocks) - 2; i > len(chain.blocks)-numMissing; i-- {
		attacker.withholdHeaders[chain.blocks[i].Hash()] = struct{}{}
	}
	// Force every whitelist check against this peer to fail.
	attacker.dl.setWhitelist(newWhitelistFake(false, whitelist.ErrCheckpointMismatch))

	pending := new(sync.WaitGroup)
	pending.Add(1)
	go func() {
		defer pending.Done()
		// The whitelist rejection must make the sync fail.
		if err := tester.sync("attack", nil, mode); err == nil {
			panic("succeeded attacker synchronisation")
		}
	}()
	<-starting

	// While the failing sync is paused at the init hook, the highest block
	// still reflects the attacker's advertised height.
	checkProgress(t, tester.downloader, "initial", ethereum.SyncProgress{
		HighestBlock: uint64(len(chain.blocks) - 1),
	})
	progress <- struct{}{}
	pending.Wait()

	afterFailedSync := tester.downloader.Progress()

	// Synchronise with a good peer and check that the progress height has been reduced to
	// the true value.
	validChain := chain.shorten(len(chain.blocks) - numMissing)
	tester.newPeer("valid", protocol, validChain.blocks[1:])
	pending.Add(1)

	go func() {
		defer pending.Done()
		if err := tester.sync("valid", nil, mode); err != nil {
			panic(fmt.Sprintf("failed to synchronise blocks: %v", err))
		}
	}()
	<-starting

	// Mid-sync: current block carries over from the failed attempt, highest
	// block now matches the valid chain.
	checkProgress(t, tester.downloader, "completing", ethereum.SyncProgress{
		CurrentBlock: afterFailedSync.CurrentBlock,
		HighestBlock: uint64(len(validChain.blocks) - 1),
	})
	// Check final progress after successful sync.
	progress <- struct{}{}
	pending.Wait()

	checkProgress(t, tester.downloader, "final", ethereum.SyncProgress{
		CurrentBlock: uint64(len(validChain.blocks) - 1),
		HighestBlock: uint64(len(validChain.blocks) - 1),
	})
}
39 changes: 22 additions & 17 deletions eth/downloader/whitelist/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ package whitelist

import (
"errors"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

var (
ErrCheckpointMismatch = errors.New("checkpoint mismatch")
)

// Checkpoint whitelist
type Service struct {
m sync.RWMutex
Expand All @@ -29,14 +26,19 @@ func NewService(maxCapacity uint) *Service {
}
}

var (
	// ErrCheckpointMismatch is returned when the header served by the remote
	// peer at the last whitelisted checkpoint height does not match the
	// checkpointed hash we hold.
	ErrCheckpointMismatch = errors.New("checkpoint mismatch")

	// ErrNoRemoteCheckoint is returned when the remote peer cannot serve the
	// header at the last checkpoint height at all.
	// NOTE(review): the identifier misspells "checkpoint"; renaming would
	// break callers (e.g. the downloader), so only the message text is
	// corrected here.
	ErrNoRemoteCheckoint = errors.New("remote peer doesn't have a checkpoint")
)

// IsValidChain checks if the chain we're about to receive from this peer is valid or not
// in terms of reorgs. We won't reorg beyond the last bor checkpoint submitted to mainchain.
func (w *Service) IsValidChain(remoteHeader *types.Header, fetchHeadersByNumber func(number uint64, amount int, skip int, reverse bool) ([]*types.Header, []common.Hash, error)) (bool, error) {
// We want to validate the chain by comparing the last checkpointed block
// we're storing in `checkpointWhitelist` with the peer's block.

// Check for availaibility of the last checkpointed block.
// This can be also be empty if our heimdall is not responsing
// This can also be empty if our heimdall is not responding
// or we're running without it.
if len(w.checkpointWhitelist) == 0 {
// worst case, we don't have the checkpoints in memory
Expand All @@ -49,9 +51,11 @@ func (w *Service) IsValidChain(remoteHeader *types.Header, fetchHeadersByNumber

// todo: we can extract this as an interface and mock as well or just test IsValidChain in isolation from downloader passing fake fetchHeadersByNumber functions
headers, hashes, err := fetchHeadersByNumber(lastCheckpointBlockNum, 1, 0, false)
if err != nil || len(headers) == 0 {
// TODO: what better can be done here?
return true, nil
if err != nil {
return false, fmt.Errorf("%w: last checkpoint %d, err %v", ErrNoRemoteCheckoint, lastCheckpointBlockNum, err)
}
if len(headers) == 0 {
return true, fmt.Errorf("%w: last checkpoint %d", ErrNoRemoteCheckoint, lastCheckpointBlockNum)
}

reqBlockNum := headers[0].Number.Uint64()
Expand All @@ -69,24 +73,24 @@ func (w *Service) ProcessCheckpoint(endBlockNum uint64, endBlockHash common.Hash
w.m.Lock()
defer w.m.Unlock()

w.EnqueueCheckpointWhitelist(endBlockNum, endBlockHash)
w.enqueueCheckpointWhitelist(endBlockNum, endBlockHash)
// If size of checkpoint whitelist map is greater than 10, remove the oldest entry.

if len(w.GetCheckpointWhitelist()) > int(w.maxCapacity) {
w.DequeueCheckpointWhitelist()
if w.length() > int(w.maxCapacity) {
w.dequeueCheckpointWhitelist()
}
}

// PurgeWhitelistMap purges data from checkpoint whitelist map
func (w *Service) PurgeWhitelistMap() error {
// purgeWhitelistMap removes every entry from the checkpoint whitelist map.
// It always returns nil; the error result appears to exist only for the
// caller's convenience — TODO confirm whether it can be dropped.
// NOTE(review): this does not touch w.checkpointOrder and takes no lock;
// presumably callers hold w.m and reset the order slice themselves — confirm.
func (w *Service) purgeWhitelistMap() error {
	for k := range w.checkpointWhitelist {
		delete(w.checkpointWhitelist, k)
	}
	return nil
}

// EnqueueWhitelistBlock enqueues blockNumber, blockHash to the checkpoint whitelist map
func (w *Service) EnqueueCheckpointWhitelist(key uint64, val common.Hash) {
func (w *Service) enqueueCheckpointWhitelist(key uint64, val common.Hash) {
if _, ok := w.checkpointWhitelist[key]; !ok {
log.Debug("Enqueing new checkpoint whitelist", "block number", key, "block hash", val)

Expand All @@ -96,15 +100,16 @@ func (w *Service) EnqueueCheckpointWhitelist(key uint64, val common.Hash) {
}

// DequeueWhitelistBlock dequeues block, blockhash from the checkpoint whitelist map
func (w *Service) DequeueCheckpointWhitelist() {
func (w *Service) dequeueCheckpointWhitelist() {
if len(w.checkpointOrder) > 0 {
log.Debug("Dequeing checkpoint whitelist", "block number", w.checkpointOrder[0], "block hash", w.checkpointWhitelist[w.checkpointOrder[0]])

delete(w.checkpointWhitelist, w.checkpointOrder[0])
w.checkpointOrder = w.checkpointOrder[1:]
}
}

// GetCheckpointWhitelist returns the checkpoints whitelisted.
func (w *Service) GetCheckpointWhitelist() map[uint64]common.Hash {
return w.checkpointWhitelist
// length reports the number of checkpoints currently whitelisted.
// NOTE(review): reads the map without taking w.m; the only visible caller,
// ProcessCheckpoint, already holds the lock — confirm no other callers.
func (w *Service) length() int {
	return len(w.checkpointWhitelist)
}
Loading

0 comments on commit 3dd7f59

Please sign in to comment.