Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webrtc: increase maximum message size #2949

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,28 @@ import (
)

const (
// maxMessageSize is the maximum message size of the Protobuf message we send / receive.
maxMessageSize = 16384
// maxMessageSizeRead is the maximum message size of the Protobuf message we send / receive.
maxMessageSizeRead = 256 * 1024
// maxMessageSizeWrite is the maximum message size of the Protobuf message we send / receive.
maxMessageSizeWrite = 64 * 1024
// maxSendBuffer is the maximum data we enqueue on the underlying data channel for writes.
// The underlying SCTP layer has an unbounded buffer for writes. We limit the amount enqueued
// per stream is limited to avoid a single stream monopolizing the entire connection.
maxSendBuffer = 2 * maxMessageSize
maxSendBuffer = 2 * maxMessageSizeWrite
// sendBufferLowThreshold is the threshold below which we write more data on the underlying
// data channel. We want a notification as soon as we can write 1 full sized message.
sendBufferLowThreshold = maxSendBuffer - maxMessageSize
sendBufferLowThreshold = maxSendBuffer - maxMessageSizeWrite
// maxTotalControlMessagesSize is the maximum total size of all control messages we will
// write on this stream.
// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
// exact. In the worst case, we enqueue these many bytes more in the webrtc peer connection
// send queue.
maxTotalControlMessagesSize = 50

// Proto overhead assumption is 5 bytes
// protoOverhead assumption is 5 bytes
protoOverhead = 5
// Varint overhead is assumed to be 2 bytes. This is safe since
// 1. This is only used and when writing message, and
// 2. We only send messages in chunks of `maxMessageSize - varintOverhead`
// which includes the data and the protobuf header. Since `maxMessageSize`
// is less than or equal to 2 ^ 14, the varint will not be more than
// 2 bytes in length.
varintOverhead = 2
// varintOverhead is the value of `maxMessageSizeWrite` in varint format
varintOverhead = 3
// maxFINACKWait is the maximum amount of time a stream will wait to read
// FIN_ACK before closing the data channel
maxFINACKWait = 10 * time.Second
Expand Down Expand Up @@ -106,7 +103,7 @@ func newStream(
onDone func(),
) *stream {
s := &stream{
reader: pbio.NewDelimitedReader(rwc, maxMessageSize),
reader: pbio.NewDelimitedReader(rwc, maxMessageSizeRead),
writer: pbio.NewDelimitedWriter(rwc),
writeStateChanged: make(chan struct{}, 1),
id: *channel.ID(),
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/webrtc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func TestStreamChunking(t *testing.T) {
clientStr := newStream(client.dc, client.rwc, func() {})
serverStr := newStream(server.dc, server.rwc, func() {})

const N = (16 << 10) + 1000
const N = (64 << 10) + 1000
go func() {
data := make([]byte, N)
_, err := clientStr.Write(data)
Expand All @@ -555,7 +555,7 @@ func TestStreamChunking(t *testing.T) {
data := make([]byte, N)
n, err := serverStr.Read(data)
require.NoError(t, err)
require.LessOrEqual(t, n, 16<<10)
require.LessOrEqual(t, n, 64<<10)

nn, err := serverStr.Read(data)
require.NoError(t, err)
Expand Down
5 changes: 4 additions & 1 deletion p2p/transport/webrtc/stream_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package libp2pwebrtc

import (
"errors"
"fmt"
"os"
"time"

Expand Down Expand Up @@ -78,13 +79,14 @@ func (s *stream) Write(b []byte) (int, error) {
select {
case <-writeDeadlineChan:
s.mx.Lock()
fmt.Println("returning deadline exceeded")
return n, os.ErrDeadlineExceeded
case <-s.writeStateChanged:
}
s.mx.Lock()
continue
}
end := maxMessageSize
end := maxMessageSizeWrite
if end > availableSpace {
end = availableSpace
}
Expand All @@ -94,6 +96,7 @@ func (s *stream) Write(b []byte) (int, error) {
}
msg = pb.Message{Message: b[:end]}
if err := s.writer.WriteMsg(&msg); err != nil {
fmt.Println("err", err)
return n, err
}
n += end
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/webrtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const (
DefaultFailedTimeout = 30 * time.Second
DefaultKeepaliveTimeout = 15 * time.Second

sctpReceiveBufferSize = 100_000
sctpReceiveBufferSize = maxMessageSizeRead * 20
)

type WebRTCTransport struct {
Expand Down
4 changes: 2 additions & 2 deletions p2p/transport/webrtc/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ func TestTransportWebRTC_Deadline(t *testing.T) {
require.NoError(t, err)

stream.SetWriteDeadline(time.Now().Add(100 * time.Millisecond))
largeBuffer := make([]byte, 2*1024*1024)
largeBuffer := make([]byte, 20*1024*1024)
_, err = stream.Write(largeBuffer)
require.ErrorIs(t, err, os.ErrDeadlineExceeded)

Expand Down Expand Up @@ -595,7 +595,7 @@ func TestTransportWebRTC_StreamWriteBufferContention(t *testing.T) {
require.NoError(t, err)

stream.SetWriteDeadline(time.Now().Add(200 * time.Millisecond))
largeBuffer := make([]byte, 2*1024*1024)
largeBuffer := make([]byte, 20*1024*1024)
_, err = stream.Write(largeBuffer)
errC <- err
}()
Expand Down
Loading