Skip to content

Commit

Permalink
feat: add WithReadThreshold API
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Dec 21, 2023
1 parent 9707178 commit d5b0914
Show file tree
Hide file tree
Showing 23 changed files with 551 additions and 164 deletions.
17 changes: 10 additions & 7 deletions connection_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
ErrEOF = syscall.Errno(0x106)
// Write I/O buffer timeout, calling by Connection.Writer
ErrWriteTimeout = syscall.Errno(0x107)
// The wait read size large than read threshold
ErrReadOutOfThreshold = syscall.Errno(0x108)
)

const ErrnoMask = 0xFF
Expand Down Expand Up @@ -90,11 +92,12 @@ func (e *exception) Unwrap() error {

// Errors defined in netpoll
var errnos = [...]string{
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrConnClosed: "connection has been closed",
ErrnoMask & ErrReadTimeout: "connection read timeout",
ErrnoMask & ErrDialTimeout: "dial wait timeout",
ErrnoMask & ErrDialNoDeadline: "dial no deadline",
ErrnoMask & ErrUnsupported: "netpoll dose not support",
ErrnoMask & ErrEOF: "EOF",
ErrnoMask & ErrWriteTimeout: "connection write timeout",
ErrnoMask & ErrReadOutOfThreshold: "connection read size is out of threshold",
}
69 changes: 46 additions & 23 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@ type connection struct {
netFD
onEvent
locker
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
operator *FDOperator
readTimeout time.Duration
readTimer *time.Timer
readTrigger chan error
waitReadSize int64
writeTimeout time.Duration
writeTimer *time.Timer
writeTrigger chan error
inputBuffer *LinkBuffer
outputBuffer *LinkBuffer
outputBarrier *barrier
supportZeroCopy bool
maxSize int // The maximum size of data between two Release().
bookSize int // The size of data that can be read at once.
readBufferThreshold int64 // The readBufferThreshold limit the size of connection inputBuffer. In bytes.
}

var (
Expand Down Expand Up @@ -94,6 +95,12 @@ func (c *connection) SetWriteTimeout(timeout time.Duration) error {
return nil
}

// SetReadBufferThreshold implements Connection.
func (c *connection) SetReadBufferThreshold(threshold int64) error {
c.readBufferThreshold = threshold
return nil
}

// ------------------------------------------ implement zero-copy reader ------------------------------------------

// Next implements Connection.
Expand Down Expand Up @@ -394,28 +401,44 @@ func (c *connection) triggerWrite(err error) {
// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
if n <= c.inputBuffer.Len() {
return nil
goto CLEANUP
}
// cannot wait read with an out of threshold size
if c.readBufferThreshold > 0 && int64(n) > c.readBufferThreshold {
// just return error and dont do cleanup
return Exception(ErrReadOutOfThreshold, "wait read")
}

atomic.StoreInt64(&c.waitReadSize, int64(n))
defer atomic.StoreInt64(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
err = c.waitReadWithTimeout(n)
goto CLEANUP
}
// wait full n
for c.inputBuffer.Len() < n {
switch c.status(closing) {
case poller:
return Exception(ErrEOF, "wait read")
err = Exception(ErrEOF, "wait read")
case user:
return Exception(ErrConnClosed, "wait read")
err = Exception(ErrConnClosed, "wait read")
default:
err = <-c.readTrigger
if err != nil {
return err
}
}
if err != nil {
goto CLEANUP
}
}
return nil
CLEANUP:
atomic.StoreInt64(&c.waitReadSize, 0)
if c.readBufferThreshold > 0 && err == nil {
// only resume read when current read size could make newBufferSize < readBufferThreshold
bufferSize := int64(c.inputBuffer.Len())
newBufferSize := bufferSize - int64(n)
if bufferSize >= c.readBufferThreshold && newBufferSize < c.readBufferThreshold {
c.resumeRead()
}
}
return err
}

// waitReadWithTimeout will wait full n bytes or until timeout.
Expand Down
1 change: 1 addition & 0 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (c *connection) onPrepare(opts *options) (err error) {
c.SetReadTimeout(opts.readTimeout)
c.SetWriteTimeout(opts.writeTimeout)
c.SetIdleTimeout(opts.idleTimeout)
c.SetReadBufferThreshold(opts.readBufferThreshold)

// calling prepare first and then register.
if opts.onPrepare != nil {
Expand Down
43 changes: 38 additions & 5 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (c *connection) inputAck(n int) (err error) {
c.maxSize = mallocMax
}

// trigger throttle
if c.readBufferThreshold > 0 && int64(length) >= c.readBufferThreshold {
c.pauseRead()
}

var needTrigger = true
if length == n { // first start onRequest
needTrigger = c.onRequest()
Expand All @@ -117,7 +122,7 @@ func (c *connection) inputAck(n int) (err error) {
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
if c.outputBuffer.IsEmpty() {
c.rw2r()
c.pauseWrite()
return rs, c.supportZeroCopy
}
rs = c.outputBuffer.GetBytes(vs)
Expand All @@ -131,13 +136,41 @@ func (c *connection) outputAck(n int) (err error) {
c.outputBuffer.Release()
}
if c.outputBuffer.IsEmpty() {
c.rw2r()
c.pauseWrite()
}
return nil
}

// rw2r removed the monitoring of write events.
func (c *connection) rw2r() {
c.operator.Control(PollRW2R)
// pauseWrite removed the monitoring of write events.
// pauseWrite used in poller
func (c *connection) pauseWrite() {
switch c.operator.getMode() {
case opreadwrite:
c.operator.Control(PollRW2R)
case opwrite:
c.operator.Control(PollW2Hup)
}
c.triggerWrite(nil)
}

// pauseRead removed the monitoring of read events.
// pauseRead used in poller
func (c *connection) pauseRead() {
switch c.operator.getMode() {
case opread:
c.operator.Control(PollR2Hup)
case opreadwrite:
c.operator.Control(PollRW2W)
}
}

// resumeRead add the monitoring of read events.
// resumeRead used by users
func (c *connection) resumeRead() {
switch c.operator.getMode() {
case ophup:
c.operator.Control(PollHup2R)
case opwrite:
c.operator.Control(PollW2RW)
}
}
110 changes: 99 additions & 11 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,18 +499,15 @@ func TestConnDetach(t *testing.T) {
func TestParallelShortConnection(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

var received int64
el, err := NewEventLoop(func(ctx context.Context, connection Connection) error {
data, err := connection.Reader().Next(connection.Reader().Len())
if err != nil {
return err
}
Assert(t, err == nil || errors.Is(err, ErrEOF))
atomic.AddInt64(&received, int64(len(data)))
//t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
t.Logf("conn[%s] received: %d, active: %v", connection.RemoteAddr(), len(data), connection.IsActive())
return nil
})
defer el.Shutdown(context.Background())
go func() {
el.Serve(ln)
}()
Expand All @@ -536,10 +533,11 @@ func TestParallelShortConnection(t *testing.T) {
}
wg.Wait()

for atomic.LoadInt64(&received) < int64(totalSize) {
t.Logf("received: %d, except: %d", atomic.LoadInt64(&received), totalSize)
start := time.Now()
for atomic.LoadInt64(&received) < int64(totalSize) && time.Now().Sub(start) < time.Second {
time.Sleep(time.Millisecond * 100)
}
Equal(t, atomic.LoadInt64(&received), int64(totalSize))
}

func TestConnectionServerClose(t *testing.T) {
Expand Down Expand Up @@ -643,8 +641,6 @@ func TestConnectionServerClose(t *testing.T) {
func TestConnectionDailTimeoutAndClose(t *testing.T) {
ln, err := createTestListener("tcp", ":12345")
MustNil(t, err)
defer ln.Close()

el, err := NewEventLoop(
func(ctx context.Context, connection Connection) error {
_, err = connection.Reader().Next(connection.Reader().Len())
Expand All @@ -668,10 +664,102 @@ func TestConnectionDailTimeoutAndClose(t *testing.T) {
go func() {
defer wg.Done()
conn, err := DialConnection("tcp", ":12345", time.Nanosecond)
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"))
Assert(t, err == nil || strings.Contains(err.Error(), "i/o timeout"), err)
_ = conn
}()
}
wg.Wait()
}
}

func TestConnectionReadOutOfThreshold(t *testing.T) {
var readThreshold = 1024 * 100
var readSize = readThreshold + 1
var opts = &options{}
var wg sync.WaitGroup
wg.Add(1)
opts.onRequest = func(ctx context.Context, connection Connection) error {
if connection.Reader().Len() < readThreshold {
return nil
}
defer wg.Done()
// read throttled data
_, err := connection.Reader().Next(readSize)
Assert(t, errors.Is(err, ErrReadOutOfThreshold), err)
connection.Close()
return nil
}

WithReadBufferThreshold(int64(readThreshold)).f(opts)
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, opts)
wconn.init(&netFD{fd: w}, opts)

msg := make([]byte, readThreshold)
_, err := wconn.Writer().WriteBinary(msg)
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
wg.Wait()
}

func TestConnectionReadThreshold(t *testing.T) {
var readThreshold int64 = 1024 * 100
var opts = &options{}
var wg sync.WaitGroup
var throttled int32
wg.Add(1)
opts.onRequest = func(ctx context.Context, connection Connection) error {
if int64(connection.Reader().Len()) < readThreshold {
return nil
}
defer wg.Done()

atomic.StoreInt32(&throttled, 1)
// check if no more read data when throttled
inbuffered := connection.Reader().Len()
t.Logf("Inbuffered: %d", inbuffered)
time.Sleep(time.Millisecond * 100)
Equal(t, inbuffered, connection.Reader().Len())

// read non-throttled data
buf, err := connection.Reader().Next(int(readThreshold))
Equal(t, int64(len(buf)), readThreshold)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
t.Logf("read non-throttled data")

// continue read throttled data
buf, err = connection.Reader().Next(5)
MustNil(t, err)
t.Logf("read throttled data: [%s]", buf)
Equal(t, len(buf), 5)
MustNil(t, err)
err = connection.Reader().Release()
MustNil(t, err)
Equal(t, connection.Reader().Len(), 0)
return nil
}

WithReadBufferThreshold(readThreshold).f(opts)
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, opts)
wconn.init(&netFD{fd: w}, opts)
Assert(t, rconn.readBufferThreshold == readThreshold)

msg := make([]byte, readThreshold)
_, err := wconn.Writer().WriteBinary(msg)
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
_, err = wconn.Writer().WriteString("hello")
MustNil(t, err)
err = wconn.Writer().Flush()
MustNil(t, err)
t.Logf("flush final msg")

wg.Wait()
}
20 changes: 20 additions & 0 deletions docs/guide/guide_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,26 @@ func callback(connection netpoll.Connection) error {
}
```

## 8. 如何配置连接的读取阈值大小 ?

Netpoll 默认不会对端发送数据的读取速度有任何限制,每当连接有数据时,Netpoll 会尽可能快地将数据存放在自己的 buffer 中。但有时候可能用户不希望数据过快发送,或者是希望控制服务内存使用量,又或者业务 OnRequest 回调处理速度很慢需要限制发送方速度,此时可以使用 `WithReadThreshold` 来控制读取的最大阈值。

### Client 侧使用

```
dialer := netpoll.NewDialer(netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1)) // 1GB
conn, _ = dialer.DialConnection(network, address, timeout)
```

### Server 侧使用

```
eventLoop, _ := netpoll.NewEventLoop(
handle,
netpoll.WithReadThreshold(1024 * 1024 * 1024 * 1), // 1GB
)
```

# 注意事项

## 1. 错误设置 NumLoops
Expand Down
Loading

0 comments on commit d5b0914

Please sign in to comment.