Skip to content

Commit 64ae8b0

Browse files
authored
network: enable vote compression for P2PNetwork (#6331)
1 parent e01b9e7 commit 64ae8b0

File tree

3 files changed

+126
-8
lines changed

3 files changed

+126
-8
lines changed

network/p2pNetwork.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ func NewP2PNetwork(log logging.Logger, cfg config.Local, datadir string, phonebo
259259
config: cfg,
260260
broadcastQueueHighPrio: make(chan broadcastRequest, outgoingMessagesBufferSize),
261261
broadcastQueueBulk: make(chan broadcastRequest, 100),
262+
enableVoteCompression: cfg.EnableVoteCompression,
262263
}
263264

264265
if identityOpts != nil {
@@ -916,14 +917,15 @@ func (n *P2PNetwork) baseWsStreamHandler(ctx context.Context, p2pPeer peer.ID, s
916917
}
917918
peerCore := makePeerCore(ctx, n, n.log, n.handler.readBuffer, addr, client, addr)
918919
wsp := &wsPeer{
919-
wsPeerCore: peerCore,
920-
conn: &wsPeerConnP2P{stream: stream},
921-
outgoing: !incoming,
922-
identity: netIdentPeerID,
923-
peerType: peerTypeP2P,
924-
TelemetryGUID: pmi.telemetryID,
925-
InstanceName: pmi.instanceName,
926-
features: decodePeerFeatures(pmi.version, pmi.features),
920+
wsPeerCore: peerCore,
921+
conn: &wsPeerConnP2P{stream: stream},
922+
outgoing: !incoming,
923+
identity: netIdentPeerID,
924+
peerType: peerTypeP2P,
925+
TelemetryGUID: pmi.telemetryID,
926+
InstanceName: pmi.instanceName,
927+
features: decodePeerFeatures(pmi.version, pmi.features),
928+
enableVoteCompression: n.config.EnableVoteCompression,
927929
}
928930

929931
localAddr, has := n.Address()

network/p2pNetwork_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,3 +1591,106 @@ func TestP2PMetainfoV1vsV22(t *testing.T) {
15911591
require.False(t, peer.features&pfCompressedProposal != 0)
15921592
require.False(t, peer.vpackVoteCompressionSupported())
15931593
}
1594+
1595+
// TestP2PVoteCompression tests vote compression feature in P2P network
1596+
func TestP2PVoteCompression(t *testing.T) {
1597+
partitiontest.PartitionTest(t)
1598+
1599+
type testDef struct {
1600+
netAEnableCompression, netBEnableCompression bool
1601+
}
1602+
1603+
var tests []testDef = []testDef{
1604+
{true, true}, // both nodes with compression enabled
1605+
{true, false}, // node A with compression, node B without
1606+
{false, true}, // node A without compression, node B with compression
1607+
{false, false}, // both nodes with compression disabled
1608+
}
1609+
1610+
for _, test := range tests {
1611+
t.Run(fmt.Sprintf("A_compression_%v+B_compression_%v", test.netAEnableCompression, test.netBEnableCompression), func(t *testing.T) {
1612+
cfg := config.GetDefaultLocal()
1613+
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
1614+
cfg.NetAddress = "127.0.0.1:0"
1615+
cfg.GossipFanout = 1
1616+
cfg.EnableVoteCompression = test.netAEnableCompression
1617+
log := logging.TestingLog(t)
1618+
netA, err := NewP2PNetwork(log.With("name", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
1619+
require.NoError(t, err)
1620+
err = netA.Start()
1621+
require.NoError(t, err)
1622+
defer netA.Stop()
1623+
1624+
peerInfoA := netA.service.AddrInfo()
1625+
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
1626+
require.NoError(t, err)
1627+
require.NotZero(t, addrsA[0])
1628+
1629+
cfgB := cfg
1630+
cfgB.EnableVoteCompression = test.netBEnableCompression
1631+
cfgB.NetAddress = ""
1632+
multiAddrStr := addrsA[0].String()
1633+
phoneBookAddresses := []string{multiAddrStr}
1634+
netB, err := NewP2PNetwork(log.With("name", "netB"), cfgB, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
1635+
require.NoError(t, err)
1636+
err = netB.Start()
1637+
require.NoError(t, err)
1638+
defer netB.Stop()
1639+
1640+
// ps is empty, so this is a valid vote
1641+
vote1 := map[string]any{
1642+
"cred": map[string]any{"pf": algocrypto.VrfProof{1}},
1643+
"r": map[string]any{"rnd": uint64(2), "snd": [32]byte{3}},
1644+
"sig": map[string]any{
1645+
"p": [32]byte{4}, "p1s": [64]byte{5}, "p2": [32]byte{6},
1646+
"p2s": [64]byte{7}, "ps": [64]byte{}, "s": [64]byte{9},
1647+
},
1648+
}
1649+
// ps is not empty: vpack compression will fail, but it will still be sent through
1650+
vote2 := map[string]any{
1651+
"cred": map[string]any{"pf": algocrypto.VrfProof{10}},
1652+
"r": map[string]any{"rnd": uint64(11), "snd": [32]byte{12}},
1653+
"sig": map[string]any{
1654+
"p": [32]byte{13}, "p1s": [64]byte{14}, "p2": [32]byte{15},
1655+
"p2s": [64]byte{16}, "ps": [64]byte{17}, "s": [64]byte{18},
1656+
},
1657+
}
1658+
// Send a totally invalid message to ensure that it goes through. Even though vpack compression
1659+
// and decompression will fail, the message should still go through (as an intended fallback).
1660+
vote3 := []byte("hello")
1661+
messages := [][]byte{protocol.EncodeReflect(vote1), protocol.EncodeReflect(vote2), vote3}
1662+
matcher := newMessageMatcher(t, messages)
1663+
counterDone := matcher.done
1664+
netB.RegisterHandlers([]TaggedMessageHandler{{Tag: protocol.AgreementVoteTag, MessageHandler: matcher}})
1665+
1666+
// Wait for peers to connect
1667+
require.Eventually(t, func() bool {
1668+
return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0
1669+
}, 2*time.Second, 50*time.Millisecond)
1670+
1671+
for _, msg := range messages {
1672+
netA.Broadcast(context.Background(), protocol.AgreementVoteTag, msg, true, nil)
1673+
}
1674+
1675+
select {
1676+
case <-counterDone:
1677+
case <-time.After(2 * time.Second):
1678+
t.Errorf("timeout, count=%d, wanted %d", len(matcher.received), len(messages))
1679+
}
1680+
1681+
require.True(t, matcher.Match())
1682+
1683+
// Verify compression feature is correctly reflected in peer properties
1684+
// Check peers have the correct compression capability
1685+
peers := netA.GetPeers(PeersConnectedIn)
1686+
require.Len(t, peers, 1)
1687+
peer := peers[0].(*wsPeer)
1688+
require.Equal(t, test.netBEnableCompression, peer.vpackVoteCompressionSupported())
1689+
1690+
peers = netB.GetPeers(PeersConnectedOut)
1691+
require.Len(t, peers, 1)
1692+
peer = peers[0].(*wsPeer)
1693+
require.Equal(t, test.netAEnableCompression, peer.vpackVoteCompressionSupported())
1694+
})
1695+
}
1696+
}

network/wsNetwork_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,19 @@ func TestWebsocketVoteCompression(t *testing.T) {
569569
}
570570

571571
require.True(t, matcher.Match())
572+
573+
// Verify compression feature is correctly reflected in peer properties
574+
// Check peers have the correct compression capability
575+
peers := netA.GetPeers(PeersConnectedIn)
576+
require.Len(t, peers, 1)
577+
peer := peers[0].(*wsPeer)
578+
require.Equal(t, test.netBEnableCompression, peer.vpackVoteCompressionSupported())
579+
580+
peers = netB.GetPeers(PeersConnectedOut)
581+
require.Len(t, peers, 1)
582+
peer = peers[0].(*wsPeer)
583+
require.Equal(t, test.netAEnableCompression, peer.vpackVoteCompressionSupported())
584+
572585
})
573586
}
574587
}

0 commit comments

Comments
 (0)