diff --git a/beacon-chain/p2p/encoder/BUILD.bazel b/beacon-chain/p2p/encoder/BUILD.bazel index 7b2f1732996f..77fb79360693 100644 --- a/beacon-chain/p2p/encoder/BUILD.bazel +++ b/beacon-chain/p2p/encoder/BUILD.bazel @@ -33,6 +33,7 @@ go_test( embed = [":go_default_library"], deps = [ "//proto/testing:go_default_library", + "//shared/params:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_golang_snappy//:go_default_library", ], diff --git a/beacon-chain/p2p/encoder/network_encoding.go b/beacon-chain/p2p/encoder/network_encoding.go index ee2f7b445db7..4639f2848c29 100644 --- a/beacon-chain/p2p/encoder/network_encoding.go +++ b/beacon-chain/p2p/encoder/network_encoding.go @@ -14,20 +14,14 @@ const ( type NetworkEncoding interface { // DecodeGossip to the provided gossip message. The interface must be a pointer to the decoding destination. DecodeGossip([]byte, interface{}) error - // DecodeWithLength a bytes from a reader with a varint length prefix. The interface must be a pointer to the - // decoding destination. - DecodeWithLength(io.Reader, interface{}) error // DecodeWithMaxLength a bytes from a reader with a varint length prefix. The interface must be a pointer to the // decoding destination. The length of the message should not be more than the provided limit. - DecodeWithMaxLength(io.Reader, interface{}, uint64) error + DecodeWithMaxLength(io.Reader, interface{}) error // EncodeGossip an arbitrary gossip message to the provided writer. The interface must be a pointer object to encode. EncodeGossip(io.Writer, interface{}) (int, error) - // EncodeWithLength an arbitrary message to the provided writer with a varint length prefix. The interface must be - // a pointer object to encode. - EncodeWithLength(io.Writer, interface{}) (int, error) // EncodeWithMaxLength an arbitrary message to the provided writer with a varint length prefix. The interface must be // a pointer object to encode. The encoded message should not be bigger than the provided limit. - EncodeWithMaxLength(io.Writer, interface{}, uint64) (int, error) + EncodeWithMaxLength(io.Writer, interface{}) (int, error) // ProtocolSuffix returns the last part of the protocol ID to indicate the encoding scheme. ProtocolSuffix() string } diff --git a/beacon-chain/p2p/encoder/ssz.go b/beacon-chain/p2p/encoder/ssz.go index df6404fe0884..9f05ee47d9e8 100644 --- a/beacon-chain/p2p/encoder/ssz.go +++ b/beacon-chain/p2p/encoder/ssz.go @@ -15,9 +15,6 @@ import ( var _ = NetworkEncoding(&SszNetworkEncoder{}) -// MaxChunkSize allowed for decoding messages. -var MaxChunkSize = params.BeaconNetworkConfig().MaxChunkSize // 1Mib - // MaxGossipSize allowed for gossip messages. var MaxGossipSize = params.BeaconNetworkConfig().GossipMaxSize // 1 Mib @@ -60,30 +57,9 @@ func (e SszNetworkEncoder) EncodeGossip(w io.Writer, msg interface{}) (int, erro return w.Write(b) } -// EncodeWithLength the proto message to the io.Writer. This encoding prefixes the byte slice with a protobuf varint -// to indicate the size of the message. -func (e SszNetworkEncoder) EncodeWithLength(w io.Writer, msg interface{}) (int, error) { - if msg == nil { - return 0, nil - } - b, err := e.doEncode(msg) - if err != nil { - return 0, err - } - // write varint first - _, err = w.Write(proto.EncodeVarint(uint64(len(b)))) - if err != nil { - return 0, err - } - if e.UseSnappyCompression { - return writeSnappyBuffer(w, b) - } - return w.Write(b) -} - // EncodeWithMaxLength the proto message to the io.Writer. This encoding prefixes the byte slice with a protobuf varint // to indicate the size of the message. This checks that the encoded message isn't larger than the provided max limit. -func (e SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg interface{}, maxSize uint64) (int, error) { +func (e SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg interface{}) (int, error) { if msg == nil { return 0, nil } @@ -91,8 +67,12 @@ func (e SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg interface{}, max if err != nil { return 0, err } - if uint64(len(b)) > maxSize { - return 0, fmt.Errorf("size of encoded message is %d which is larger than the provided max limit of %d", len(b), maxSize) + if uint64(len(b)) > params.BeaconNetworkConfig().MaxChunkSize { + return 0, fmt.Errorf( + "size of encoded message is %d which is larger than the provided max limit of %d", + len(b), + params.BeaconNetworkConfig().MaxChunkSize, + ) } // write varint first _, err = w.Write(proto.EncodeVarint(uint64(len(b)))) @@ -139,23 +119,19 @@ func (e SszNetworkEncoder) DecodeGossip(b []byte, to interface{}) error { return e.doDecode(b, to) } -// DecodeWithLength the bytes from io.Reader to the protobuf message provided. -func (e SszNetworkEncoder) DecodeWithLength(r io.Reader, to interface{}) error { - return e.DecodeWithMaxLength(r, to, MaxChunkSize) -} - // DecodeWithMaxLength the bytes from io.Reader to the protobuf message provided. // This checks that the decoded message isn't larger than the provided max limit. -func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}, maxSize uint64) error { - if maxSize > MaxChunkSize { - return fmt.Errorf("maxSize %d exceeds max chunk size %d", maxSize, MaxChunkSize) - } +func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}) error { msgLen, err := readVarint(r) if err != nil { return err } - if msgLen > maxSize { - return fmt.Errorf("remaining bytes %d goes over the provided max limit of %d", msgLen, maxSize) + if msgLen > params.BeaconNetworkConfig().MaxChunkSize { + return fmt.Errorf( + "remaining bytes %d goes over the provided max limit of %d", + msgLen, + params.BeaconNetworkConfig().MaxChunkSize, + ) } if e.UseSnappyCompression { r = newBufferedReader(r) diff --git a/beacon-chain/p2p/encoder/ssz_test.go b/beacon-chain/p2p/encoder/ssz_test.go index 941ace05cfeb..48d97bae5f4c 100644 --- a/beacon-chain/p2p/encoder/ssz_test.go +++ b/beacon-chain/p2p/encoder/ssz_test.go @@ -9,6 +9,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" testpb "github.com/prysmaticlabs/prysm/proto/testing" + "github.com/prysmaticlabs/prysm/shared/params" ) func TestSszNetworkEncoder_RoundTrip(t *testing.T) { @@ -29,12 +30,12 @@ func testRoundTripWithLength(t *testing.T, e *encoder.SszNetworkEncoder) { Foo: []byte("fooooo"), Bar: 9001, } - _, err := e.EncodeWithLength(buf, msg) + _, err := e.EncodeWithMaxLength(buf, msg) if err != nil { t.Fatal(err) } decoded := &testpb.TestSimpleMessage{} - if err := e.DecodeWithLength(buf, decoded); err != nil { + if err := e.DecodeWithMaxLength(buf, decoded); err != nil { t.Fatal(err) } if !proto.Equal(decoded, msg) { @@ -70,9 +71,12 @@ func TestSszNetworkEncoder_EncodeWithMaxLength(t *testing.T) { Bar: 9001, } e := &encoder.SszNetworkEncoder{UseSnappyCompression: false} - maxLength := uint64(5) - _, err := e.EncodeWithMaxLength(buf, msg, maxLength) - wanted := fmt.Sprintf("which is larger than the provided max limit of %d", maxLength) + params.SetupTestConfigCleanup(t) + c := params.BeaconNetworkConfig() + c.MaxChunkSize = uint64(5) + params.OverrideBeaconNetworkConfig(c) + _, err := e.EncodeWithMaxLength(buf, msg) + wanted := fmt.Sprintf("which is larger than the provided max limit of %d", params.BeaconNetworkConfig().MaxChunkSize) if err == nil { t.Fatalf("wanted this error %s but got nothing", wanted) } @@ -88,14 +92,18 @@ func TestSszNetworkEncoder_DecodeWithMaxLength(t *testing.T) { Bar: 4242, } e := &encoder.SszNetworkEncoder{UseSnappyCompression: false} - maxLength := uint64(5) + params.SetupTestConfigCleanup(t) + c := params.BeaconNetworkConfig() + maxChunkSize := uint64(5) + c.MaxChunkSize = maxChunkSize + params.OverrideBeaconNetworkConfig(c) _, err := e.EncodeGossip(buf, msg) if err != nil { t.Fatal(err) } decoded := &testpb.TestSimpleMessage{} - err = e.DecodeWithMaxLength(buf, decoded, maxLength) - wanted := fmt.Sprintf("goes over the provided max limit of %d", maxLength) + err = e.DecodeWithMaxLength(buf, decoded) + wanted := fmt.Sprintf("goes over the provided max limit of %d", maxChunkSize) if err == nil { t.Fatalf("wanted this error %s but got nothing", wanted) } @@ -103,12 +111,3 @@ func TestSszNetworkEncoder_DecodeWithMaxLength(t *testing.T) { t.Errorf("error did not contain wanted message. Wanted: %s but Got: %s", wanted, err.Error()) } } - -func TestSszNetworkEncoder_DecodeWithMaxLength_TooLarge(t *testing.T) { - e := &encoder.SszNetworkEncoder{UseSnappyCompression: false} - if err := e.DecodeWithMaxLength(nil, nil, encoder.MaxChunkSize+1); err == nil { - t.Fatal("Nil error") - } else if !strings.Contains(err.Error(), "exceeds max chunk size") { - t.Error("Expected error to contain 'exceeds max chunk size'") - } -} diff --git a/beacon-chain/p2p/sender.go b/beacon-chain/p2p/sender.go index 8df56273b098..0a8e33a1ff35 100644 --- a/beacon-chain/p2p/sender.go +++ b/beacon-chain/p2p/sender.go @@ -43,7 +43,7 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin return stream, nil } - if _, err := s.Encoding().EncodeWithLength(stream, message); err != nil { + if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil { traceutil.AnnotateError(span, err) return nil, err } diff --git a/beacon-chain/p2p/sender_test.go b/beacon-chain/p2p/sender_test.go index c7939718f635..204812a7d57a 100644 --- a/beacon-chain/p2p/sender_test.go +++ b/beacon-chain/p2p/sender_test.go @@ -34,10 +34,10 @@ func TestService_Send(t *testing.T) { p2.SetStreamHandler("/testing/1/ssz", func(stream network.Stream) { rcvd := &testpb.TestSimpleMessage{} - if err := svc.Encoding().DecodeWithLength(stream, rcvd); err != nil { + if err := svc.Encoding().DecodeWithMaxLength(stream, rcvd); err != nil { t.Fatal(err) } - if _, err := svc.Encoding().EncodeWithLength(stream, rcvd); err != nil { + if _, err := svc.Encoding().EncodeWithMaxLength(stream, rcvd); err != nil { t.Fatal(err) } if err := stream.Close(); err != nil { @@ -54,7 +54,7 @@ func TestService_Send(t *testing.T) { testutil.WaitTimeout(&wg, 1*time.Second) rcvd := &testpb.TestSimpleMessage{} - if err := svc.Encoding().DecodeWithLength(stream, rcvd); err != nil { + if err := svc.Encoding().DecodeWithMaxLength(stream, rcvd); err != nil { t.Fatal(err) } if !proto.Equal(rcvd, msg) { diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 861cdbf99edd..67e77900b65d 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -88,7 +88,7 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) { } }() - n, err := p.Encoding().EncodeWithLength(s, msg) + n, err := p.Encoding().EncodeWithMaxLength(s, msg) if err != nil { p.t.Fatalf("Failed to encode message: %v", err) } @@ -232,7 +232,7 @@ func (p *TestP2P) Send(ctx context.Context, msg interface{}, topic string, pid p } if topic != "/eth2/beacon_chain/req/metadata/1" { - if _, err := p.Encoding().EncodeWithLength(stream, msg); err != nil { + if _, err := p.Encoding().EncodeWithMaxLength(stream, msg); err != nil { return nil, err } } diff --git a/beacon-chain/sync/error.go b/beacon-chain/sync/error.go index 02d713e4d672..147eb67cab2d 100644 --- a/beacon-chain/sync/error.go +++ b/beacon-chain/sync/error.go @@ -27,7 +27,7 @@ func (s *Service) generateErrorResponse(code byte, reason string) ([]byte, error resp := &pb.ErrorResponse{ Message: []byte(reason), } - if _, err := s.p2p.Encoding().EncodeWithLength(buf, resp); err != nil { + if _, err := s.p2p.Encoding().EncodeWithMaxLength(buf, resp); err != nil { return nil, err } @@ -49,7 +49,7 @@ func ReadStatusCode(stream io.Reader, encoding encoder.NetworkEncoding) (uint8, msg := &pb.ErrorResponse{ Message: []byte{}, } - if err := encoding.DecodeWithLength(stream, msg); err != nil { + if err := encoding.DecodeWithMaxLength(stream, msg); err != nil { return 0, "", err } diff --git a/beacon-chain/sync/error_test.go b/beacon-chain/sync/error_test.go index 5f70d357d3bf..2240466d304e 100644 --- a/beacon-chain/sync/error_test.go +++ b/beacon-chain/sync/error_test.go @@ -26,7 +26,7 @@ func TestRegularSync_generateErrorResponse(t *testing.T) { t.Errorf("The first byte was not the status code. Got %#x wanted %#x", b, responseCodeServerError) } msg := &pb.ErrorResponse{} - if err := r.p2p.Encoding().DecodeWithLength(buf, msg); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(buf, msg); err != nil { t.Fatal(err) } if string(msg.Message) != "something bad happened" { diff --git a/beacon-chain/sync/initial-sync/initial_sync_test.go b/beacon-chain/sync/initial-sync/initial_sync_test.go index 83fc7b520bfa..0ef5ebc7749c 100644 --- a/beacon-chain/sync/initial-sync/initial_sync_test.go +++ b/beacon-chain/sync/initial-sync/initial_sync_test.go @@ -183,7 +183,7 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus }() req := &p2ppb.BeaconBlocksByRangeRequest{} - if err := peer.Encoding().DecodeWithLength(stream, req); err != nil { + if err := peer.Encoding().DecodeWithMaxLength(stream, req); err != nil { t.Error(err) } @@ -194,7 +194,7 @@ func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus if _, err := stream.Write([]byte{0x01}); err != nil { t.Error(err) } - if _, err := peer.Encoding().EncodeWithLength(stream, "bad"); err != nil { + if _, err := peer.Encoding().EncodeWithMaxLength(stream, "bad"); err != nil { t.Error(err) } return diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 21e948bbbbc6..75925c70efc4 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -20,11 +20,6 @@ import ( // they don't receive the first byte within 5 seconds. var ttfbTimeout = params.BeaconNetworkConfig().TtfbTimeout -// maxChunkSize would be the maximum allowed size that a request/response chunk can be. -// any size beyond that would be rejected and the corresponding stream reset. This would -// be 1048576 bytes or 1 MiB. -var maxChunkSize = params.BeaconNetworkConfig().MaxChunkSize - // rpcHandler is responsible for handling and responding to any incoming message. // This method may return an error to internal monitoring, but the error will // not be relayed to the peer. @@ -109,7 +104,7 @@ func (s *Service) registerRPC(topic string, base interface{}, handle rpcHandler) t := reflect.TypeOf(base) if t.Kind() == reflect.Ptr { msg := reflect.New(t.Elem()) - if err := s.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { + if err := s.p2p.Encoding().DecodeWithMaxLength(stream, msg.Interface()); err != nil { // Debug logs for goodbye/status errors if strings.Contains(topic, p2p.RPCGoodByeTopic) || strings.Contains(topic, p2p.RPCStatusTopic) { log.WithError(err).Debug("Failed to decode goodbye stream message") @@ -129,7 +124,7 @@ func (s *Service) registerRPC(topic string, base interface{}, handle rpcHandler) } } else { msg := reflect.New(t) - if err := s.p2p.Encoding().DecodeWithLength(stream, msg.Interface()); err != nil { + if err := s.p2p.Encoding().DecodeWithMaxLength(stream, msg.Interface()); err != nil { log.WithError(err).Warn("Failed to decode stream message") traceutil.AnnotateError(span, err) return diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index b9ecb1849f77..51608b86f592 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -54,7 +54,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) { for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step { expectSuccess(t, r, stream) res := ðpb.SignedBeaconBlock{} - if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, res); err != nil { t.Error(err) } if (res.Block.Slot-req.StartSlot)%req.Step != 0 { @@ -121,7 +121,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step { expectSuccess(t, r, stream) res := ðpb.SignedBeaconBlock{} - if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, res); err != nil { t.Error(err) } if res.Block.Slot < prevSlot { @@ -188,7 +188,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) { // check for genesis block expectSuccess(t, r, stream) res := ðpb.SignedBeaconBlock{} - if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, res); err != nil { t.Error(err) } if res.Block.Slot != 0 { @@ -197,7 +197,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) { for i := req.StartSlot + req.Step; i < req.Count*req.Step; i += req.Step { expectSuccess(t, r, stream) res := ðpb.SignedBeaconBlock{} - if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, res); err != nil { t.Error(err) } } @@ -243,7 +243,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step { expectSuccess(t, r, stream) res := ðpb.SignedBeaconBlock{} - if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, res); err != nil { t.Error(err) } if (res.Block.Slot-req.StartSlot)%req.Step != 0 { diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index ce148f3e9d27..ef20e66c9d6c 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -59,7 +59,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { for i := range blkRoots { expectSuccess(t, r, stream) res := ðpb.SignedBeaconBlock{} - if err := r.p2p.Encoding().DecodeWithLength(stream, &res); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, &res); err != nil { t.Error(err) } if res.Block.Slot != uint64(i+1) { @@ -135,7 +135,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := [][32]byte{} - if err := p2.Encoding().DecodeWithLength(stream, &out); err != nil { + if err := p2.Encoding().DecodeWithMaxLength(stream, &out); err != nil { t.Fatal(err) } if !reflect.DeepEqual(out, expectedRoots) { @@ -146,7 +146,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { t.Fatalf("Failed to write to stream: %v", err) } - _, err := p2.Encoding().EncodeWithLength(stream, blk) + _, err := p2.Encoding().EncodeWithMaxLength(stream, blk) if err != nil { t.Errorf("Could not send response back: %v ", err) } diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index a3e256772f39..c9adb1108019 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -24,7 +24,7 @@ func WriteChunk(stream libp2pcore.Stream, encoding encoder.NetworkEncoding, msg if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { return err } - _, err := encoding.EncodeWithMaxLength(stream, msg, maxChunkSize) + _, err := encoding.EncodeWithMaxLength(stream, msg) return err } @@ -50,5 +50,5 @@ func readResponseChunk(stream libp2pcore.Stream, p2p p2p.P2P, to interface{}) er if code != 0 { return errors.New(errMsg) } - return p2p.Encoding().DecodeWithMaxLength(stream, to, maxChunkSize) + return p2p.Encoding().DecodeWithMaxLength(stream, to) } diff --git a/beacon-chain/sync/rpc_goodbye_test.go b/beacon-chain/sync/rpc_goodbye_test.go index ba62786b3b09..0da4b9d2451a 100644 --- a/beacon-chain/sync/rpc_goodbye_test.go +++ b/beacon-chain/sync/rpc_goodbye_test.go @@ -80,7 +80,7 @@ func TestSendGoodbye_SendsMessage(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := new(uint64) - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if *out != failureCode { @@ -127,7 +127,7 @@ func TestSendGoodbye_DisconnectWithPeer(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := new(uint64) - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if *out != failureCode { diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go index c65c2f5ad824..db693056a51c 100644 --- a/beacon-chain/sync/rpc_metadata.go +++ b/beacon-chain/sync/rpc_metadata.go @@ -25,7 +25,7 @@ func (s *Service) metaDataHandler(ctx context.Context, msg interface{}, stream l if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { return err } - _, err := s.p2p.Encoding().EncodeWithLength(stream, s.p2p.Metadata()) + _, err := s.p2p.Encoding().EncodeWithMaxLength(stream, s.p2p.Metadata()) return err } @@ -54,7 +54,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta return nil, errors.New(errMsg) } msg := new(pb.MetaData) - if err := s.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := s.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil { return nil, err } return msg, nil diff --git a/beacon-chain/sync/rpc_metadata_test.go b/beacon-chain/sync/rpc_metadata_test.go index 317863b018b7..e6d2d4ac04b5 100644 --- a/beacon-chain/sync/rpc_metadata_test.go +++ b/beacon-chain/sync/rpc_metadata_test.go @@ -44,7 +44,7 @@ func TestMetaDataRPCHandler_ReceivesMetadata(t *testing.T) { defer wg.Done() expectSuccess(t, r, stream) out := new(pb.MetaData) - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if !ssz.DeepEqual(p1.LocalMetadata, out) { diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index ac5376675cc8..ecc494137f12 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -36,7 +36,7 @@ func (s *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2 } return err } - if _, err := s.p2p.Encoding().EncodeWithLength(stream, s.p2p.MetadataSeq()); err != nil { + if _, err := s.p2p.Encoding().EncodeWithMaxLength(stream, s.p2p.MetadataSeq()); err != nil { if err := stream.Close(); err != nil { log.WithError(err).Error("Failed to close stream") } @@ -101,7 +101,7 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error { return errors.New(errMsg) } msg := new(uint64) - if err := s.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := s.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil { return err } valid, err := s.validateSequenceNum(*msg, stream.Conn().RemotePeer()) diff --git a/beacon-chain/sync/rpc_ping_test.go b/beacon-chain/sync/rpc_ping_test.go index 99934f790925..da871fcf0186 100644 --- a/beacon-chain/sync/rpc_ping_test.go +++ b/beacon-chain/sync/rpc_ping_test.go @@ -50,7 +50,7 @@ func TestPingRPCHandler_ReceivesPing(t *testing.T) { defer wg.Done() expectSuccess(t, r, stream) out := new(uint64) - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if *out != 2 { @@ -119,7 +119,7 @@ func TestPingRPCHandler_SendsPing(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := new(uint64) - if err := r2.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r2.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if *out != 2 { diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go index 8708bd8952c5..7f6a05266d1e 100644 --- a/beacon-chain/sync/rpc_status.go +++ b/beacon-chain/sync/rpc_status.go @@ -141,7 +141,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error { } msg := &pb.Status{} - if err := s.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := s.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil { return err } s.p2p.Peers().SetChainState(stream.Conn().RemotePeer(), msg) @@ -261,7 +261,7 @@ func (s *Service) respondWithStatus(ctx context.Context, stream network.Stream) if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { log.WithError(err).Error("Failed to write to stream") } - _, err = s.p2p.Encoding().EncodeWithLength(stream, resp) + _, err = s.p2p.Encoding().EncodeWithMaxLength(stream, resp) return err } diff --git a/beacon-chain/sync/rpc_status_test.go b/beacon-chain/sync/rpc_status_test.go index 3ff3505aa332..8da3fec28898 100644 --- a/beacon-chain/sync/rpc_status_test.go +++ b/beacon-chain/sync/rpc_status_test.go @@ -56,7 +56,7 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { defer wg.Done() expectSuccess(t, r, stream) out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if !bytes.Equal(out.FinalizedRoot, root[:]) { @@ -70,7 +70,7 @@ func TestStatusRPCHandler_Disconnects_OnForkVersionMismatch(t *testing.T) { p2.BHost.SetStreamHandler(pcl2, func(stream network.Stream) { defer wg2.Done() msg := new(uint64) - if err := r.p2p.Encoding().DecodeWithLength(stream, msg); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil { t.Error(err) } if *msg != codeWrongNetwork { @@ -131,7 +131,7 @@ func TestStatusRPCHandler_ConnectsOnGenesis(t *testing.T) { defer wg.Done() expectSuccess(t, r, stream) out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if !bytes.Equal(out.FinalizedRoot, root[:]) { @@ -232,7 +232,7 @@ func TestStatusRPCHandler_ReturnsHelloMessage(t *testing.T) { defer wg.Done() expectSuccess(t, r, stream) out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } expected := &pb.Status{ @@ -339,7 +339,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } log.WithField("status", out).Warn("received status") @@ -349,7 +349,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { t.Fatal(err) } - _, err := r.p2p.Encoding().EncodeWithLength(stream, resp) + _, err := r.p2p.Encoding().EncodeWithMaxLength(stream, resp) if err != nil { t.Fatal(err) } @@ -365,7 +365,7 @@ func TestHandshakeHandlers_Roundtrip(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg2.Done() out := new(uint64) - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } if *out != 2 { @@ -480,7 +480,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } digest, err := r.forkDigest() @@ -588,7 +588,7 @@ func TestStatusRPCRequest_FinalizedBlockExists(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } err := r2.validateStatusMessage(context.Background(), out) @@ -662,7 +662,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { defer wg.Done() out := &pb.Status{} - if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil { + if err := r.p2p.Encoding().DecodeWithMaxLength(stream, out); err != nil { t.Fatal(err) } expected := &pb.Status{ @@ -675,7 +675,7 @@ func TestStatusRPCRequest_BadPeerHandshake(t *testing.T) { if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { log.WithError(err).Error("Failed to write to stream") } - _, err := r.p2p.Encoding().EncodeWithLength(stream, expected) + _, err := r.p2p.Encoding().EncodeWithMaxLength(stream, expected) if err != nil { t.Errorf("Could not send status: %v", err) }