diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index ae7a5107898e..d69e1e8aec98 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -129,6 +129,7 @@ go_test( "@com_github_libp2p_go_libp2p_core//peer:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", ], ) diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 68f0dbe804f4..4928a4112f3a 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -50,6 +50,7 @@ var ( errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize") errSlotIsTooHigh = errors.New("slot is higher than the finalized slot") errBlockAlreadyProcessed = errors.New("block is already processed") + errInvalidFetchedData = errors.New("invalid data returned from peer") ) // blocksFetcherConfig is a config to setup the block fetcher. @@ -357,22 +358,33 @@ func (f *blocksFetcher) requestBlocks( } }() - resp := make([]*eth.SignedBeaconBlock, 0, req.Count) + blocks := make([]*eth.SignedBeaconBlock, 0, req.Count) + var prevSlot uint64 for i := uint64(0); ; i++ { isFirstChunk := i == 0 blk, err := prysmsync.ReadChunkedBlock(stream, f.p2p, isFirstChunk) if err == io.EOF { break } - // exit if more than max request blocks are returned - if i >= params.BeaconNetworkConfig().MaxRequestBlocks { - break - } if err != nil { return nil, err } - resp = append(resp, blk) + // The response MUST contain no more than `count` blocks, and no more than + // MAX_REQUEST_BLOCKS blocks. 
+ if i >= req.Count || i >= params.BeaconNetworkConfig().MaxRequestBlocks { + return nil, errInvalidFetchedData + } + // Returned blocks MUST be in the slot range [start_slot, start_slot + count * step). + if blk.Block.Slot < req.StartSlot || blk.Block.Slot >= req.StartSlot+req.Count*req.Step { + return nil, errInvalidFetchedData + } + // Returned blocks, where they exist, MUST be sent in a consecutive order. + // Consecutive blocks MUST have values in `step` increments (slots may be skipped in between). + if !isFirstChunk && (prevSlot >= blk.Block.Slot || (blk.Block.Slot-prevSlot)%req.Step != 0) { + return nil, errInvalidFetchedData + } + prevSlot = blk.Block.Slot + blocks = append(blocks, blk) } - - return resp, nil + return blocks, nil } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index af2ae42bca8b..021bdb414a1f 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -21,6 +21,7 @@ import ( p2pm "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers" p2pt "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + beaconsync "github.com/prysmaticlabs/prysm/beacon-chain/sync" p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/params" "github.com/prysmaticlabs/prysm/shared/roughtime" @@ -1148,3 +1149,239 @@ func TestBlocksFetcher_removeStalePeerLocks(t *testing.T) { }) } } + +func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T) { + p1 := p2pt.NewTestP2P(t) + tests := []struct { + name string + req *p2ppb.BeaconBlocksByRangeRequest + handlerGenFn func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) + wantedErr string + validate func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) + }{ + { + name: "no error", + req: &p2ppb.BeaconBlocksByRangeRequest{ 
+ StartSlot: 100, + Step: 4, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = i + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + } + assert.NoError(t, stream.Close()) + } + }, + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, req.Count, uint64(len(blocks))) + }, + }, + { + name: "too many blocks", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 1, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step+1; i += req.Step { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = i + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + } + assert.NoError(t, stream.Close()) + } + }, + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 0, len(blocks)) + }, + wantedErr: errInvalidFetchedData.Error(), + }, + { + name: "not in a consecutive order", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 1, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = 163 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + + blk = testutil.NewBeaconBlock() + blk.Block.Slot = 162 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, stream.Close()) + } + }, + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 0, len(blocks)) + }, + wantedErr: 
errInvalidFetchedData.Error(), + }, + { + name: "same slot number", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 1, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = 160 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + + blk = testutil.NewBeaconBlock() + blk.Block.Slot = 160 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, stream.Close()) + } + }, + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 0, len(blocks)) + }, + wantedErr: errInvalidFetchedData.Error(), + }, + { + name: "slot is too low", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 1, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + defer func() { + assert.NoError(t, stream.Close()) + }() + for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step { + blk := testutil.NewBeaconBlock() + // Patch mid block, with invalid slot number. 
+ if i == (req.StartSlot + req.Count*req.Step/2) { + blk.Block.Slot = req.StartSlot - 1 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + break + } else { + blk.Block.Slot = i + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + } + } + } + }, + wantedErr: errInvalidFetchedData.Error(), + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 0, len(blocks)) + }, + }, + { + name: "slot is too high", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 1, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + defer func() { + assert.NoError(t, stream.Close()) + }() + for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step { + blk := testutil.NewBeaconBlock() + // Patch mid block, with invalid slot number. + if i == (req.StartSlot + req.Count*req.Step/2) { + blk.Block.Slot = req.StartSlot + req.Count*req.Step + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + break + } else { + blk.Block.Slot = i + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + } + } + } + }, + wantedErr: errInvalidFetchedData.Error(), + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 0, len(blocks)) + }, + }, + { + name: "valid step increment", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 5, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = 100 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + + blk = testutil.NewBeaconBlock() + blk.Block.Slot = 105 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, stream.Close()) + } + }, + validate: func(req 
*p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 2, len(blocks)) + }, + }, + { + name: "invalid step increment", + req: &p2ppb.BeaconBlocksByRangeRequest{ + StartSlot: 100, + Step: 5, + Count: 64, + }, + handlerGenFn: func(req *p2ppb.BeaconBlocksByRangeRequest) func(stream network.Stream) { + return func(stream network.Stream) { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = 100 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + + blk = testutil.NewBeaconBlock() + blk.Block.Slot = 103 + assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk)) + assert.NoError(t, stream.Close()) + } + }, + validate: func(req *p2ppb.BeaconBlocksByRangeRequest, blocks []*eth.SignedBeaconBlock) { + assert.Equal(t, 0, len(blocks)) + }, + wantedErr: errInvalidFetchedData.Error(), + }, + } + + topic := p2pm.RPCBlocksByRangeTopic + protocol := core.ProtocolID(topic + p1.Encoding().ProtocolSuffix()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1}) + fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, false) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p2 := p2pt.NewTestP2P(t) + p1.Connect(p2) + + p2.BHost.SetStreamHandler(protocol, tt.handlerGenFn(tt.req)) + blocks, err := fetcher.requestBlocks(ctx, tt.req, p2.PeerID()) + if tt.wantedErr != "" { + assert.ErrorContains(t, tt.wantedErr, err) + } else { + assert.NoError(t, err) + tt.validate(tt.req, blocks) + } + }) + } +} diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 59a1ebac3c73..1df270bee00a 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -283,16 +283,21 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn { } response, ok := in.(*fetchRequestResponse) if !ok { - return 0, 
errInputNotFetchRequestParams + return m.state, errInputNotFetchRequestParams } if response.err != nil { - // Current window is already too big, re-request previous epochs. - if response.err == errSlotIsTooHigh { + switch response.err { + case errSlotIsTooHigh: + // Current window is already too big, re-request previous epochs. for _, fsm := range q.smm.machines { if fsm.start < response.start && fsm.state == stateSkipped { fsm.setState(stateNew) } } + case errInvalidFetchedData: + // Peer returned invalid data, penalize. + q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(response.pid) + log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks") } return m.state, response.err } diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 5c6b1dbb5964..b1a32cfaa5bc 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -5,14 +5,17 @@ import ( "fmt" "testing" + "github.com/libp2p/go-libp2p-core/peer" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/sliceutil" + "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/prysmaticlabs/prysm/shared/testutil/assert" + logTest "github.com/sirupsen/logrus/hooks/test" ) -func TestBlocksQueueInitStartStop(t *testing.T) { +func TestBlocksQueue_InitStartStop(t *testing.T) { blockBatchLimit := uint64(flags.Get().BlockBatchLimit) mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{}) @@ -125,7 +128,7 @@ func TestBlocksQueueInitStartStop(t *testing.T) { }) } -func TestBlocksQueueLoop(t *testing.T) { +func TestBlocksQueue_Loop(t *testing.T) { tests := []struct { name string highestExpectedSlot uint64 @@ -291,3 +294,154 @@ func TestBlocksQueueLoop(t *testing.T) { }) } } + +func 
TestBlocksQueue_onDataReceivedEvent(t *testing.T) { + blockBatchLimit := uint64(flags.Get().BlockBatchLimit) + mc, p2p, _ := initializeTestServices(t, []uint64{}, []*peerData{}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ + headFetcher: mc, + finalizationFetcher: mc, + p2p: p2p, + }) + + t.Run("expired context", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + finalizationFetcher: mc, + highestExpectedSlot: blockBatchLimit, + }) + handlerFn := queue.onDataReceivedEvent(ctx) + cancel() + updatedState, err := handlerFn(&stateMachine{ + state: stateScheduled, + }, nil) + assert.ErrorContains(t, context.Canceled.Error(), err) + assert.Equal(t, stateScheduled, updatedState) + }) + + t.Run("invalid input state", func(t *testing.T) { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + finalizationFetcher: mc, + highestExpectedSlot: blockBatchLimit, + }) + + invalidStates := []stateID{stateNew, stateDataParsed, stateSkipped, stateSent} + for _, state := range invalidStates { + t.Run(state.String(), func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + handlerFn := queue.onDataReceivedEvent(ctx) + updatedState, err := handlerFn(&stateMachine{ + state: state, + }, nil) + assert.ErrorContains(t, errInvalidInitialState.Error(), err) + assert.Equal(t, state, updatedState) + }) + } + }) + + t.Run("invalid input param", func(t *testing.T) { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + finalizationFetcher: mc, + highestExpectedSlot: blockBatchLimit, + }) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + handlerFn := queue.onDataReceivedEvent(ctx) + updatedState, err := handlerFn(&stateMachine{ + state: stateScheduled, + }, nil) + 
assert.ErrorContains(t, errInputNotFetchRequestParams.Error(), err) + assert.Equal(t, stateScheduled, updatedState) + }) + + t.Run("slot is too high", func(t *testing.T) { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + finalizationFetcher: mc, + highestExpectedSlot: blockBatchLimit, + }) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + handlerFn := queue.onDataReceivedEvent(ctx) + updatedState, err := handlerFn(&stateMachine{ + state: stateScheduled, + }, &fetchRequestResponse{ + pid: "abc", + err: errSlotIsTooHigh, + }) + assert.ErrorContains(t, errSlotIsTooHigh.Error(), err) + assert.Equal(t, stateScheduled, updatedState) + }) + + t.Run("invalid data returned", func(t *testing.T) { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + finalizationFetcher: mc, + highestExpectedSlot: blockBatchLimit, + }) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + hook := logTest.NewGlobal() + defer hook.Reset() + handlerFn := queue.onDataReceivedEvent(ctx) + updatedState, err := handlerFn(&stateMachine{ + state: stateScheduled, + }, &fetchRequestResponse{ + pid: "abc", + err: errInvalidFetchedData, + }) + assert.ErrorContains(t, errInvalidFetchedData.Error(), err) + assert.Equal(t, stateScheduled, updatedState) + assert.LogsContain(t, hook, "msg=\"Peer is penalized for invalid blocks\" pid=ZiCa") + }) + + t.Run("transition ok", func(t *testing.T) { + queue := newBlocksQueue(ctx, &blocksQueueConfig{ + blocksFetcher: fetcher, + headFetcher: mc, + finalizationFetcher: mc, + highestExpectedSlot: blockBatchLimit, + }) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + handlerFn := queue.onDataReceivedEvent(ctx) + response := &fetchRequestResponse{ + pid: "abc", + blocks: []*eth.SignedBeaconBlock{ + testutil.NewBeaconBlock(), + testutil.NewBeaconBlock(), + }, + } + fsm := &stateMachine{ + state: stateScheduled, + } + assert.Equal(t, 
(peer.ID)(""), fsm.pid) + assert.DeepEqual(t, ([]*eth.SignedBeaconBlock)(nil), fsm.blocks) + updatedState, err := handlerFn(fsm, response) + assert.NoError(t, err) + assert.Equal(t, stateDataParsed, updatedState) + assert.Equal(t, response.pid, fsm.pid) + assert.DeepEqual(t, response.blocks, fsm.blocks) + }) +}