Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9516dcc
network: add VP message type for stateful vote compression
cce Oct 17, 2025
713bf1e
update for v37
cce Oct 17, 2025
c76eb31
Update network/wsPeer.go
cce Oct 21, 2025
875a70b
CR feedback
cce Oct 21, 2025
da1f1a0
CR feedback on config selecting VP table size
cce Oct 21, 2025
9e9af5a
fix tests
cce Oct 21, 2025
ff7aeb1
add README and fix config method
cce Oct 27, 2025
6675dc0
Merge remote-tracking branch 'upstream/master' into vpack-dynamic-net…
cce Oct 27, 2025
e07aa4f
fix errors.As check on *voteCompressionError
cce Oct 29, 2025
ff03541
configure dynamic vote compression for p2p wsPeers
cce Oct 29, 2025
97913eb
generalize vote tests to work for p2p or wsnetwork
cce Oct 29, 2025
c3e784b
Merge remote-tracking branch 'upstream/master' into vpack-dynamic-net…
cce Oct 29, 2025
45078d3
fix modernize errors
cce Oct 29, 2025
f158125
partitiontest
cce Oct 29, 2025
fdbc51a
fix asymmetric abort handling
cce Oct 30, 2025
51a319d
remove VP from default MOI tags
cce Oct 30, 2025
b0b340f
rename "dynamic" to "stateful"
cce Oct 30, 2025
3bf2291
go to a single bool flag to disable stateful compression
cce Oct 30, 2025
2b1b0fe
get rid of the term "dynamic" vote compression
cce Oct 30, 2025
268b2ba
Update config/localTemplate.go
cce Oct 30, 2025
cea7e20
some CR fixes
cce Oct 30, 2025
ae91abc
fix race in abort tests and add more cases
cce Oct 30, 2025
76aa200
require => assert
cce Oct 30, 2025
979cbb7
change require => assert
cce Oct 30, 2025
3d78c71
document decision to use stateful compression
cce Oct 30, 2025
83e0c7e
use switch in decodePeerFeatures
cce Oct 30, 2025
0b98ed3
fix lint
cce Oct 30, 2025
3e08fb9
tweak language in README
cce Oct 30, 2025
8da4a29
allow up to 2048 table size
cce Oct 30, 2025
b0ff3f2
regenerate config
cce Oct 30, 2025
f77ca3b
improved README
cce Oct 30, 2025
6e1eeae
last CR fixes
cce Nov 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,10 @@ func (l tLogger) Infof(fmts string, args ...interface{}) {
l.t.Logf(fmts, args...)
}

func (l tLogger) Warnf(fmts string, args ...interface{}) {
l.t.Logf(fmts, args...)
}

// TestEnsureAndResolveGenesisDirs confirms that paths provided in the config are resolved to absolute paths and are created if relevant
func TestEnsureAndResolveGenesisDirs(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down
42 changes: 42 additions & 0 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,15 @@ type Local struct {
// EnableVoteCompression controls whether vote compression is enabled for websocket networks
EnableVoteCompression bool `version[36]:"true"`

// StatefulVoteCompressionTableSize controls the size of the per-peer tables used for vote compression.
// If 0, stateful vote compression is disabled (but stateless vote compression will still be used if
// EnableVoteCompression is true). This value should be a power of 2 between 16 and 2048, inclusive.
// The per-peer overhead for stateful compression in one direction (from peer A => B) is 224 bytes times
// this value, plus 800 bytes of fixed overhead; it is twice that if votes are also being sent from B => A.
// So the default value of 2048 requires 459,552 bytes of memory per peer for stateful vote compression
// in one direction, or 919,104 bytes if both directions are used.
StatefulVoteCompressionTableSize uint `version[37]:"2048"`

// EnableBatchVerification controls whether ed25519 batch verification is enabled
EnableBatchVerification bool `version[37]:"true"`
}
Expand Down Expand Up @@ -871,6 +880,7 @@ func (cfg *Local) ResolveLogPaths(rootDir string) (liveLog, archive string) {

type logger interface {
Infof(format string, args ...interface{})
Warnf(format string, args ...interface{})
}

// EnsureAndResolveGenesisDirs will resolve the supplied config paths to absolute paths, and will create the genesis directories of each
Expand Down Expand Up @@ -1059,3 +1069,35 @@ func (cfg *Local) TracksCatchpoints() bool {
}
return false
}

// NormalizedVoteCompressionTableSize validates and normalizes the StatefulVoteCompressionTableSize config value.
// Supported values are powers of 2 in the range [16, 2048].
// Values >= 2048 clamp to 2048.
// Values 1-15 are below the minimum and return 0 (disabled).
// Values between supported powers of 2 round down to the nearest supported value.
// Logs a message if the configured value is adjusted.
// Returns the normalized size.
func (cfg Local) NormalizedVoteCompressionTableSize(log logger) uint {
configured := cfg.StatefulVoteCompressionTableSize
if configured == 0 {
return 0
}
if configured < 16 {
log.Warnf("StatefulVoteCompressionTableSize configured as %d is invalid (minimum 16). Stateful vote compression disabled.", configured)
return 0
}
// Round down to nearest power of 2 within supported range [16, 2048]
supportedSizes := []uint{2048, 1024, 512, 256, 128, 64, 32, 16}
for _, size := range supportedSizes {
if configured >= size {
if configured != size {
log.Infof("StatefulVoteCompressionTableSize configured as %d, using nearest supported value: %d", configured, size)
}
return size
}
}

// Should never reach here given the checks above
log.Warnf("StatefulVoteCompressionTableSize configured as %d is invalid. Stateful vote compression disabled.", configured)
return 0
}
1 change: 1 addition & 0 deletions config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ var defaultLocal = Local{
RestReadTimeoutSeconds: 15,
RestWriteTimeoutSeconds: 120,
RunHosted: false,
StatefulVoteCompressionTableSize: 2048,
StateproofDir: "",
StorageEngine: "sqlite",
SuggestedFeeBlockHistory: 3,
Expand Down
1 change: 1 addition & 0 deletions installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
"RestReadTimeoutSeconds": 15,
"RestWriteTimeoutSeconds": 120,
"RunHosted": false,
"StatefulVoteCompressionTableSize": 2048,
"StateproofDir": "",
"StorageEngine": "sqlite",
"SuggestedFeeBlockHistory": 3,
Expand Down
9 changes: 9 additions & 0 deletions network/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricNa

// var networkP2PGossipSubSentMsgs = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_message_sent", Description: "Number of complete messages that were sent to the network through gossipsub"})

var networkVoteBroadcastCompressedBytes = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vote_compressed_bytes_broadcast_total", Description: "Total AV message bytes broadcast after applying stateless compression"})
var networkVoteBroadcastUncompressedBytes = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vote_uncompressed_bytes_broadcast_total", Description: "Total AV message bytes broadcast before applying stateless compression"})
var networkVPCompressionErrors = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_compression_errors_total", Description: "Total number of stateful vote compression errors"})
var networkVPDecompressionErrors = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_decompression_errors_total", Description: "Total number of stateful vote decompression errors"})
var networkVPAbortMessagesSent = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_abort_messages_sent_total", Description: "Total number of vpack abort messages sent to peers"})
var networkVPAbortMessagesReceived = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_abort_messages_received_total", Description: "Total number of vpack abort messages received from peers"})
var networkVPCompressedBytesSent = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_compressed_bytes_sent_total", Description: "Total VP message bytes sent, after compressing AV to VP messages"})
var networkVPUncompressedBytesSent = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_vpack_uncompressed_bytes_sent_total", Description: "Total VP message bytes sent, before compressing AV to VP messages"})

var _ = pubsub.RawTracer(pubsubMetricsTracer{})

// pubsubMetricsTracer is a tracer for pubsub events used to track metrics.
Expand Down
162 changes: 152 additions & 10 deletions network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"io"
"sync/atomic"

"github.com/DataDog/zstd"

Expand All @@ -32,6 +33,19 @@ var zstdCompressionMagic = [4]byte{0x28, 0xb5, 0x2f, 0xfd}

const zstdCompressionLevel = zstd.BestSpeed

// voteCompressionAbortMessage is a single-byte payload sent with a VP tag to signal
// that stateful compression should be disabled for this connection.
// When either encoder or decoder encounters an error, it sends VP+0xFF to notify
// the peer, then both sides disable stateful compression and fall back to AV messages.
const voteCompressionAbortMessage byte = 0xFF

// voteCompressionError wraps errors from stateful vote compression/decompression.
// This error type signals that an abort message should be sent to the peer.
type voteCompressionError struct{ err error }

func (e *voteCompressionError) Error() string { return e.err.Error() }
func (e *voteCompressionError) Unwrap() error { return e.err }

// zstdCompressMsg returns a concatenation of a tag and compressed data
func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) {
bound := max(zstd.CompressBound(len(d)),
Expand Down Expand Up @@ -74,16 +88,30 @@ func vpackCompressVote(tbytes []byte, d []byte) ([]byte, string) {
// and should be larger.
const MaxDecompressedMessageSize = 20 * 1024 * 1024 // some large enough value

// wsPeerMsgDataDecoder performs optional incoming messages conversion.
// At the moment it only supports zstd decompression for payload proposal,
// and vpack decompression for votes.
type wsPeerMsgDataDecoder struct {
// wsPeerMsgCodec performs optional message compression/decompression for certain
// types of messages. It handles:
// - zstd compression for PP proposals (outgoing not implemented)
// - stateless vpack compression for AV votes (outgoing not implemented)
// - stateful vpack compression for VP votes (both directions)
type wsPeerMsgCodec struct {
log logging.Logger
origin string

// actual converter(s)
// decompressors
ppdec zstdProposalDecompressor
avdec vpackVoteDecompressor

// stateful vote compression (if enabled).
// If either side encounters an error, or if we receive an abort, we disable
// stateful compression entirely and fall back to stateless AV traffic.
statefulVoteEnabled atomic.Bool
statefulVoteTableSize uint
statefulVoteEnc *vpack.StatefulEncoder
statefulVoteDec *vpack.StatefulDecoder
}

func (c *wsPeerMsgCodec) switchOffStatefulVoteCompression() {
c.statefulVoteEnabled.Store(false)
}

type zstdProposalDecompressor struct{}
Expand Down Expand Up @@ -124,8 +152,58 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {
}
}

func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, error) {
if tag == protocol.ProposalPayloadTag {
// compress attempts to compress an outgoing message.
// Currently only supports stateful vote compression.
// Returns compressed data and nil error if compression succeeds,
// (nil, nil) if compression is not applicable,
// (nil, vpError) if stateful compression fails (caller should send abort message).
func (c *wsPeerMsgCodec) compress(tag protocol.Tag, data []byte) ([]byte, error) {
if tag == protocol.AgreementVoteTag && c.statefulVoteEnabled.Load() {
// Skip the tag bytes (first 2 bytes are the AV tag)
if len(data) < 2 {
return nil, nil
}
// Input data is AV+stateless-compressed from broadcast
// We only need to apply stateful compression on top
statelessCompressed := data[2:]

// initialize stateful encoder on first use
if c.statefulVoteEnc == nil {
enc, err := vpack.NewStatefulEncoder(c.statefulVoteTableSize)
if err != nil {
c.log.Warnf("failed to initialize stateful vote encoder for peer %s, disabling: %v", c.origin, err)
networkVPCompressionErrors.Inc(nil)
c.switchOffStatefulVoteCompression()
return nil, &voteCompressionError{err: err}
}
c.statefulVoteEnc = enc
c.log.Debugf("stateful vote encoder initialized for peer %s (table size %d)", c.origin, c.statefulVoteTableSize)
}

tagLen := len(protocol.VotePackedTag)
result := make([]byte, tagLen+vpack.MaxCompressedVoteSize)
copy(result, protocol.VotePackedTag)
// apply stateful compression to stateless-compressed data
compressed, err := c.statefulVoteEnc.Compress(result[tagLen:], statelessCompressed)
if err != nil {
c.log.Warnf("stateful vote compression failed for peer %s, disabling: %v", c.origin, err)
networkVPCompressionErrors.Inc(nil)
c.switchOffStatefulVoteCompression()
return nil, &voteCompressionError{err: err}
}
finalResult := result[:tagLen+len(compressed)]
// Track stateful compression layer only: stateless-compressed input → VP output
networkVPUncompressedBytesSent.AddUint64(uint64(len(statelessCompressed)), nil)
networkVPCompressedBytesSent.AddUint64(uint64(len(compressed)), nil)
return finalResult, nil
}
return nil, nil
}

// decompress handles incoming message decompression based on tag type
func (c *wsPeerMsgCodec) decompress(tag protocol.Tag, data []byte) ([]byte, error) {
switch tag {
case protocol.ProposalPayloadTag:
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
Expand All @@ -136,7 +214,8 @@ func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, e
return res, nil
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
} else if tag == protocol.AgreementVoteTag {

case protocol.AgreementVoteTag:
if c.avdec.enabled {
res, err := c.avdec.convert(data)
if err != nil {
Expand All @@ -146,12 +225,59 @@ func (c *wsPeerMsgDataDecoder) convert(tag protocol.Tag, data []byte) ([]byte, e
}
return res, nil
}

case protocol.VotePackedTag:
// Check for abort message first
if len(data) == 1 && data[0] == voteCompressionAbortMessage {
c.log.Infof("Received VP abort message from peer %s, disabling stateful encoding", c.origin)
networkVPAbortMessagesReceived.Inc(nil)
// Peer signalled stateful compression should stop; disable both encode and decode paths.
c.switchOffStatefulVoteCompression()
// Drop this message silently (it's just a control signal)
return nil, nil
}

if !c.statefulVoteEnabled.Load() {
c.log.Debugf("dropping VP message from %s: stateful decompression disabled", c.origin)
return nil, nil
}
if c.statefulVoteDec == nil {
dec, err := vpack.NewStatefulDecoder(c.statefulVoteTableSize)
if err != nil {
c.log.Warnf("failed to initialize stateful vote decoder for peer %s, disabling: %v", c.origin, err)
networkVPDecompressionErrors.Inc(nil)
c.switchOffStatefulVoteCompression()
return nil, &voteCompressionError{err: err}
}
c.statefulVoteDec = dec
c.log.Debugf("stateful vote decoder initialized for peer %s (table size %d)", c.origin, c.statefulVoteTableSize)
}
// StatefulDecoder decompresses to "stateless-compressed" format
statelessCompressed, err := c.statefulVoteDec.Decompress(make([]byte, 0, vpack.MaxCompressedVoteSize), data)
if err != nil {
c.log.Warnf("stateful vote decompression failed for peer %s, disabling: %v", c.origin, err)
networkVPDecompressionErrors.Inc(nil)
c.switchOffStatefulVoteCompression()
return nil, &voteCompressionError{err: err}
}

var statelessDec vpack.StatelessDecoder
voteBody, err := statelessDec.DecompressVote(make([]byte, 0, vpack.MaxMsgpackVoteSize), statelessCompressed)
if err != nil {
c.log.Warnf("stateless vote decompression failed after stateful for peer %s, disabling: %v", c.origin, err)
networkVPDecompressionErrors.Inc(nil)
c.switchOffStatefulVoteCompression()
return nil, &voteCompressionError{err: err}
}

return voteBody, nil
}

return data, nil
}

func makeWsPeerMsgDataDecoder(wp *wsPeer) *wsPeerMsgDataDecoder {
c := wsPeerMsgDataDecoder{
func makeWsPeerMsgCodec(wp *wsPeer) *wsPeerMsgCodec {
c := wsPeerMsgCodec{
log: wp.log,
origin: wp.originAddress,
}
Expand All @@ -164,5 +290,21 @@ func makeWsPeerMsgDataDecoder(wp *wsPeer) *wsPeerMsgDataDecoder {
dec: vpack.NewStatelessDecoder(),
}
}

// Initialize stateful compression negotiation details if both nodes support it
// Stateful compression requires stateless compression to be available since VP messages
// decompress in two stages: VP → stateless-compressed → raw vote
if wp.enableVoteCompression && // this node's configuration allows vote compression
wp.voteCompressionTableSize > 0 && // this node's configuration allows stateful vote compression
wp.vpackVoteCompressionSupported() && // the other side has advertised vote compression
wp.vpackStatefulCompressionSupported() { // the other side has advertised stateful vote compression
tableSize := wp.getBestVpackTableSize()
if tableSize > 0 {
c.statefulVoteEnabled.Store(true)
c.statefulVoteTableSize = tableSize
wp.log.Debugf("Stateful compression negotiated with table size %d (our max: %d)", tableSize, wp.voteCompressionTableSize)
}
}

return &c
}
Loading