Skip to content

Commit

Permalink
Unlimit buffer max size
Browse files Browse the repository at this point in the history
This mimics bytes.Buffer and avoids subtle data discarding
  • Loading branch information
edaniels committed Feb 23, 2024
1 parent 536a6d1 commit 507f458
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
5 changes: 3 additions & 2 deletions packetio/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package packetio
import (
"errors"
"io"
"math"
"sync"
"time"

Expand Down Expand Up @@ -50,7 +51,7 @@ type Buffer struct {
const (
minSize = 2048
cutoffSize = 128 * 1024
maxSize = 4 * 1024 * 1024
maxSize = math.MaxInt // same as bytes.Buffer
)

// NewBuffer creates a new Buffer.
Expand Down Expand Up @@ -331,7 +332,7 @@ func (b *Buffer) size() int {

// SetLimitSize controls the maximum number of bytes that can be buffered.
// Causes Write to return ErrFull when this limit is reached.
// A zero value means 4MB since v0.11.0.
// A zero value means math.MaxInt since v2.2.5.
//
// User can set packetioSizeHardLimit build tag to enable 4MB hard limit.
// When packetioSizeHardLimit build tag is set, SetLimitSize exceeding
Expand Down
12 changes: 2 additions & 10 deletions packetio/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,27 +331,19 @@ func TestBufferLimitSizes(t *testing.T) {
128 * 1024,
1024 * 1024,
8 * 1024 * 1024,
0, // default
}
const headerSize = 2
const packetSize = 0x8000

for _, size := range sizes {
size := size
name := "default"
if size > 0 {
name = fmt.Sprintf("%dkBytes", size/1024)
}
name := fmt.Sprintf("%dkBytes", size/1024)

t.Run(name, func(t *testing.T) {
assert := assert.New(t)

buffer := NewBuffer()
if size == 0 {
size = maxSize
} else {
buffer.SetLimitSize(size + headerSize)
}
buffer.SetLimitSize(size + headerSize)
now := time.Now()
assert.NoError(buffer.SetReadDeadline(now.Add(5 * time.Second))) // Set deadline to avoid test deadlock

Expand Down
19 changes: 18 additions & 1 deletion udp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v2/deadline"
"github.com/pion/transport/v2/packetio"
"golang.org/x/net/ipv4"
Expand Down Expand Up @@ -51,6 +52,8 @@ type listener struct {

readDoneCh chan struct{}
errRead atomic.Value // error

logger logging.LeveledLogger
}

// Accept waits for and returns the next connection to the listener.
Expand Down Expand Up @@ -151,6 +154,8 @@ type ListenConfig struct {
WriteBufferSize int

Batch BatchIOConfig

LoggerFactory logging.LoggerFactory
}

// Listen creates a new listener based on the ListenConfig.
Expand All @@ -175,6 +180,13 @@ func (lc *ListenConfig) Listen(network string, laddr *net.UDPAddr) (net.Listener
_ = conn.SetWriteBuffer(lc.WriteBufferSize)
}

loggerFactory := lc.LoggerFactory
if loggerFactory == nil {
loggerFactory = logging.NewDefaultLoggerFactory()
}

logger := loggerFactory.NewLogger("transport")

l := &listener{
pConn: conn,
acceptCh: make(chan *Conn, lc.Backlog),
Expand All @@ -183,6 +195,7 @@ func (lc *ListenConfig) Listen(network string, laddr *net.UDPAddr) (net.Listener
acceptFilter: lc.AcceptFilter,
connWG: &sync.WaitGroup{},
readDoneCh: make(chan struct{}),
logger: logger,
}

if lc.Batch.Enable {
Expand Down Expand Up @@ -251,6 +264,7 @@ func (l *listener) read() {
n, raddr, err := l.pConn.ReadFrom(buf)
if err != nil {
l.errRead.Store(err)
l.logger.Tracef("error reading from connection err=%v", err)
return
}
l.dispatchMsg(raddr, buf[:n])
Expand All @@ -263,7 +277,10 @@ func (l *listener) dispatchMsg(addr net.Addr, buf []byte) {
return
}
if ok {
_, _ = conn.buffer.Write(buf)
_, err := conn.buffer.Write(buf)
if err != nil {
l.logger.Tracef("error dispatching message addr=%v err=%v", addr, err)
}

Check warning on line 283 in udp/conn.go

View check run for this annotation

Codecov / codecov/patch

udp/conn.go#L282-L283

Added lines #L282 - L283 were not covered by tests
}
}

Expand Down

0 comments on commit 507f458

Please sign in to comment.