Skip to content

Commit 55011f9

Browse files
authored
network: remove GossipNode.BroadcastArray (#6281)
1 parent 52ab8e7 commit 55011f9

File tree

5 files changed

+110
-181
lines changed

5 files changed

+110
-181
lines changed

network/gossipNode.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,9 @@ type IncomingMessage struct {
163163
// Tag is a short string (2 bytes) marking a type of message
164164
type Tag = protocol.Tag
165165

166-
func highPriorityTag(tags []protocol.Tag) bool {
167-
for _, tag := range tags {
168-
if tag == protocol.AgreementVoteTag || tag == protocol.ProposalPayloadTag {
169-
return true
170-
}
166+
func highPriorityTag(tag protocol.Tag) bool {
167+
if tag == protocol.AgreementVoteTag || tag == protocol.ProposalPayloadTag {
168+
return true
171169
}
172170
return false
173171
}

network/p2pNetwork.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ func (n *P2PNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byt
551551
return n.service.Publish(ctx, topic, data)
552552
}
553553
// Otherwise broadcast over websocket protocol stream
554-
return n.broadcaster.BroadcastArray(ctx, []protocol.Tag{tag}, [][]byte{data}, wait, except)
554+
return n.broadcaster.broadcast(ctx, tag, data, wait, except)
555555
}
556556

557557
// Relay message

network/wsNetwork.go

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,8 @@ const (
283283
)
284284

285285
type broadcastRequest struct {
286-
tags []Tag
287-
data [][]byte
286+
tag Tag
287+
data []byte
288288
except Peer
289289
done chan struct{}
290290
enqueueTime time.Time
@@ -361,32 +361,20 @@ func (wn *WebsocketNetwork) PublicAddress() string {
361361
// If except is not nil then we will not send it to that neighboring Peer.
362362
// if wait is true then the call blocks until the packet has actually been sent to all neighbors.
363363
func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
364-
dataArray := make([][]byte, 1)
365-
dataArray[0] = data
366-
tagArray := make([]protocol.Tag, 1)
367-
tagArray[0] = tag
368-
return wn.broadcaster.BroadcastArray(ctx, tagArray, dataArray, wait, except)
364+
return wn.broadcaster.broadcast(ctx, tag, data, wait, except)
369365
}
370366

371-
// BroadcastArray sends an array of messages.
372-
// If except is not nil then we will not send it to that neighboring Peer.
373-
// if wait is true then the call blocks until the packet has actually been sent to all neighbors.
374-
// TODO: add `priority` argument so that we don't have to guess it based on tag
375-
func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error {
367+
func (wn *msgBroadcaster) broadcast(ctx context.Context, tag Tag, data []byte, wait bool, except Peer) error {
376368
if wn.config.DisableNetworking {
377369
return nil
378370
}
379-
if len(tags) != len(data) {
380-
return errBcastInvalidArray
381-
}
382-
383-
request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
371+
request := broadcastRequest{tag: tag, data: data, enqueueTime: time.Now(), ctx: ctx}
384372
if except != nil {
385373
request.except = except
386374
}
387375

388376
broadcastQueue := wn.broadcastQueueBulk
389-
if highPriorityTag(tags) {
377+
if highPriorityTag(tag) {
390378
broadcastQueue = wn.broadcastQueueHighPrio
391379
}
392380
if wait {
@@ -431,14 +419,6 @@ func (wn *WebsocketNetwork) Relay(ctx context.Context, tag protocol.Tag, data []
431419
return nil
432420
}
433421

434-
// RelayArray relays array of messages
435-
func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error {
436-
if wn.relayMessages {
437-
return wn.broadcaster.BroadcastArray(ctx, tags, data, wait, except)
438-
}
439-
return nil
440-
}
441-
442422
func (wn *WebsocketNetwork) disconnectThread(badnode DisconnectablePeer, reason disconnectReason) {
443423
defer wn.wg.Done()
444424
wn.disconnect(badnode, reason)
@@ -1351,28 +1331,25 @@ func (wn *WebsocketNetwork) getPeersChangeCounter() int32 {
13511331

13521332
// preparePeerData prepares batches of data for sending.
13531333
// It performs zstd compression for proposal massages if they this is a prio request and has proposal.
1354-
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([][]byte, []crypto.Digest) {
1355-
digests := make([]crypto.Digest, len(request.data))
1356-
data := make([][]byte, len(request.data))
1357-
for i, d := range request.data {
1358-
tbytes := []byte(request.tags[i])
1359-
mbytes := make([]byte, len(tbytes)+len(d))
1360-
copy(mbytes, tbytes)
1361-
copy(mbytes[len(tbytes):], d)
1362-
data[i] = mbytes
1363-
if request.tags[i] != protocol.MsgDigestSkipTag && len(d) >= messageFilterSize {
1364-
digests[i] = crypto.Hash(mbytes)
1365-
}
1366-
1367-
if prio && request.tags[i] == protocol.ProposalPayloadTag {
1368-
compressed, logMsg := zstdCompressMsg(tbytes, d)
1369-
if len(logMsg) > 0 {
1370-
wn.log.Warn(logMsg)
1371-
}
1372-
data[i] = compressed
1334+
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([]byte, crypto.Digest) {
1335+
tbytes := []byte(request.tag)
1336+
mbytes := make([]byte, len(tbytes)+len(request.data))
1337+
copy(mbytes, tbytes)
1338+
copy(mbytes[len(tbytes):], request.data)
1339+
1340+
var digest crypto.Digest
1341+
if request.tag != protocol.MsgDigestSkipTag && len(request.data) >= messageFilterSize {
1342+
digest = crypto.Hash(mbytes)
1343+
}
1344+
1345+
if prio && request.tag == protocol.ProposalPayloadTag {
1346+
compressed, logMsg := zstdCompressMsg(tbytes, request.data)
1347+
if len(logMsg) > 0 {
1348+
wn.log.Warn(logMsg)
13731349
}
1350+
mbytes = compressed
13741351
}
1375-
return data, digests
1352+
return mbytes, digest
13761353
}
13771354

13781355
// prio is set if the broadcast is a high-priority broadcast.
@@ -1389,7 +1366,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
13891366
}
13901367

13911368
start := time.Now()
1392-
data, digests := wn.preparePeerData(request, prio)
1369+
data, digest := wn.preparePeerData(request, prio)
13931370

13941371
// first send to all the easy outbound peers who don't block, get them started.
13951372
sentMessageCount := 0
@@ -1400,7 +1377,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
14001377
if Peer(peer) == request.except {
14011378
continue
14021379
}
1403-
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
1380+
ok := peer.writeNonBlock(request.ctx, data, prio, digest, request.enqueueTime)
14041381
if ok {
14051382
sentMessageCount++
14061383
continue

network/wsNetwork_test.go

Lines changed: 50 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -533,25 +533,6 @@ func TestWebsocketPeerData(t *testing.T) {
533533
require.Equal(t, nil, netA.GetPeerData(peerB, "foo"))
534534
}
535535

536-
// Test sending array of messages
537-
func TestWebsocketNetworkArray(t *testing.T) {
538-
partitiontest.PartitionTest(t)
539-
540-
netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 3)
541-
defer closeFunc()
542-
counterDone := counter.done
543-
544-
tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag}
545-
data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")}
546-
netA.broadcaster.BroadcastArray(context.Background(), tags, data, false, nil)
547-
548-
select {
549-
case <-counterDone:
550-
case <-time.After(2 * time.Second):
551-
t.Errorf("timeout, count=%d, wanted 2", counter.count)
552-
}
553-
}
554-
555536
// Test cancelling message sends
556537
func TestWebsocketNetworkCancel(t *testing.T) {
557538
partitiontest.PartitionTest(t)
@@ -570,25 +551,29 @@ func TestWebsocketNetworkCancel(t *testing.T) {
570551
ctx, cancel := context.WithCancel(context.Background())
571552
cancel()
572553

573-
// try calling BroadcastArray
574-
netA.broadcaster.BroadcastArray(ctx, tags, data, true, nil)
554+
// try calling broadcast
555+
for i := 0; i < 100; i++ {
556+
netA.broadcaster.broadcast(ctx, tags[i], data[i], true, nil)
557+
}
575558

576559
select {
577560
case <-counterDone:
578561
t.Errorf("All messages were sent, send not cancelled")
579-
case <-time.After(2 * time.Second):
562+
case <-time.After(1 * time.Second):
580563
}
581564
assert.Equal(t, 0, counter.Count())
582565

583566
// try calling innerBroadcast
584-
request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
585567
peers, _ := netA.peerSnapshot([]*wsPeer{})
586-
netA.broadcaster.innerBroadcast(request, true, peers)
568+
for i := 0; i < 100; i++ {
569+
request := broadcastRequest{tag: tags[i], data: data[i], enqueueTime: time.Now(), ctx: ctx}
570+
netA.broadcaster.innerBroadcast(request, true, peers)
571+
}
587572

588573
select {
589574
case <-counterDone:
590575
t.Errorf("All messages were sent, send not cancelled")
591-
case <-time.After(2 * time.Second):
576+
case <-time.After(1 * time.Second):
592577
}
593578
assert.Equal(t, 0, counter.Count())
594579

@@ -600,21 +585,25 @@ func TestWebsocketNetworkCancel(t *testing.T) {
600585
mbytes := make([]byte, len(tbytes)+len(msg))
601586
copy(mbytes, tbytes)
602587
copy(mbytes[len(tbytes):], msg)
603-
msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, hash: crypto.Hash(mbytes), ctx: context.Background()})
588+
msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, ctx: context.Background()})
604589
}
605590

591+
// cancel msg 50
606592
msgs[50].ctx = ctx
607593

608594
for _, peer := range peers {
609-
peer.sendBufferHighPrio <- sendMessages{msgs: msgs}
595+
for _, msg := range msgs {
596+
peer.sendBufferHighPrio <- msg
597+
}
610598
}
611599

612600
select {
613601
case <-counterDone:
614602
t.Errorf("All messages were sent, send not cancelled")
615-
case <-time.After(2 * time.Second):
603+
case <-time.After(1 * time.Second):
616604
}
617-
assert.Equal(t, 50, counter.Count())
605+
// all but msg 50 should have been sent
606+
assert.Equal(t, 99, counter.Count())
618607
}
619608

620609
// Set up two nodes, test that a.Broadcast is received by B, when B has no address.
@@ -990,8 +979,8 @@ func TestSlowOutboundPeer(t *testing.T) {
990979
for i := range destPeers {
991980
destPeers[i].closing = make(chan struct{})
992981
destPeers[i].net = node
993-
destPeers[i].sendBufferHighPrio = make(chan sendMessages, sendBufferLength)
994-
destPeers[i].sendBufferBulk = make(chan sendMessages, sendBufferLength)
982+
destPeers[i].sendBufferHighPrio = make(chan sendMessage, sendBufferLength)
983+
destPeers[i].sendBufferBulk = make(chan sendMessage, sendBufferLength)
995984
destPeers[i].conn = &nopConnSingleton
996985
destPeers[i].rootURL = fmt.Sprintf("fake %d", i)
997986
node.addPeer(&destPeers[i])
@@ -2501,14 +2490,10 @@ func TestWebsocketNetwork_checkServerResponseVariables(t *testing.T) {
25012490
}
25022491

25032492
func (wn *WebsocketNetwork) broadcastWithTimestamp(tag protocol.Tag, data []byte, when time.Time) error {
2504-
msgArr := make([][]byte, 1)
2505-
msgArr[0] = data
2506-
tagArr := make([]protocol.Tag, 1)
2507-
tagArr[0] = tag
2508-
request := broadcastRequest{tags: tagArr, data: msgArr, enqueueTime: when, ctx: context.Background()}
2493+
request := broadcastRequest{tag: tag, data: data, enqueueTime: when, ctx: context.Background()}
25092494

25102495
broadcastQueue := wn.broadcaster.broadcastQueueBulk
2511-
if highPriorityTag(tagArr) {
2496+
if highPriorityTag(tag) {
25122497
broadcastQueue = wn.broadcaster.broadcastQueueHighPrio
25132498
}
25142499
// no wait
@@ -3702,34 +3687,36 @@ func TestPreparePeerData(t *testing.T) {
37023687
partitiontest.PartitionTest(t)
37033688

37043689
// no compression
3705-
req := broadcastRequest{
3706-
tags: []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag},
3707-
data: [][]byte{[]byte("test"), []byte("data")},
3690+
reqs := []broadcastRequest{
3691+
{tag: protocol.AgreementVoteTag, data: []byte("test")},
3692+
{tag: protocol.ProposalPayloadTag, data: []byte("data")},
37083693
}
37093694

37103695
wn := WebsocketNetwork{}
3711-
data, digests := wn.broadcaster.preparePeerData(req, false)
3712-
require.NotEmpty(t, data)
3713-
require.NotEmpty(t, digests)
3714-
require.Equal(t, len(req.data), len(digests))
3715-
require.Equal(t, len(data), len(digests))
3696+
data := make([][]byte, len(reqs))
3697+
digests := make([]crypto.Digest, len(reqs))
3698+
for i, req := range reqs {
3699+
data[i], digests[i] = wn.broadcaster.preparePeerData(req, false)
3700+
require.NotEmpty(t, data[i])
3701+
require.Empty(t, digests[i]) // small messages have no digest
3702+
}
37163703

37173704
for i := range data {
3718-
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
3705+
require.Equal(t, append([]byte(reqs[i].tag), reqs[i].data...), data[i])
37193706
}
37203707

3721-
data, digests = wn.broadcaster.preparePeerData(req, true)
3722-
require.NotEmpty(t, data)
3723-
require.NotEmpty(t, digests)
3724-
require.Equal(t, len(req.data), len(digests))
3725-
require.Equal(t, len(data), len(digests))
3708+
for i, req := range reqs {
3709+
data[i], digests[i] = wn.broadcaster.preparePeerData(req, true)
3710+
require.NotEmpty(t, data[i])
3711+
require.Empty(t, digests[i]) // small messages have no digest
3712+
}
37263713

37273714
for i := range data {
3728-
if req.tags[i] != protocol.ProposalPayloadTag {
3729-
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
3715+
if reqs[i].tag != protocol.ProposalPayloadTag {
3716+
require.Equal(t, append([]byte(reqs[i].tag), reqs[i].data...), data[i])
37303717
require.Equal(t, data[i], data[i])
37313718
} else {
3732-
require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), data[i][:len(req.tags[i])+len(zstdCompressionMagic)])
3719+
require.Equal(t, append([]byte(reqs[i].tag), zstdCompressionMagic[:]...), data[i][:len(reqs[i].tag)+len(zstdCompressionMagic)])
37333720
}
37343721
}
37353722
}
@@ -4010,14 +3997,13 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
40103997
require.Eventually(t, func() bool { return netA.NumPeers() == 1 }, 500*time.Millisecond, 25*time.Millisecond)
40113998

40123999
// send an unrequested block response
4013-
msg := make([]sendMessage, 1)
4014-
msg[0] = sendMessage{
4000+
msg := sendMessage{
40154001
data: append([]byte(protocol.TopicMsgRespTag), []byte("foo")...),
40164002
enqueued: time.Now(),
40174003
peerEnqueued: time.Now(),
40184004
ctx: context.Background(),
40194005
}
4020-
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
4006+
netA.peers[0].sendBufferBulk <- msg
40214007
require.Eventually(t,
40224008
func() bool {
40234009
return networkConnectionsDroppedTotal.GetUint64ValueForLabels(map[string]string{"reason": "unrequestedTS"}) == 1
@@ -4080,7 +4066,7 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
40804066
netC.log.SetOutput(logBuffer)
40814067

40824068
// send a late TS response from A -> C
4083-
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
4069+
netA.peers[0].sendBufferBulk <- msg
40844070
require.Eventually(
40854071
t,
40864072
func() bool { return netC.peers[0].outstandingTopicRequests.Load() == int64(0) },
@@ -4497,8 +4483,8 @@ func TestSendMessageCallbackDrain(t *testing.T) {
44974483
node := makeTestWebsocketNode(t)
44984484
destPeer := wsPeer{
44994485
closing: make(chan struct{}),
4500-
sendBufferHighPrio: make(chan sendMessages, sendBufferLength),
4501-
sendBufferBulk: make(chan sendMessages, sendBufferLength),
4486+
sendBufferHighPrio: make(chan sendMessage, sendBufferLength),
4487+
sendBufferBulk: make(chan sendMessage, sendBufferLength),
45024488
conn: &nopConnSingleton,
45034489
}
45044490
node.addPeer(&destPeer)
@@ -4657,16 +4643,16 @@ func TestPeerComparisonInBroadcast(t *testing.T) {
46574643

46584644
testPeer := &wsPeer{
46594645
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "test-addr", nil, ""),
4660-
sendBufferBulk: make(chan sendMessages, sendBufferLength),
4646+
sendBufferBulk: make(chan sendMessage, sendBufferLength),
46614647
}
46624648
exceptPeer := &wsPeer{
46634649
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "except-addr", nil, ""),
4664-
sendBufferBulk: make(chan sendMessages, sendBufferLength),
4650+
sendBufferBulk: make(chan sendMessage, sendBufferLength),
46654651
}
46664652

46674653
request := broadcastRequest{
4668-
tags: []protocol.Tag{"test-tag"},
4669-
data: [][]byte{[]byte("test-data")},
4654+
tag: protocol.Tag("test-tag"),
4655+
data: []byte("test-data"),
46704656
enqueueTime: time.Now(),
46714657
except: exceptPeer,
46724658
}

0 commit comments

Comments
 (0)