Commit 98f67d4

Merge branch 'fixAttestationNoVerify' of https://github.com/prysmaticlabs/geth-sharding into fixAttestationNoVerify

nisdas committed Aug 13, 2020
2 parents 5cc4dbf + 50ebedc commit 98f67d4
Showing 11 changed files with 172 additions and 145 deletions.
13 changes: 7 additions & 6 deletions beacon-chain/p2p/peers/score_block_providers.go
@@ -15,20 +15,20 @@ import (

const (
// DefaultBlockProviderProcessedBatchWeight is a default reward weight of a processed batch of blocks.
DefaultBlockProviderProcessedBatchWeight = float64(0.05)
DefaultBlockProviderProcessedBatchWeight = float64(0.1)
// DefaultBlockProviderProcessedBlocksCap defines default value for processed blocks cap.
// e.g. 20 * 64 := 20 batches of size 64 (with 0.05 per batch reward, 20 batches result in score of 1.0).
DefaultBlockProviderProcessedBlocksCap = uint64(20 * 64)
DefaultBlockProviderProcessedBlocksCap = uint64(10 * 64)
// DefaultBlockProviderDecayInterval defines how often the decaying routine is called.
DefaultBlockProviderDecayInterval = 30 * time.Second
// DefaultBlockProviderDecay defines default blocks that are to be subtracted from stats on each
// decay interval. Effectively, this param provides minimum expected performance for a peer to remain
// high scorer.
DefaultBlockProviderDecay = uint64(5 * 64)
DefaultBlockProviderDecay = uint64(1 * 64)
// DefaultBlockProviderStalePeerRefreshInterval defines default interval at which peers should be given
// opportunity to provide blocks (their score gets boosted, up until they are selected for
// fetching).
DefaultBlockProviderStalePeerRefreshInterval = 1 * time.Minute
DefaultBlockProviderStalePeerRefreshInterval = 5 * time.Minute
)

// BlockProviderScorer represents block provider scoring service.
@@ -209,14 +209,14 @@ func (s *BlockProviderScorer) WeightSorted(
nextPID := func(weights map[peer.ID]float64) peer.ID {
totalWeight := 0
for _, w := range weights {
totalWeight += int(w)
// Factor by 100, to allow weights in (0; 1) range.
totalWeight += int(w * 100)
}
if totalWeight <= 0 {
return ""
}
rnd := r.Intn(totalWeight)
for pid, w := range weights {
rnd -= int(w)
rnd -= int(w * 100)
if rnd < 0 {
return pid
}
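
The retuned defaults keep the maximum block-provider score at 1.0 (ten 64-block batches at 0.1 per batch, versus twenty at 0.05 before), while the WeightSorted hunk scales fractional weights by 100 so that integer-based random selection still works for scores below 1. A standalone sketch of that roulette-wheel selection; the function name, the use of math/rand, and the example weights are illustrative, not taken from the commit:

package main

import (
    "fmt"
    "math/rand"

    "github.com/libp2p/go-libp2p-core/peer"
)

// pickWeighted returns a peer with probability proportional to its weight.
// Weights in the (0, 1) range are multiplied by 100 before truncation to int,
// so sub-unit scores do not all collapse to zero.
func pickWeighted(r *rand.Rand, weights map[peer.ID]float64) peer.ID {
    totalWeight := 0
    for _, w := range weights {
        totalWeight += int(w * 100)
    }
    if totalWeight <= 0 {
        return ""
    }
    rnd := r.Intn(totalWeight)
    for pid, w := range weights {
        rnd -= int(w * 100)
        if rnd < 0 {
            return pid
        }
    }
    return "" // unreachable when totalWeight > 0
}

func main() {
    r := rand.New(rand.NewSource(42))
    weights := map[peer.ID]float64{"peer1": 0.1, "peer2": 0.4, "peer3": 0.5}
    fmt.Println(pickWeighted(r, weights)) // prints one of the three peers, weighted by score
}
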
20 changes: 11 additions & 9 deletions beacon-chain/p2p/peers/score_block_providers_test.go
@@ -51,8 +51,9 @@ func TestPeerScorer_BlockProvider_Score(t *testing.T) {
{
name: "boost score of stale peer",
update: func(scorer *peers.BlockProviderScorer) {
batchWeight := scorer.Params().ProcessedBatchWeight
scorer.IncrementProcessedBlocks("peer1", batchSize*3)
assert.Equal(t, 0.05*3, scorer.Score("peer1"), "Unexpected score")
assert.Equal(t, roundScore(batchWeight*3), scorer.Score("peer1"), "Unexpected score")
scorer.Touch("peer1", roughtime.Now().Add(-1*scorer.Params().StalePeerRefreshInterval))
},
check: func(scorer *peers.BlockProviderScorer) {
@@ -85,23 +86,26 @@ func TestPeerScorer_BlockProvider_Score(t *testing.T) {
scorer.IncrementProcessedBlocks("peer1", batchSize)
},
check: func(scorer *peers.BlockProviderScorer) {
assert.Equal(t, roundScore(0.05), scorer.Score("peer1"), "Unexpected score")
batchWeight := scorer.Params().ProcessedBatchWeight
assert.Equal(t, roundScore(batchWeight), scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "multiple batches",
update: func(scorer *peers.BlockProviderScorer) {
scorer.IncrementProcessedBlocks("peer1", batchSize*13)
scorer.IncrementProcessedBlocks("peer1", batchSize*7)
},
check: func(scorer *peers.BlockProviderScorer) {
assert.Equal(t, roundScore(0.05*13), scorer.Score("peer1"), "Unexpected score")
batchWeight := scorer.Params().ProcessedBatchWeight
assert.Equal(t, roundScore(batchWeight*7), scorer.Score("peer1"), "Unexpected score")
},
},
{
name: "maximum score cap",
update: func(scorer *peers.BlockProviderScorer) {
batchWeight := scorer.Params().ProcessedBatchWeight
scorer.IncrementProcessedBlocks("peer1", batchSize*2)
assert.Equal(t, roundScore(0.05*2), scorer.Score("peer1"), "Unexpected score")
assert.Equal(t, roundScore(batchWeight*2), scorer.Score("peer1"), "Unexpected score")
scorer.IncrementProcessedBlocks("peer1", scorer.Params().ProcessedBlocksCap)
},
check: func(scorer *peers.BlockProviderScorer) {
@@ -116,9 +120,7 @@ func TestPeerScorer_BlockProvider_Score(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
PeerLimit: 30,
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{},
},
})
scorer := peerStatuses.Scorers().BlockProviderScorer()
@@ -150,7 +152,7 @@ func TestPeerScorer_BlockProvider_WeightSorted(t *testing.T) {
peerStatuses := peers.NewStatus(ctx, &peers.StatusConfig{
ScorerParams: &peers.PeerScorerConfig{
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 1,
ProcessedBatchWeight: 0.01,
},
},
})
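
Throughout these tests, hard-coded 0.05 expectations are replaced with values derived from scorer.Params().ProcessedBatchWeight, so the assertions keep tracking whatever the configured (or default) weight is. A minimal sketch of the scoring relationship the assertions rely on, assuming only full batches count and processed blocks are capped; both points are inferred from the cases above, not copied from the scorer's implementation:

package main

import "fmt"

// score derives a block-provider score the way the tests above expect:
// partial batches contribute nothing, and processed blocks are capped.
func score(processedBlocks, blocksCap, batchSize uint64, batchWeight float64) float64 {
    if processedBlocks > blocksCap {
        processedBlocks = blocksCap
    }
    fullBatches := processedBlocks / batchSize // integer division drops partial batches
    return float64(fullBatches) * batchWeight
}

func main() {
    const batchSize, blocksCap, batchWeight = 64, 10 * 64, 0.1
    fmt.Println(score(batchSize/2, blocksCap, batchSize, batchWeight))   // 0: a partial batch earns nothing
    fmt.Println(score(batchSize*7, blocksCap, batchSize, batchWeight))   // ~0.7: the tests round via roundScore before comparing
    fmt.Println(score(batchSize*100, blocksCap, batchSize, batchWeight)) // 1: capped, so the score tops out at the maximum
}
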
27 changes: 14 additions & 13 deletions beacon-chain/p2p/peers/scorer_manager_test.go
@@ -109,8 +109,7 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
Threshold: 5,
},
BlockProviderScorerConfig: &peers.BlockProviderScorerConfig{
ProcessedBatchWeight: 0.05,
Decay: 64,
Decay: 64,
},
},
})
@@ -156,54 +155,56 @@ func TestPeerScorer_PeerScorerManager_Score(t *testing.T) {
s, pids := setupScorer()
s1 := s.BlockProviderScorer()
zeroScore := s.BlockProviderScorer().MaxScore()
batchWeight := s1.Params().ProcessedBatchWeight

// Partial batch.
s1.IncrementProcessedBlocks("peer1", batchSize/4)
assert.Equal(t, 0.0, s.Score("peer1"), "Unexpected %q score", "peer1")

// Single batch.
s1.IncrementProcessedBlocks("peer1", batchSize)
assert.DeepEqual(t, pack(s, 0.05, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, batchWeight, zeroScore, zeroScore), peerScores(s, pids), "Unexpected scores")

// Multiple batches.
s1.IncrementProcessedBlocks("peer2", batchSize*4)
assert.DeepEqual(t, pack(s, 0.05, 0.05*4, zeroScore), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, zeroScore), peerScores(s, pids), "Unexpected scores")

// Partial batch.
s1.IncrementProcessedBlocks("peer3", batchSize/2)
assert.DeepEqual(t, pack(s, 0.05, 0.05*4, 0), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, 0), peerScores(s, pids), "Unexpected scores")

// See effect of decaying.
assert.Equal(t, batchSize+batchSize/4, s1.ProcessedBlocks("peer1"))
assert.Equal(t, batchSize*4, s1.ProcessedBlocks("peer2"))
assert.Equal(t, batchSize/2, s1.ProcessedBlocks("peer3"))
assert.DeepEqual(t, pack(s, 0.05, 0.05*4, 0), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, batchWeight, batchWeight*4, 0), peerScores(s, pids), "Unexpected scores")
s1.Decay()
assert.Equal(t, batchSize/4, s1.ProcessedBlocks("peer1"))
assert.Equal(t, batchSize*3, s1.ProcessedBlocks("peer2"))
assert.Equal(t, uint64(0), s1.ProcessedBlocks("peer3"))
assert.DeepEqual(t, pack(s, 0, 0.05*3, 0), peerScores(s, pids), "Unexpected scores")
assert.DeepEqual(t, pack(s, 0, batchWeight*3, 0), peerScores(s, pids), "Unexpected scores")
})

t.Run("overall score", func(t *testing.T) {
// Full score, no penalty.
s, _ := setupScorer()
s1 := s.BlockProviderScorer()
s2 := s.BadResponsesScorer()
batchWeight := s1.Params().ProcessedBatchWeight

s1.IncrementProcessedBlocks("peer1", batchSize*10)
assert.Equal(t, roundScore(0.05*10), s1.Score("peer1"))
s1.IncrementProcessedBlocks("peer1", batchSize*5)
assert.Equal(t, roundScore(batchWeight*5), s1.Score("peer1"))
// Now, adjust score by introducing penalty for bad responses.
s2.Increment("peer1")
s2.Increment("peer1")
assert.Equal(t, -0.4, s2.Score("peer1"), "Unexpected bad responses score")
assert.Equal(t, roundScore(0.05*10), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, roundScore(0.05*10-0.4), s.Score("peer1"), "Unexpected overall score")
assert.Equal(t, roundScore(batchWeight*5), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, roundScore(batchWeight*5-0.4), s.Score("peer1"), "Unexpected overall score")
// If peer continues to misbehave, score becomes negative.
s2.Increment("peer1")
assert.Equal(t, -0.6, s2.Score("peer1"), "Unexpected bad responses score")
assert.Equal(t, roundScore(0.05*10), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, -0.1, s.Score("peer1"), "Unexpected overall score")
assert.Equal(t, roundScore(batchWeight*5), s1.Score("peer1"), "Unexpected block provider score")
assert.Equal(t, roundScore(batchWeight*5-0.6), s.Score("peer1"), "Unexpected overall score")
})
}

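
The "overall score" case combines both scorers. A back-of-the-envelope check of the numbers it asserts, assuming the overall score is a plain sum of the component scores and that the bad-responses score is -(count/threshold) with the threshold of 5 configured above; both are inferences from the assertions, not quotes from the implementation:

package main

import (
    "fmt"
    "math"
)

func main() {
    const batchWeight = 0.1 // DefaultBlockProviderProcessedBatchWeight after this change
    round := func(x float64) float64 { return math.Round(x*10000) / 10000 }

    blockScore := batchWeight * 5 // five full batches processed
    badResponses, threshold := 3.0, 5.0
    badScore := -badResponses / threshold // three bad responses against a threshold of 5
    overall := blockScore + badScore

    fmt.Println(round(blockScore)) // 0.5
    fmt.Println(round(badScore))   // -0.6
    fmt.Println(round(overall))    // -0.1, matching the final assertion
}
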
16 changes: 7 additions & 9 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -87,6 +87,7 @@ type fetchRequestParams struct {
// fetchRequestResponse is a combined type to hold results of both successful executions and errors.
// Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`.
type fetchRequestResponse struct {
pid peer.ID
start, count uint64
blocks []*eth.SignedBeaconBlock
err error
@@ -245,7 +246,7 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start, count uint64)
return response
}

response.blocks, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
response.blocks, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
return response
}

@@ -254,18 +255,15 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
ctx context.Context,
start, count uint64,
peers []peer.ID,
) ([]*eth.SignedBeaconBlock, error) {
) ([]*eth.SignedBeaconBlock, peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
defer span.End()

blocks := []*eth.SignedBeaconBlock{}
var blocks []*eth.SignedBeaconBlock
var err error
peers, err = f.filterPeers(peers, peersPercentagePerRequest)
if err != nil {
return blocks, err
}
if len(peers) == 0 {
return blocks, errNoPeersAvailable
return blocks, "", err
}
req := &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: start,
@@ -274,10 +272,10 @@ }
}
for i := 0; i < len(peers); i++ {
if blocks, err = f.requestBlocks(ctx, req, peers[i]); err == nil {
return blocks, err
return blocks, peers[i], err
}
}
return blocks, nil
return blocks, "", errNoPeersAvailable
}

// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
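
fetchBlocksFromPeer now reports which peer actually served the batch, and handleRequest above stores that pid on fetchRequestResponse, presumably so later stages can attribute good or bad batches to a concrete peer. A stripped-down sketch of the same "return the responding peer" pattern; the function, its package placement, and the request callback are illustrative stand-ins, not the fetcher's own API:

package initialsync // illustrative placement only

import (
    "context"
    "errors"

    "github.com/libp2p/go-libp2p-core/peer"
    eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)

// fetchFromFirstResponsive tries candidate peers in order and returns the
// blocks together with the ID of the peer that answered. If nobody answers,
// it returns the zero peer.ID and a sentinel error, mirroring the shape of
// fetchBlocksFromPeer above.
func fetchFromFirstResponsive(
    ctx context.Context,
    candidates []peer.ID,
    request func(context.Context, peer.ID) ([]*eth.SignedBeaconBlock, error),
) ([]*eth.SignedBeaconBlock, peer.ID, error) {
    for _, pid := range candidates {
        blocks, err := request(ctx, pid)
        if err == nil {
            return blocks, pid, nil
        }
    }
    return nil, "", errors.New("no peers available to fetch blocks from")
}
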
12 changes: 6 additions & 6 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -469,20 +469,20 @@ func TestBlocksFetcher_requestBeaconBlocksByRange(t *testing.T) {
p2p: p2p,
})

_, peers := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot()))
_, peerIDs := p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, helpers.SlotToEpoch(mc.HeadSlot()))
req := &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 1,
Step: 1,
Count: blockBatchLimit,
}
blocks, err := fetcher.requestBlocks(ctx, req, peers[0])
blocks, err := fetcher.requestBlocks(ctx, req, peerIDs[0])
assert.NoError(t, err)
assert.Equal(t, blockBatchLimit, uint64(len(blocks)), "Incorrect number of blocks returned")

// Test context cancellation.
ctx, cancel = context.WithCancel(context.Background())
cancel()
blocks, err = fetcher.requestBlocks(ctx, req, peers[0])
blocks, err = fetcher.requestBlocks(ctx, req, peerIDs[0])
assert.ErrorContains(t, "context canceled", err)
}

@@ -583,15 +583,15 @@ func TestBlocksFetcher_nonSkippedSlotAfter(t *testing.T) {
blocks = append(blocks, makeSequence(51200, 51264)...)
blocks = append(blocks, 55000)
blocks = append(blocks, makeSequence(57000, 57256)...)
var peers []*peerData
var peersData []*peerData
for i := 0; i < size; i++ {
peers = append(peers, &peerData{
peersData = append(peersData, &peerData{
blocks: blocks,
finalizedEpoch: 1800,
headSlot: 57000,
})
}
return peers
return peersData
}
chainConfig := struct {
peers []*peerData
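
The changes in this test file are pure renames (peers to peerIDs and peersData). They read like a move to keep local variables from shadowing the imported peers package, though the commit itself does not say so. A generic, standard-library-only illustration of the pitfall such names avoid:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Naming this slice "strings" would shadow the imported package, and the
    // strings.Join call below would then fail to compile. Descriptive names
    // such as peerIDs keep the package usable in the same scope.
    peerIDs := []string{"peer1", "peer2", "peer3"}
    fmt.Println(strings.Join(peerIDs, ", "))
}
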
22 changes: 17 additions & 5 deletions beacon-chain/sync/initial-sync/blocks_queue.go
@@ -5,6 +5,7 @@ import (
"errors"
"time"

"github.com/libp2p/go-libp2p-core/peer"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
@@ -50,8 +51,14 @@ type blocksQueue struct {
blocksFetcher *blocksFetcher
headFetcher blockchain.HeadFetcher
highestExpectedSlot uint64
fetchedBlocks chan []*eth.SignedBeaconBlock // output channel for ready blocks
quit chan struct{} // termination notifier
fetchedData chan *blocksQueueFetchedData // output channel for ready blocks
quit chan struct{} // termination notifier
}

// blocksQueueFetchedData is a data container that is returned from a queue on each step.
type blocksQueueFetchedData struct {
pid peer.ID
blocks []*eth.SignedBeaconBlock
}

// newBlocksQueue creates initialized priority queue.
@@ -76,7 +83,7 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
highestExpectedSlot: highestExpectedSlot,
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
fetchedBlocks: make(chan []*eth.SignedBeaconBlock, 1),
fetchedData: make(chan *blocksQueueFetchedData, 1),
quit: make(chan struct{}),
}

@@ -119,7 +126,7 @@ func (q *blocksQueue) loop() {

defer func() {
q.blocksFetcher.stop()
close(q.fetchedBlocks)
close(q.fetchedData)
}()

if err := q.blocksFetcher.start(); err != nil {
@@ -246,6 +253,7 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
}
return m.state, response.err
}
m.pid = response.pid
m.blocks = response.blocks
return stateDataParsed, nil
}
@@ -266,10 +274,14 @@ func (q *blocksQueue) onReadyToSendEvent(ctx context.Context) eventHandlerFn {
}

send := func() (stateID, error) {
data := &blocksQueueFetchedData{
pid: m.pid,
blocks: m.blocks,
}
select {
case <-ctx.Done():
return m.state, ctx.Err()
case q.fetchedBlocks <- m.blocks:
case q.fetchedData <- data:
}
return stateSent, nil
}
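
With the queue emitting *blocksQueueFetchedData instead of bare block slices, consumers now receive the originating peer alongside every batch (the test file below ranges over fetchedData in exactly this way). A self-contained sketch of consuming that shape; the local fetchedData type and channel stand in for the queue's own, and the attribution comment describes what the pid makes possible rather than code in this commit:

package main

import (
    "fmt"

    "github.com/libp2p/go-libp2p-core/peer"
    eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)

// fetchedData mirrors the shape of blocksQueueFetchedData: a batch of blocks
// plus the peer that served it.
type fetchedData struct {
    pid    peer.ID
    blocks []*eth.SignedBeaconBlock
}

func main() {
    ch := make(chan *fetchedData, 1)
    ch <- &fetchedData{pid: "peer1", blocks: []*eth.SignedBeaconBlock{{}}}
    close(ch)

    // The consumption pattern used by the queue's callers: iterate batches,
    // process each block, and keep data.pid around for attribution, e.g.
    // crediting or penalizing the peer that served the batch.
    for data := range ch {
        for range data.blocks {
            // processBlock(block) would go here in real code.
        }
        fmt.Printf("processed %d block(s) from %s\n", len(data.blocks), data.pid)
    }
}
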
8 changes: 4 additions & 4 deletions beacon-chain/sync/initial-sync/blocks_queue_test.go
@@ -68,9 +68,9 @@ func TestBlocksQueueInitStartStop(t *testing.T) {
// Blocks up until all resources are reclaimed (or timeout is called)
assert.NoError(t, queue.stop())
select {
case <-queue.fetchedBlocks:
case <-queue.fetchedData:
default:
t.Error("queue.fetchedBlocks channel is leaked")
t.Error("queue.fetchedData channel is leaked")
}
select {
case <-fetcher.fetchResponses:
@@ -255,8 +255,8 @@ func TestBlocksQueueLoop(t *testing.T) {
}

var blocks []*eth.SignedBeaconBlock
for fetchedBlocks := range queue.fetchedBlocks {
for _, block := range fetchedBlocks {
for data := range queue.fetchedData {
for _, block := range data.blocks {
if err := processBlock(block); err != nil {
continue
}
2 changes: 2 additions & 0 deletions beacon-chain/sync/initial-sync/fsm.go
@@ -6,6 +6,7 @@ import (
"sort"
"time"

"github.com/libp2p/go-libp2p-core/peer"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/roughtime"
@@ -43,6 +44,7 @@ type stateMachine struct {
smm *stateMachineManager
start uint64
state stateID
pid peer.ID
blocks []*eth.SignedBeaconBlock
updated time.Time
}
(Diffs for the remaining 3 changed files are not shown.)
