Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QSP-13 Only Allow Snappy P2P Encoding #6415

Merged
merged 22 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9b4fd94
enforce only snappy
rauljordan Jun 26, 2020
e7ab7b2
Merge branch 'master' into remove-non-snappy
rauljordan Jun 26, 2020
11e1b7a
fix up tests
rauljordan Jun 26, 2020
5ad8791
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jun 26, 2020
c583ce8
fix up confs
rauljordan Jun 29, 2020
742bc68
replace with ssz snappy in tests
rauljordan Jun 29, 2020
0d5c37f
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jun 29, 2020
e2e03e3
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jun 30, 2020
86dd23d
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jun 30, 2020
b450627
Merge branch 'master' into remove-non-snappy
rauljordan Jun 30, 2020
75b3509
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
236d7cb
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
9e55d1c
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
22bbd63
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
71b9012
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
fe1f022
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
42d04ae
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
655ffb0
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 1, 2020
a0195de
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 2, 2020
d9d8e33
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 2, 2020
9a1d715
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 2, 2020
a2b11a2
Merge refs/heads/master into remove-non-snappy
prylabs-bulldozer[bot] Jul 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion beacon-chain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var appFlags = []cli.Flag{
cmd.P2PMetadata,
cmd.P2PAllowList,
cmd.P2PDenyList,
cmd.P2PEncoding,
cmd.DataDirFlag,
cmd.VerbosityFlag,
cmd.EnableTracingFlag,
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
DenyListCIDR: sliceutil.SplitCommaSeparated(cliCtx.StringSlice(cmd.P2PDenyList.Name)),
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
DisableDiscv5: cliCtx.Bool(flags.DisableDiscv5.Name),
Encoding: cliCtx.String(cmd.P2PEncoding.Name),
StateNotifier: b,
})
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ func TestService_Broadcast(t *testing.T) {
}

p := &Service{
host: p1.BHost,
pubsub: p1.PubSub(),
cfg: &Config{
Encoding: "ssz",
},
host: p1.BHost,
pubsub: p1.PubSub(),
cfg: &Config{},
genesisTime: time.Now(),
genesisValidatorsRoot: []byte{'A'},
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,5 @@ type Config struct {
MaxPeers uint
AllowListCIDR string
DenyListCIDR []string
Encoding string
StateNotifier statefeed.Notifier
}
5 changes: 2 additions & 3 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func createAddrAndPrivKey(t *testing.T) (net.IP, *ecdsa.PrivateKey) {
if err != nil {
t.Fatal(err)
}
pkey, err := privKey(&Config{Encoding: "ssz", DataDir: tempPath})
pkey, err := privKey(&Config{DataDir: tempPath})
if err != nil {
t.Fatalf("Could not get private key: %v", err)
}
Expand Down Expand Up @@ -95,7 +95,6 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
port = 3000 + i
cfg := &Config{
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
UDPPort: uint(port),
}
ipAddr, pkey := createAddrAndPrivKey(t)
Expand Down Expand Up @@ -167,7 +166,7 @@ func TestMultiAddrConversion_OK(t *testing.T) {

func TestStaticPeering_PeersAreAdded(t *testing.T) {
cfg := &Config{
Encoding: "ssz", MaxPeers: 30,
MaxPeers: 30,
}
port := 6000
var staticPeers []string
Expand Down
6 changes: 0 additions & 6 deletions beacon-chain/p2p/encoder/network_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,6 @@ import (
"io"
)

// Defines the different encoding formats
const (
SSZ = "ssz" // SSZ is SSZ only.
SSZSnappy = "ssz-snappy" // SSZSnappy is SSZ with snappy compression.
)

// NetworkEncoding represents an encoder compatible with Ethereum 2.0 p2p.
type NetworkEncoding interface {
// DecodeGossip to the provided gossip message. The interface must be a pointer to the decoding destination.
Expand Down
39 changes: 11 additions & 28 deletions beacon-chain/p2p/encoder/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ var bufReaderPool = new(sync.Pool)

// SszNetworkEncoder supports p2p networking encoding using SimpleSerialize
// with snappy compression (if enabled).
type SszNetworkEncoder struct {
UseSnappyCompression bool
}
type SszNetworkEncoder struct{}

func (e SszNetworkEncoder) doEncode(msg interface{}) ([]byte, error) {
if v, ok := msg.(fastssz.Marshaler); ok {
Expand All @@ -51,9 +49,7 @@ func (e SszNetworkEncoder) EncodeGossip(w io.Writer, msg interface{}) (int, erro
if uint64(len(b)) > MaxGossipSize {
return 0, errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", len(b), MaxGossipSize)
}
if e.UseSnappyCompression {
b = snappy.Encode(nil /*dst*/, b)
}
b = snappy.Encode(nil /*dst*/, b)
return w.Write(b)
}

Expand All @@ -79,10 +75,7 @@ func (e SszNetworkEncoder) EncodeWithMaxLength(w io.Writer, msg interface{}) (in
if err != nil {
return 0, err
}
if e.UseSnappyCompression {
return writeSnappyBuffer(w, b)
}
return w.Write(b)
return writeSnappyBuffer(w, b)
}

func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {
Expand All @@ -106,12 +99,10 @@ func (e SszNetworkEncoder) doDecode(b []byte, to interface{}) error {

// DecodeGossip decodes the bytes to the protobuf gossip message provided.
func (e SszNetworkEncoder) DecodeGossip(b []byte, to interface{}) error {
if e.UseSnappyCompression {
var err error
b, err = snappy.Decode(nil /*dst*/, b)
if err != nil {
return err
}
var err error
b, err = snappy.Decode(nil /*dst*/, b)
if err != nil {
return err
}
if uint64(len(b)) > MaxGossipSize {
return errors.Errorf("gossip message exceeds max gossip size: %d bytes > %d bytes", len(b), MaxGossipSize)
Expand All @@ -133,10 +124,8 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}) erro
params.BeaconNetworkConfig().MaxChunkSize,
)
}
if e.UseSnappyCompression {
r = newBufferedReader(r)
defer bufReaderPool.Put(r)
}
r = newBufferedReader(r)
defer bufReaderPool.Put(r)
b := make([]byte, e.MaxLength(int(msgLen)))
numOfBytes, err := r.Read(b)
if err != nil {
Expand All @@ -147,19 +136,13 @@ func (e SszNetworkEncoder) DecodeWithMaxLength(r io.Reader, to interface{}) erro

// ProtocolSuffix returns the appropriate suffix for protocol IDs.
func (e SszNetworkEncoder) ProtocolSuffix() string {
if e.UseSnappyCompression {
return "/ssz_snappy"
}
return "/ssz"
return "/ssz_snappy"
}

// MaxLength specifies the maximum possible length of an encoded
// chunk of data.
func (e SszNetworkEncoder) MaxLength(length int) int {
if e.UseSnappyCompression {
return snappy.MaxEncodedLen(length)
}
return length
return snappy.MaxEncodedLen(length)
}

// Writes a bytes value through a snappy buffered writer.
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/p2p/encoder/ssz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
)

func TestSszNetworkEncoder_RoundTrip(t *testing.T) {
e := &encoder.SszNetworkEncoder{UseSnappyCompression: false}
e := &encoder.SszNetworkEncoder{}
testRoundTripWithLength(t, e)
testRoundTripWithGossip(t, e)
}

func TestSszNetworkEncoder_RoundTrip_Snappy(t *testing.T) {
e := &encoder.SszNetworkEncoder{UseSnappyCompression: true}
e := &encoder.SszNetworkEncoder{}
testRoundTripWithLength(t, e)
testRoundTripWithGossip(t, e)
}
Expand Down Expand Up @@ -70,7 +70,7 @@ func TestSszNetworkEncoder_EncodeWithMaxLength(t *testing.T) {
Foo: []byte("fooooo"),
Bar: 9001,
}
e := &encoder.SszNetworkEncoder{UseSnappyCompression: false}
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
c.MaxChunkSize = uint64(5)
Expand All @@ -91,7 +91,7 @@ func TestSszNetworkEncoder_DecodeWithMaxLength(t *testing.T) {
Foo: []byte("fooooo"),
Bar: 4242,
}
e := &encoder.SszNetworkEncoder{UseSnappyCompression: false}
e := &encoder.SszNetworkEncoder{}
params.SetupTestConfigCleanup(t)
c := params.BeaconNetworkConfig()
maxChunkSize := uint64(5)
Expand Down
6 changes: 2 additions & 4 deletions beacon-chain/p2p/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
bootNode := bootListener.Self()
cfg := &Config{
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
UDPPort: uint(port),
}

Expand Down Expand Up @@ -132,7 +131,6 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
bootNode := bootListener.Self()
cfg := &Config{
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
UDPPort: uint(port),
}

Expand Down Expand Up @@ -250,7 +248,7 @@ func TestDiscv5_AddRetrieveForkEntryENR(t *testing.T) {
if err := os.Mkdir(tempPath, 0700); err != nil {
t.Fatal(err)
}
pkey, err := privKey(&Config{Encoding: "ssz", DataDir: tempPath})
pkey, err := privKey(&Config{DataDir: tempPath})
if err != nil {
t.Fatalf("Could not get private key: %v", err)
}
Expand Down Expand Up @@ -288,7 +286,7 @@ func TestAddForkEntry_Genesis(t *testing.T) {
if err := os.Mkdir(tempPath, 0700); err != nil {
t.Fatal(err)
}
pkey, err := privKey(&Config{Encoding: "ssz", DataDir: tempPath})
pkey, err := privKey(&Config{DataDir: tempPath})
if err != nil {
t.Fatalf("Could not get private key: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestPrivateKeyLoading(t *testing.T) {
log.WithField("file", file.Name()).WithField("key", out).Info("Wrote key to file")
cfg := &Config{
PrivateKey: file.Name(),
Encoding: "ssz",
}
pKey, err := privKey(cfg)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestService_Send(t *testing.T) {

svc := &Service{
host: p1.BHost,
cfg: &Config{Encoding: "ssz"},
cfg: &Config{},
}

msg := &testpb.TestSimpleMessage{
Expand All @@ -32,7 +32,7 @@ func TestService_Send(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)

p2.SetStreamHandler("/testing/1/ssz", func(stream network.Stream) {
p2.SetStreamHandler("/testing/1/ssz_snappy", func(stream network.Stream) {
rcvd := &testpb.TestSimpleMessage{}
if err := svc.Encoding().DecodeWithMaxLength(stream, rcvd); err != nil {
t.Fatal(err)
Expand Down
10 changes: 1 addition & 9 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,7 @@ func (s *Service) Started() bool {

// Encoding returns the configured networking encoding.
func (s *Service) Encoding() encoder.NetworkEncoding {
encoding := s.cfg.Encoding
switch encoding {
case encoder.SSZ:
return &encoder.SszNetworkEncoder{}
case encoder.SSZSnappy:
return &encoder.SszNetworkEncoder{UseSnappyCompression: true}
default:
panic("Invalid Network Encoding Flag Provided")
}
return &encoder.SszNetworkEncoder{}
Copy link
Contributor

@0xKiwi 0xKiwi Jul 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is UseSnappyCompression: true not needed here?

}

// PubSub returns the p2p pubsub framework.
Expand Down
6 changes: 2 additions & 4 deletions beacon-chain/p2p/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
hook := logTest.NewGlobal()

cfg := &Config{
TCPPort: 2000,
UDPPort: 2000,
Encoding: "ssz",
TCPPort: 2000,
UDPPort: 2000,
}
s, err := NewService(cfg)
if err != nil {
Expand Down Expand Up @@ -180,7 +179,6 @@ func TestListenForNewNodes(t *testing.T) {
cfg = &Config{
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
MaxPeers: 30,
}
for i := 1; i <= 5; i++ {
Expand Down
2 changes: 0 additions & 2 deletions beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
cfg := &Config{
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
MaxPeers: 30,
UDPPort: uint(port),
}
Expand Down Expand Up @@ -74,7 +73,6 @@ func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
cfg := &Config{
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
MaxPeers: 30,
UDPPort: uint(port),
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/initial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestMakeSequence(t *testing.T) {
// Connect peers with local host. This method sets up peer statuses and the appropriate handlers
// for each test peer.
func connectPeers(t *testing.T, host *p2pt.TestP2P, data []*peerData, peerStatus *peers.Status) {
const topic = "/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz"
const topic = "/eth2/beacon_chain/req/beacon_blocks_by_range/1/ssz_snappy"

for _, d := range data {
peer := p2pt.NewTestP2P(t)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/pending_blocks_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestRegularSyncBeaconBlockSubscriber_ProcessPendingBlocks2(t *testing.T) {
if len(p1.BHost.Network().Peers()) != 1 {
t.Error("Expected peers to be connected")
}
pcl := protocol.ID("/eth2/beacon_chain/req/hello/1/ssz")
pcl := protocol.ID("/eth2/beacon_chain/req/hello/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
}

// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz")
pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_goodbye_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestSendGoodbye_SendsMessage(t *testing.T) {
failureCode := codeClientShutdown

// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz")
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestSendGoodbye_DisconnectWithPeer(t *testing.T) {
failureCode := codeClientShutdown

// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz")
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPingRPCHandler_SendsPing(t *testing.T) {
p2p: p2,
}
// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/ping/1/ssz")
pcl := protocol.ID("/eth2/beacon_chain/req/ping/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
Expand Down
Loading