Skip to content

Commit

Permalink
network: remove ws net proto 2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy committed Jul 25, 2024
1 parent 47fd1c9 commit 1695ce7
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 167 deletions.
49 changes: 13 additions & 36 deletions network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,16 @@ var zstdCompressionMagic = [4]byte{0x28, 0xb5, 0x2f, 0xfd}

const zstdCompressionLevel = zstd.BestSpeed

// checkCanCompress checks if there is an proposal payload message and peers supporting compression
func checkCanCompress(request broadcastRequest, peers []*wsPeer) bool {
canCompress := false
// checkCompressible checks if there is an proposal payload message
func checkCompressible(request broadcastRequest) bool {
hasPP := false
for _, tag := range request.tags {
if tag == protocol.ProposalPayloadTag {
hasPP = true
break
}
}
// if have proposal payload check if there are any peers supporting compression
if hasPP {
for _, peer := range peers {
if peer.pfProposalCompressionSupported() {
canCompress = true
break
}
}
}
return canCompress
return hasPP
}

// zstdCompressMsg returns a concatenation of a tag and compressed data
Expand Down Expand Up @@ -89,13 +79,7 @@ type wsPeerMsgDataConverter struct {
ppdec zstdProposalDecompressor
}

type zstdProposalDecompressor struct {
active bool
}

func (dec zstdProposalDecompressor) enabled() bool {
return dec.active
}
type zstdProposalDecompressor struct{}

func (dec zstdProposalDecompressor) accept(data []byte) bool {
return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:])
Expand Down Expand Up @@ -126,18 +110,16 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {

func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) {
if tag == protocol.ProposalPayloadTag {
if c.ppdec.enabled() {
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
res, err := c.ppdec.convert(data)
if err != nil {
return nil, fmt.Errorf("peer %s: %w", c.origin, err)
}
return res, nil
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
res, err := c.ppdec.convert(data)
if err != nil {
return nil, fmt.Errorf("peer %s: %w", c.origin, err)

Check warning on line 118 in network/msgCompressor.go

View check run for this annotation

Codecov / codecov/patch

network/msgCompressor.go#L118

Added line #L118 was not covered by tests
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
return res, nil
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
}
return data, nil
}
Expand All @@ -148,11 +130,6 @@ func makeWsPeerMsgDataConverter(wp *wsPeer) *wsPeerMsgDataConverter {
origin: wp.originAddress,
}

if wp.pfProposalCompressionSupported() {
c.ppdec = zstdProposalDecompressor{
active: true,
}
}

c.ppdec = zstdProposalDecompressor{}
return &c
}
29 changes: 5 additions & 24 deletions network/msgCompressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,15 @@ func TestCheckCanCompress(t *testing.T) {
partitiontest.PartitionTest(t)

req := broadcastRequest{}
peers := []*wsPeer{}
r := checkCanCompress(req, peers)
r := checkCompressible(req)
require.False(t, r)

req.tags = []protocol.Tag{protocol.AgreementVoteTag}
r = checkCanCompress(req, peers)
r = checkCompressible(req)
require.False(t, r)

req.tags = []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag}
r = checkCanCompress(req, peers)
require.False(t, r)

peer1 := wsPeer{
features: 0,
}
peers = []*wsPeer{&peer1}
r = checkCanCompress(req, peers)
require.False(t, r)

peer2 := wsPeer{
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
r = checkCanCompress(req, peers)
r = checkCompressible(req)
require.True(t, r)
}

Expand Down Expand Up @@ -108,7 +93,7 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) {
partitiontest.PartitionTest(t)

c := wsPeerMsgDataConverter{}
c.ppdec = zstdProposalDecompressor{active: false}
c.ppdec = zstdProposalDecompressor{}
tag := protocol.AgreementVoteTag
data := []byte("data")

Expand All @@ -117,13 +102,9 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) {
require.Equal(t, data, r)

tag = protocol.ProposalPayloadTag
r, err = c.convert(tag, data)
require.NoError(t, err)
require.Equal(t, data, r)

l := converterTestLogger{}
c.log = &l
c.ppdec = zstdProposalDecompressor{active: true}
c.ppdec = zstdProposalDecompressor{}
r, err = c.convert(tag, data)
require.NoError(t, err)
require.Equal(t, data, r)
Expand Down
67 changes: 13 additions & 54 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", De
var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})

var networkPrioBatchesPPWithCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_batches_wpp_comp_sent_total", Description: "number of prio compressed batches with PP"})
var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_pp_prio_batches_wpp_non_comp_sent_total", Description: "number of prio non-compressed batches with PP"})
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})

// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete.
const peerDisconnectionAckDuration = 5 * time.Second

Expand Down Expand Up @@ -1057,7 +1052,8 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
wn.setHeaders(responseHeader)
responseHeader.Set(ProtocolVersionHeader, matchingVersion)
responseHeader.Set(GenesisHeader, wn.GenesisID)
responseHeader.Set(PeerFeaturesHeader, PeerFeatureProposalCompression)
// set the features we support, for example
// responseHeader.Set(PeerFeaturesHeader, "ppzstd")
var challenge string
if wn.prioScheme != nil {
challenge = wn.prioScheme.NewPrioChallenge()
Expand Down Expand Up @@ -1387,20 +1383,15 @@ func (wn *WebsocketNetwork) getPeersChangeCounter() int32 {

// preparePeerData prepares batches of data for sending.
// It performs optional zstd compression for proposal massages
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) {
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([][]byte, []crypto.Digest) {
// determine if there is a payload proposal and peers supporting compressed payloads
wantCompression := false
containsPrioPPTag := false
shouldCompress := false
if prio {
wantCompression = checkCanCompress(request, peers)
shouldCompress = checkCompressible(request)
}

digests := make([]crypto.Digest, len(request.data))
data := make([][]byte, len(request.data))
var dataCompressed [][]byte
if wantCompression {
dataCompressed = make([][]byte, len(request.data))
}
for i, d := range request.data {
tbytes := []byte(request.tags[i])
mbytes := make([]byte, len(tbytes)+len(d))
Expand All @@ -1411,29 +1402,17 @@ func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, p
digests[i] = crypto.Hash(mbytes)
}

if prio {
if request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
containsPrioPPTag = true
}
}

if wantCompression {
if shouldCompress {
if request.tags[i] == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, d)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)
} else {
networkPrioPPCompressedSize.AddUint64(uint64(len(compressed)), nil)
}
dataCompressed[i] = compressed
} else {
// otherwise reuse non-compressed from above
dataCompressed[i] = mbytes
data[i] = compressed
}
}
}
return data, dataCompressed, digests, containsPrioPPTag
return data, digests
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1450,7 +1429,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
}

start := time.Now()
data, dataWithCompression, digests, containsPrioPPTag := wn.preparePeerData(request, prio, peers)
data, digests := wn.preparePeerData(request, prio)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1461,23 +1440,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
if peer == request.except {
continue
}
var ok bool
if peer.pfProposalCompressionSupported() && len(dataWithCompression) > 0 {
// if this peer supports compressed proposals and compressed data batch is filled out, use it
ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime)
if prio {
if containsPrioPPTag {
networkPrioBatchesPPWithCompression.Inc(nil)
}
}
} else {
ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if prio {
if containsPrioPPTag {
networkPrioBatchesPPWithoutCompression.Inc(nil)
}
}
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if ok {
sentMessageCount++
continue
Expand Down Expand Up @@ -1946,7 +1909,7 @@ const ProtocolVersionHeader = "X-Algorand-Version"
const ProtocolAcceptVersionHeader = "X-Algorand-Accept-Version"

// SupportedProtocolVersions contains the list of supported protocol versions by this node ( in order of preference ).
var SupportedProtocolVersions = []string{"2.2", "2.1"}
var SupportedProtocolVersions = []string{"2.2"}

// ProtocolVersion is the current version attached to the ProtocolVersionHeader header
/* Version history:
Expand Down Expand Up @@ -1986,10 +1949,6 @@ const UserAgentHeader = "User-Agent"
// PeerFeaturesHeader is the HTTP header listing features
const PeerFeaturesHeader = "X-Algorand-Peer-Features"

// PeerFeatureProposalCompression is a value for PeerFeaturesHeader indicating peer
// supports proposal payload compression with zstd
const PeerFeatureProposalCompression = "ppzstd"

var websocketsScheme = map[string]string{"http": "ws", "https": "wss"}

var errBadAddr = errors.New("bad address")
Expand Down Expand Up @@ -2111,8 +2070,8 @@ func (wn *WebsocketNetwork) tryConnect(netAddr, gossipAddr string) {

// for backward compatibility, include the ProtocolVersion header as well.
requestHeader.Set(ProtocolVersionHeader, wn.protocolVersion)
// set the features header (comma-separated list)
requestHeader.Set(PeerFeaturesHeader, PeerFeatureProposalCompression)
// set the features header (comma-separated list), for example
requestHeader.Set(PeerFeaturesHeader, "ppzstd")
SetUserAgentHeader(requestHeader)
myInstanceName := wn.log.GetInstanceName()
requestHeader.Set(InstanceNameHeader, myInstanceName)
Expand Down
Loading

0 comments on commit 1695ce7

Please sign in to comment.