Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,9 @@ type IncomingMessage struct {
// Tag is a short string (2 bytes) marking a type of message
type Tag = protocol.Tag

func highPriorityTag(tags []protocol.Tag) bool {
for _, tag := range tags {
if tag == protocol.AgreementVoteTag || tag == protocol.ProposalPayloadTag {
return true
}
func highPriorityTag(tag protocol.Tag) bool {
if tag == protocol.AgreementVoteTag || tag == protocol.ProposalPayloadTag {
return true
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (n *P2PNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byt
return n.service.Publish(ctx, topic, data)
}
// Otherwise broadcast over websocket protocol stream
return n.broadcaster.BroadcastArray(ctx, []protocol.Tag{tag}, [][]byte{data}, wait, except)
return n.broadcaster.broadcast(ctx, tag, data, wait, except)
}

// Relay message
Expand Down
73 changes: 25 additions & 48 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@
)

type broadcastRequest struct {
tags []Tag
data [][]byte
tag Tag
data []byte
except Peer
done chan struct{}
enqueueTime time.Time
Expand Down Expand Up @@ -361,32 +361,20 @@
// If except is not nil then we will not send it to that neighboring Peer.
// if wait is true then the call blocks until the packet has actually been sent to all neighbors.
func (wn *WebsocketNetwork) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, except Peer) error {
dataArray := make([][]byte, 1)
dataArray[0] = data
tagArray := make([]protocol.Tag, 1)
tagArray[0] = tag
return wn.broadcaster.BroadcastArray(ctx, tagArray, dataArray, wait, except)
return wn.broadcaster.broadcast(ctx, tag, data, wait, except)
}

// BroadcastArray sends an array of messages.
// If except is not nil then we will not send it to that neighboring Peer.
// if wait is true then the call blocks until the packet has actually been sent to all neighbors.
// TODO: add `priority` argument so that we don't have to guess it based on tag
func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error {
func (wn *msgBroadcaster) broadcast(ctx context.Context, tag Tag, data []byte, wait bool, except Peer) error {
if wn.config.DisableNetworking {
return nil
}
if len(tags) != len(data) {
return errBcastInvalidArray
}

request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
request := broadcastRequest{tag: tag, data: data, enqueueTime: time.Now(), ctx: ctx}
if except != nil {
request.except = except
}

broadcastQueue := wn.broadcastQueueBulk
if highPriorityTag(tags) {
if highPriorityTag(tag) {
broadcastQueue = wn.broadcastQueueHighPrio
}
if wait {
Expand Down Expand Up @@ -431,14 +419,6 @@
return nil
}

// RelayArray relays array of messages
func (wn *WebsocketNetwork) RelayArray(ctx context.Context, tags []protocol.Tag, data [][]byte, wait bool, except Peer) error {
if wn.relayMessages {
return wn.broadcaster.BroadcastArray(ctx, tags, data, wait, except)
}
return nil
}

func (wn *WebsocketNetwork) disconnectThread(badnode DisconnectablePeer, reason disconnectReason) {
defer wn.wg.Done()
wn.disconnect(badnode, reason)
Expand Down Expand Up @@ -1351,28 +1331,25 @@

// preparePeerData prepares batches of data for sending.
// It performs zstd compression for proposal massages if they this is a prio request and has proposal.
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([][]byte, []crypto.Digest) {
digests := make([]crypto.Digest, len(request.data))
data := make([][]byte, len(request.data))
for i, d := range request.data {
tbytes := []byte(request.tags[i])
mbytes := make([]byte, len(tbytes)+len(d))
copy(mbytes, tbytes)
copy(mbytes[len(tbytes):], d)
data[i] = mbytes
if request.tags[i] != protocol.MsgDigestSkipTag && len(d) >= messageFilterSize {
digests[i] = crypto.Hash(mbytes)
}

if prio && request.tags[i] == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, d)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)
}
data[i] = compressed
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([]byte, crypto.Digest) {
tbytes := []byte(request.tag)
mbytes := make([]byte, len(tbytes)+len(request.data))
copy(mbytes, tbytes)
copy(mbytes[len(tbytes):], request.data)

var digest crypto.Digest
if request.tag != protocol.MsgDigestSkipTag && len(request.data) >= messageFilterSize {
digest = crypto.Hash(mbytes)
}

if prio && request.tag == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, request.data)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)

Check warning on line 1348 in network/wsNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/wsNetwork.go#L1348

Added line #L1348 was not covered by tests
}
mbytes = compressed
}
return data, digests
return mbytes, digest
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1389,7 +1366,7 @@
}

start := time.Now()
data, digests := wn.preparePeerData(request, prio)
data, digest := wn.preparePeerData(request, prio)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1400,7 +1377,7 @@
if Peer(peer) == request.except {
continue
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
ok := peer.writeNonBlock(request.ctx, data, prio, digest, request.enqueueTime)
if ok {
sentMessageCount++
continue
Expand Down
114 changes: 50 additions & 64 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,25 +533,6 @@ func TestWebsocketPeerData(t *testing.T) {
require.Equal(t, nil, netA.GetPeerData(peerB, "foo"))
}

// Test sending array of messages
func TestWebsocketNetworkArray(t *testing.T) {
partitiontest.PartitionTest(t)

netA, _, counter, closeFunc := setupWebsocketNetworkAB(t, 3)
defer closeFunc()
counterDone := counter.done

tags := []protocol.Tag{protocol.TxnTag, protocol.TxnTag, protocol.TxnTag}
data := [][]byte{[]byte("foo"), []byte("bar"), []byte("algo")}
netA.broadcaster.BroadcastArray(context.Background(), tags, data, false, nil)

select {
case <-counterDone:
case <-time.After(2 * time.Second):
t.Errorf("timeout, count=%d, wanted 2", counter.count)
}
}

// Test cancelling message sends
func TestWebsocketNetworkCancel(t *testing.T) {
partitiontest.PartitionTest(t)
Expand All @@ -570,25 +551,29 @@ func TestWebsocketNetworkCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

// try calling BroadcastArray
netA.broadcaster.BroadcastArray(ctx, tags, data, true, nil)
// try calling broadcast
for i := 0; i < 100; i++ {
netA.broadcaster.broadcast(ctx, tags[i], data[i], true, nil)
}

select {
case <-counterDone:
t.Errorf("All messages were sent, send not cancelled")
case <-time.After(2 * time.Second):
case <-time.After(1 * time.Second):
}
assert.Equal(t, 0, counter.Count())

// try calling innerBroadcast
request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
peers, _ := netA.peerSnapshot([]*wsPeer{})
netA.broadcaster.innerBroadcast(request, true, peers)
for i := 0; i < 100; i++ {
request := broadcastRequest{tag: tags[i], data: data[i], enqueueTime: time.Now(), ctx: ctx}
netA.broadcaster.innerBroadcast(request, true, peers)
}

select {
case <-counterDone:
t.Errorf("All messages were sent, send not cancelled")
case <-time.After(2 * time.Second):
case <-time.After(1 * time.Second):
}
assert.Equal(t, 0, counter.Count())

Expand All @@ -600,21 +585,25 @@ func TestWebsocketNetworkCancel(t *testing.T) {
mbytes := make([]byte, len(tbytes)+len(msg))
copy(mbytes, tbytes)
copy(mbytes[len(tbytes):], msg)
msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, hash: crypto.Hash(mbytes), ctx: context.Background()})
msgs = append(msgs, sendMessage{data: mbytes, enqueued: time.Now(), peerEnqueued: enqueueTime, ctx: context.Background()})
}

// cancel msg 50
msgs[50].ctx = ctx

for _, peer := range peers {
peer.sendBufferHighPrio <- sendMessages{msgs: msgs}
for _, msg := range msgs {
peer.sendBufferHighPrio <- msg
}
}

select {
case <-counterDone:
t.Errorf("All messages were sent, send not cancelled")
case <-time.After(2 * time.Second):
case <-time.After(1 * time.Second):
}
assert.Equal(t, 50, counter.Count())
// all but msg 50 should have been sent
assert.Equal(t, 99, counter.Count())
}

// Set up two nodes, test that a.Broadcast is received by B, when B has no address.
Expand Down Expand Up @@ -990,8 +979,8 @@ func TestSlowOutboundPeer(t *testing.T) {
for i := range destPeers {
destPeers[i].closing = make(chan struct{})
destPeers[i].net = node
destPeers[i].sendBufferHighPrio = make(chan sendMessages, sendBufferLength)
destPeers[i].sendBufferBulk = make(chan sendMessages, sendBufferLength)
destPeers[i].sendBufferHighPrio = make(chan sendMessage, sendBufferLength)
destPeers[i].sendBufferBulk = make(chan sendMessage, sendBufferLength)
destPeers[i].conn = &nopConnSingleton
destPeers[i].rootURL = fmt.Sprintf("fake %d", i)
node.addPeer(&destPeers[i])
Expand Down Expand Up @@ -2501,14 +2490,10 @@ func TestWebsocketNetwork_checkServerResponseVariables(t *testing.T) {
}

func (wn *WebsocketNetwork) broadcastWithTimestamp(tag protocol.Tag, data []byte, when time.Time) error {
msgArr := make([][]byte, 1)
msgArr[0] = data
tagArr := make([]protocol.Tag, 1)
tagArr[0] = tag
request := broadcastRequest{tags: tagArr, data: msgArr, enqueueTime: when, ctx: context.Background()}
request := broadcastRequest{tag: tag, data: data, enqueueTime: when, ctx: context.Background()}

broadcastQueue := wn.broadcaster.broadcastQueueBulk
if highPriorityTag(tagArr) {
if highPriorityTag(tag) {
broadcastQueue = wn.broadcaster.broadcastQueueHighPrio
}
// no wait
Expand Down Expand Up @@ -3702,34 +3687,36 @@ func TestPreparePeerData(t *testing.T) {
partitiontest.PartitionTest(t)

// no compression
req := broadcastRequest{
tags: []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag},
data: [][]byte{[]byte("test"), []byte("data")},
reqs := []broadcastRequest{
{tag: protocol.AgreementVoteTag, data: []byte("test")},
{tag: protocol.ProposalPayloadTag, data: []byte("data")},
}

wn := WebsocketNetwork{}
data, digests := wn.broadcaster.preparePeerData(req, false)
require.NotEmpty(t, data)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
data := make([][]byte, len(reqs))
digests := make([]crypto.Digest, len(reqs))
for i, req := range reqs {
data[i], digests[i] = wn.broadcaster.preparePeerData(req, false)
require.NotEmpty(t, data[i])
require.Empty(t, digests[i]) // small messages have no digest
}

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
require.Equal(t, append([]byte(reqs[i].tag), reqs[i].data...), data[i])
}

data, digests = wn.broadcaster.preparePeerData(req, true)
require.NotEmpty(t, data)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
for i, req := range reqs {
data[i], digests[i] = wn.broadcaster.preparePeerData(req, true)
require.NotEmpty(t, data[i])
require.Empty(t, digests[i]) // small messages have no digest
}

for i := range data {
if req.tags[i] != protocol.ProposalPayloadTag {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
if reqs[i].tag != protocol.ProposalPayloadTag {
require.Equal(t, append([]byte(reqs[i].tag), reqs[i].data...), data[i])
require.Equal(t, data[i], data[i])
} else {
require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), data[i][:len(req.tags[i])+len(zstdCompressionMagic)])
require.Equal(t, append([]byte(reqs[i].tag), zstdCompressionMagic[:]...), data[i][:len(reqs[i].tag)+len(zstdCompressionMagic)])
}
}
}
Expand Down Expand Up @@ -4010,14 +3997,13 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
require.Eventually(t, func() bool { return netA.NumPeers() == 1 }, 500*time.Millisecond, 25*time.Millisecond)

// send an unrequested block response
msg := make([]sendMessage, 1)
msg[0] = sendMessage{
msg := sendMessage{
data: append([]byte(protocol.TopicMsgRespTag), []byte("foo")...),
enqueued: time.Now(),
peerEnqueued: time.Now(),
ctx: context.Background(),
}
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
netA.peers[0].sendBufferBulk <- msg
require.Eventually(t,
func() bool {
return networkConnectionsDroppedTotal.GetUint64ValueForLabels(map[string]string{"reason": "unrequestedTS"}) == 1
Expand Down Expand Up @@ -4080,7 +4066,7 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
netC.log.SetOutput(logBuffer)

// send a late TS response from A -> C
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
netA.peers[0].sendBufferBulk <- msg
require.Eventually(
t,
func() bool { return netC.peers[0].outstandingTopicRequests.Load() == int64(0) },
Expand Down Expand Up @@ -4497,8 +4483,8 @@ func TestSendMessageCallbackDrain(t *testing.T) {
node := makeTestWebsocketNode(t)
destPeer := wsPeer{
closing: make(chan struct{}),
sendBufferHighPrio: make(chan sendMessages, sendBufferLength),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
sendBufferHighPrio: make(chan sendMessage, sendBufferLength),
sendBufferBulk: make(chan sendMessage, sendBufferLength),
conn: &nopConnSingleton,
}
node.addPeer(&destPeer)
Expand Down Expand Up @@ -4657,16 +4643,16 @@ func TestPeerComparisonInBroadcast(t *testing.T) {

testPeer := &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "test-addr", nil, ""),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
sendBufferBulk: make(chan sendMessage, sendBufferLength),
}
exceptPeer := &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "except-addr", nil, ""),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
sendBufferBulk: make(chan sendMessage, sendBufferLength),
}

request := broadcastRequest{
tags: []protocol.Tag{"test-tag"},
data: [][]byte{[]byte("test-data")},
tag: protocol.Tag("test-tag"),
data: []byte("test-data"),
enqueueTime: time.Now(),
except: exceptPeer,
}
Expand Down
Loading