Skip to content

Commit

Permalink
Hardens block validation in init-sync (#7168)
Browse files Browse the repository at this point in the history
* assert response invariants in beaconBlocksByRange
* Merge branch 'master' into assert-invariants-in-beaconblocksbyrange
* enforce specs invariant in requestBlocks
* blocks queue: onDataReceivedEvent - part1
* Merge branch 'master' into init-sync-block-validation
* blocks queue: onDataReceivedEvent - part2
* Merge branch 'master' into init-sync-block-validation
* block fetcher: test returned blocks validation
* gazelle
* Nishant's suggestion to not loop more than req.count of times
* combine checks
* enforce slot ordering
* enforce skip increments
  • Loading branch information
farazdagi authored Sep 3, 2020
1 parent 0e6797d commit 7fd2536
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 13 deletions.
1 change: 1 addition & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ go_test(
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

Expand Down
28 changes: 20 additions & 8 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
errSlotIsTooHigh = errors.New("slot is higher than the finalized slot")
errBlockAlreadyProcessed = errors.New("block is already processed")
errInvalidFetchedData = errors.New("invalid data returned from peer")
)

// blocksFetcherConfig is a config to setup the block fetcher.
Expand Down Expand Up @@ -357,22 +358,33 @@ func (f *blocksFetcher) requestBlocks(
}
}()

resp := make([]*eth.SignedBeaconBlock, 0, req.Count)
blocks := make([]*eth.SignedBeaconBlock, 0, req.Count)
var prevSlot uint64
for i := uint64(0); ; i++ {
isFirstChunk := i == 0
blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p, isFirstChunk)
if err == io.EOF {
break
}
// exit if more than max request blocks are returned
if i >= params.BeaconNetworkConfig().MaxRequestBlocks {
break
}
if err != nil {
return nil, err
}
resp = append(resp, blk)
// The response MUST contain no more than `count` blocks, and no more than
// MAX_REQUEST_BLOCKS blocks.
if i >= req.Count || i >= params.BeaconNetworkConfig().MaxRequestBlocks {
return nil, errInvalidFetchedData
}
// Returned blocks MUST be in the slot range [start_slot, start_slot + count * step).
if blk.Block.Slot < req.StartSlot || blk.Block.Slot >= req.StartSlot+req.Count*req.Step {
return nil, errInvalidFetchedData
}
// Returned blocks, where they exist, MUST be sent in a consecutive order.
// Consecutive blocks MUST have values in `step` increments (slots may be skipped in between).
if !isFirstChunk && (prevSlot >= blk.Block.Slot || (blk.Block.Slot-prevSlot)%req.Step != 0) {
return nil, errInvalidFetchedData
}
prevSlot = blk.Block.Slot
blocks = append(blocks, blk)
}

return resp, nil
return blocks, nil
}
237 changes: 237 additions & 0 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
p2pm "github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
beaconsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
Expand Down Expand Up @@ -1148,3 +1149,239 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) {
})
}
}

func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) {
p1 := p2pt.NewTestP2P(t)
tests := []struct {
name string
req *p2ppb.BeaconBlocksByRangeRequest
handlerGenFn func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream)
wantedErr string
validate func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock)
}{
{
name: "no error",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 4,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
}
assert.NoError(t, stream.Close())
}
},
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, req.Count, uint64(len(blocks)))
},
},
{
name: "too many blocks",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step+1; i += req.Step {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
}
assert.NoError(t, stream.Close())
}
},
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 0, len(blocks))
},
wantedErr: errInvalidFetchedData.Error(),
},
{
name: "not in a consecutive order",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 163
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))

blk = testutil.NewBeaconBlock()
blk.Block.Slot = 162
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 0, len(blocks))
},
wantedErr: errInvalidFetchedData.Error(),
},
{
name: "same slot number",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 160
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))

blk = testutil.NewBeaconBlock()
blk.Block.Slot = 160
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 0, len(blocks))
},
wantedErr: errInvalidFetchedData.Error(),
},
{
name: "slot is too low",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
}()
for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step {
blk := testutil.NewBeaconBlock()
// Patch mid block, with invalid slot number.
if i == (req.StartSlot + req.Count*req.Step/2) {
blk.Block.Slot = req.StartSlot - 1
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
break
} else {
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
}
}
}
},
wantedErr: errInvalidFetchedData.Error(),
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 0, len(blocks))
},
},
{
name: "slot is too high",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 1,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
defer func() {
assert.NoError(t, stream.Close())
}()
for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step {
blk := testutil.NewBeaconBlock()
// Patch mid block, with invalid slot number.
if i == (req.StartSlot + req.Count*req.Step/2) {
blk.Block.Slot = req.StartSlot + req.Count*req.Step
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
break
} else {
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
}
}
}
},
wantedErr: errInvalidFetchedData.Error(),
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 0, len(blocks))
},
},
{
name: "valid step increment",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 5,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 100
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))

blk = testutil.NewBeaconBlock()
blk.Block.Slot = 105
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 2, len(blocks))
},
},
{
name: "invalid step increment",
req: &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: 100,
Step: 5,
Count: 64,
},
handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) {
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 100
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))

blk = testutil.NewBeaconBlock()
blk.Block.Slot = 103
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) {
assert.Equal(t, 0, len(blocks))
},
wantedErr: errInvalidFetchedData.Error(),
},
}

topic := p2pm.RPCBlocksByRangeTopic
protocol := core.ProtocolID(topic + p1.Encoding().ProtocolSuffix())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1})
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, false)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p2 := p2pt.NewTestP2P(t)
p1.Connect(p2)

p2.BHost.SetStreamHandler(protocol, tt.handlerGenFn(tt.req))
blocks, err := fetcher.requestBlocks(ctx, tt.req, p2.PeerID())
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
assert.NoError(t, err)
tt.validate(tt.req, blocks)
}
})
}
}
11 changes: 8 additions & 3 deletions beacon-chain/sync/initial-sync/blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,21 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
}
response, ok := in.(*fetchRequestResponse)
if !ok {
return 0, errInputNotFetchRequestParams
return m.state, errInputNotFetchRequestParams
}
if response.err != nil {
// Current window is already too big, re-request previous epochs.
if response.err == errSlotIsTooHigh {
switch response.err {
case errSlotIsTooHigh:
// Current window is already too big, re-request previous epochs.
for _, fsm := range q.smm.machines {
if fsm.start < response.start && fsm.state == stateSkipped {
fsm.setState(stateNew)
}
}
case errInvalidFetchedData:
// Peer returned invalid data, penalize.
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks")
}
return m.state, response.err
}
Expand Down
Loading

0 comments on commit 7fd2536

Please sign in to comment.