Skip to content

Commit 9e9af5a

Browse files
committed
fix tests
1 parent da1f1a0 commit 9e9af5a

File tree

4 files changed

+36
-34
lines changed

4 files changed

+36
-34
lines changed

network/msgCompressor.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"bytes"
2121
"fmt"
2222
"io"
23+
"sync/atomic"
2324

2425
"github.com/DataDog/zstd"
2526

@@ -105,8 +106,8 @@ type wsPeerMsgCodec struct {
105106
// When encoder fails, we send abort message and disable encoding; peer will switch to AV.
106107
// When decoder fails, we send abort message and disable decoding; we'll receive AV from peer.
107108
// When we receive abort message from peer, we disable encoding (they can't decode our VP).
108-
statefulVoteEncEnabled bool
109-
statefulVoteDecEnabled bool
109+
statefulVoteEncEnabled atomic.Bool
110+
statefulVoteDecEnabled atomic.Bool
110111
statefulVoteTableSize uint
111112
statefulVoteEnc *vpack.StatefulEncoder
112113
statefulVoteDec *vpack.StatefulDecoder
@@ -156,7 +157,7 @@ func (dec zstdProposalDecompressor) convert(data []byte) ([]byte, error) {
156157
// (nil, nil) if compression is not applicable,
157158
// (nil, vpError) if stateful compression fails (caller should send abort message).
158159
func (c *wsPeerMsgCodec) compress(tag protocol.Tag, data []byte) ([]byte, error) {
159-
if tag == protocol.AgreementVoteTag && c.statefulVoteEncEnabled {
160+
if tag == protocol.AgreementVoteTag && c.statefulVoteEncEnabled.Load() {
160161
// Skip the tag bytes (first 2 bytes are the AV tag)
161162
if len(data) < 2 {
162163
return nil, nil
@@ -171,7 +172,7 @@ func (c *wsPeerMsgCodec) compress(tag protocol.Tag, data []byte) ([]byte, error)
171172
if err != nil {
172173
c.log.Warnf("failed to initialize stateful vote encoder for peer %s, disabling: %v", c.origin, err)
173174
networkVPCompressionErrors.Inc(nil)
174-
c.statefulVoteEncEnabled = false
175+
c.statefulVoteEncEnabled.Store(false)
175176
return nil, &voteCompressionError{err: err}
176177
}
177178
c.statefulVoteEnc = enc
@@ -186,7 +187,7 @@ func (c *wsPeerMsgCodec) compress(tag protocol.Tag, data []byte) ([]byte, error)
186187
if err != nil {
187188
c.log.Warnf("stateful vote compression failed for peer %s, disabling: %v", c.origin, err)
188189
networkVPCompressionErrors.Inc(nil)
189-
c.statefulVoteEncEnabled = false
190+
c.statefulVoteEncEnabled.Store(false)
190191
return nil, &voteCompressionError{err: err}
191192
}
192193
finalResult := result[:tagLen+len(compressed)]
@@ -230,20 +231,20 @@ func (c *wsPeerMsgCodec) decompress(tag protocol.Tag, data []byte) ([]byte, erro
230231
c.log.Infof("Received VP abort message from peer %s, disabling stateful encoding", c.origin)
231232
networkVPAbortMessagesReceived.Inc(nil)
232233
// Peer is telling us they can't decode our VP messages, so stop encoding
233-
c.statefulVoteEncEnabled = false
234+
c.statefulVoteEncEnabled.Store(false)
234235
// Drop this message silently (it's just a control signal)
235236
return nil, nil
236237
}
237238

238-
if !c.statefulVoteDecEnabled {
239+
if !c.statefulVoteDecEnabled.Load() {
239240
return nil, fmt.Errorf("received VP message but stateful decompression not enabled")
240241
}
241242
if c.statefulVoteDec == nil {
242243
dec, err := vpack.NewStatefulDecoder(c.statefulVoteTableSize)
243244
if err != nil {
244245
c.log.Warnf("failed to initialize stateful vote decoder for peer %s, disabling: %v", c.origin, err)
245246
networkVPDecompressionErrors.Inc(nil)
246-
c.statefulVoteDecEnabled = false
247+
c.statefulVoteDecEnabled.Store(false)
247248
return nil, &voteCompressionError{err: err}
248249
}
249250
c.statefulVoteDec = dec
@@ -254,7 +255,7 @@ func (c *wsPeerMsgCodec) decompress(tag protocol.Tag, data []byte) ([]byte, erro
254255
if err != nil {
255256
c.log.Warnf("stateful vote decompression failed for peer %s, disabling: %v", c.origin, err)
256257
networkVPDecompressionErrors.Inc(nil)
257-
c.statefulVoteDecEnabled = false
258+
c.statefulVoteDecEnabled.Store(false)
258259
return nil, &voteCompressionError{err: err}
259260
}
260261

@@ -263,7 +264,7 @@ func (c *wsPeerMsgCodec) decompress(tag protocol.Tag, data []byte) ([]byte, erro
263264
if err != nil {
264265
c.log.Warnf("stateless vote decompression failed after stateful for peer %s, disabling: %v", c.origin, err)
265266
networkVPDecompressionErrors.Inc(nil)
266-
c.statefulVoteDecEnabled = false
267+
c.statefulVoteDecEnabled.Store(false)
267268
return nil, &voteCompressionError{err: err}
268269
}
269270

@@ -295,8 +296,8 @@ func makeWsPeerMsgCodec(wp *wsPeer) *wsPeerMsgCodec {
295296
wp.vpackVoteCompressionSupported() && wp.vpackDynamicCompressionSupported() {
296297
tableSize := wp.getBestVpackTableSize()
297298
if tableSize > 0 {
298-
c.statefulVoteEncEnabled = true
299-
c.statefulVoteDecEnabled = true
299+
c.statefulVoteEncEnabled.Store(true)
300+
c.statefulVoteDecEnabled.Store(true)
300301
c.statefulVoteTableSize = tableSize
301302
wp.log.Debugf("Stateful compression negotiated with table size %d (our max: %d)", tableSize, wp.voteCompressionDynamicTableSize)
302303
}

network/msgCompressor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ func TestMakeWsPeerMsgCodec_StatefulRequiresStateless(t *testing.T) {
125125

126126
// Stateful should not be enabled even though dynamic features are advertised
127127
// because stateful requires stateless to work (VP → stateless → raw)
128-
require.False(t, codec.statefulVoteEncEnabled,
128+
require.False(t, codec.statefulVoteEncEnabled.Load(),
129129
"Stateful encoding should not be enabled without stateless support")
130-
require.False(t, codec.statefulVoteDecEnabled,
130+
require.False(t, codec.statefulVoteDecEnabled.Load(),
131131
"Stateful decoding should not be enabled without stateless support")
132132

133133
// Now test with both stateless AND dynamic enabled
@@ -138,8 +138,8 @@ func TestMakeWsPeerMsgCodec_StatefulRequiresStateless(t *testing.T) {
138138
// Both stateless and stateful should be enabled
139139
require.True(t, codec.avdec.enabled,
140140
"Stateless decompression should be enabled when pfCompressedVoteVpack is advertised")
141-
require.True(t, codec.statefulVoteEncEnabled,
141+
require.True(t, codec.statefulVoteEncEnabled.Load(),
142142
"Stateful encoding should be enabled when both stateless and dynamic are supported")
143-
require.True(t, codec.statefulVoteDecEnabled,
143+
require.True(t, codec.statefulVoteDecEnabled.Load(),
144144
"Stateful decoding should be enabled when both stateless and dynamic are supported")
145145
}

network/wsNetwork_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -692,10 +692,10 @@ func TestWebsocketVoteDynamicCompressionAbortMessage(t *testing.T) {
692692
}
693693

694694
// Verify VP compression is established
695-
require.True(t, peerAtoB.msgCodec.statefulVoteEncEnabled, "VP encoding not established on A->B")
696-
require.True(t, peerAtoB.msgCodec.statefulVoteDecEnabled, "VP decoding not established on A->B")
697-
require.True(t, peerBtoA.msgCodec.statefulVoteEncEnabled, "VP encoding not established on B->A")
698-
require.True(t, peerBtoA.msgCodec.statefulVoteDecEnabled, "VP decoding not established on B->A")
695+
require.True(t, peerAtoB.msgCodec.statefulVoteEncEnabled.Load(), "VP encoding not established on A->B")
696+
require.True(t, peerAtoB.msgCodec.statefulVoteDecEnabled.Load(), "VP decoding not established on A->B")
697+
require.True(t, peerBtoA.msgCodec.statefulVoteEncEnabled.Load(), "VP encoding not established on B->A")
698+
require.True(t, peerBtoA.msgCodec.statefulVoteDecEnabled.Load(), "VP decoding not established on B->A")
699699

700700
// Send VP abort message from A to B
701701
abortMsg := append([]byte(protocol.VotePackedTag), voteCompressionAbortMessage)
@@ -704,7 +704,7 @@ func TestWebsocketVoteDynamicCompressionAbortMessage(t *testing.T) {
704704

705705
// Wait for abort to be processed - verify B disabled its encoder (can't send VP to A anymore)
706706
require.Eventually(t, func() bool {
707-
return !peerBtoA.msgCodec.statefulVoteEncEnabled
707+
return !peerBtoA.msgCodec.statefulVoteEncEnabled.Load()
708708
}, 2*time.Second, 50*time.Millisecond, "VP encoding not disabled on B->A after receiving abort message")
709709

710710
// Verify connection is still up after abort
@@ -792,13 +792,13 @@ func testWebsocketVoteDynamicCompressionMessages(t *testing.T, msgs [][]byte, ex
792792

793793
// Check if dynamic compression is enabled
794794
if tc.expectDynamic {
795-
require.True(t, peerAtoB.msgCodec.statefulVoteEncEnabled,
795+
require.True(t, peerAtoB.msgCodec.statefulVoteEncEnabled.Load(),
796796
"A->B peer should have dynamic encoding enabled")
797-
require.True(t, peerAtoB.msgCodec.statefulVoteDecEnabled,
797+
require.True(t, peerAtoB.msgCodec.statefulVoteDecEnabled.Load(),
798798
"A->B peer should have dynamic decoding enabled")
799-
require.True(t, peerBtoA.msgCodec.statefulVoteEncEnabled,
799+
require.True(t, peerBtoA.msgCodec.statefulVoteEncEnabled.Load(),
800800
"B->A peer should have dynamic encoding enabled")
801-
require.True(t, peerBtoA.msgCodec.statefulVoteDecEnabled,
801+
require.True(t, peerBtoA.msgCodec.statefulVoteDecEnabled.Load(),
802802
"B->A peer should have dynamic decoding enabled")
803803

804804
// Check negotiated table size
@@ -807,13 +807,13 @@ func testWebsocketVoteDynamicCompressionMessages(t *testing.T, msgs [][]byte, ex
807807
require.Equal(t, uint(tc.expectedSize), peerBtoA.getBestVpackTableSize(),
808808
"B->A peer should have expected table size")
809809
} else {
810-
require.False(t, peerAtoB.msgCodec.statefulVoteEncEnabled,
810+
require.False(t, peerAtoB.msgCodec.statefulVoteEncEnabled.Load(),
811811
"A->B peer should not have dynamic encoding enabled")
812-
require.False(t, peerAtoB.msgCodec.statefulVoteDecEnabled,
812+
require.False(t, peerAtoB.msgCodec.statefulVoteDecEnabled.Load(),
813813
"A->B peer should not have dynamic decoding enabled")
814-
require.False(t, peerBtoA.msgCodec.statefulVoteEncEnabled,
814+
require.False(t, peerBtoA.msgCodec.statefulVoteEncEnabled.Load(),
815815
"B->A peer should not have dynamic encoding enabled")
816-
require.False(t, peerBtoA.msgCodec.statefulVoteDecEnabled,
816+
require.False(t, peerBtoA.msgCodec.statefulVoteDecEnabled.Load(),
817817
"B->A peer should not have dynamic decoding enabled")
818818
}
819819

@@ -838,17 +838,17 @@ func testWebsocketVoteDynamicCompressionMessages(t *testing.T, msgs [][]byte, ex
838838
if tc.expectDynamic {
839839
if expectCompressionAfter {
840840
// Valid messages - compression should still be enabled
841-
require.True(t, peerAtoB.msgCodec.statefulVoteEncEnabled,
841+
require.True(t, peerAtoB.msgCodec.statefulVoteEncEnabled.Load(),
842842
"Stateful encoding should still be enabled after sending valid votes")
843-
require.True(t, peerAtoB.msgCodec.statefulVoteDecEnabled,
843+
require.True(t, peerAtoB.msgCodec.statefulVoteDecEnabled.Load(),
844844
"Stateful decoding should still be enabled after sending valid votes")
845-
require.True(t, peerBtoA.msgCodec.statefulVoteEncEnabled,
845+
require.True(t, peerBtoA.msgCodec.statefulVoteEncEnabled.Load(),
846846
"Stateful encoding should still be enabled after receiving valid votes")
847-
require.True(t, peerBtoA.msgCodec.statefulVoteDecEnabled,
847+
require.True(t, peerBtoA.msgCodec.statefulVoteDecEnabled.Load(),
848848
"Stateful decoding should still be enabled after receiving valid votes")
849849
} else {
850850
// Invalid messages - sender's encoder should be disabled when it fails to compress
851-
require.False(t, peerAtoB.msgCodec.statefulVoteEncEnabled,
851+
require.False(t, peerAtoB.msgCodec.statefulVoteEncEnabled.Load(),
852852
"Stateful encoding should be disabled after sending invalid messages")
853853
// Note: peerBtoA never receives VP messages, only AV messages, so its decoder is not affected
854854
}

protocol/tags_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ func TestLockdownTagList(t *testing.T) {
289289
TxnTag,
290290
UniEnsBlockReqTag,
291291
VoteBundleTag,
292+
VotePackedTag,
292293
}
293294
require.Equal(t, len(tagList), len(TagList))
294295
tagMap := make(map[Tag]bool)

0 commit comments

Comments
 (0)