Skip to content

Commit

Permalink
opt: eliminate the inuse eventloop.cache and memory leaks for idle co…
Browse files Browse the repository at this point in the history
…nnections (#660)

Fixes #659
Updates #420
  • Loading branch information
panjf2000 authored Nov 16, 2024
1 parent db2fad2 commit 451f015
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 63 deletions.
45 changes: 20 additions & 25 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type conn struct {
pollAttachment netpoll.PollAttachment // connection attachment for poller
inboundBuffer elastic.RingBuffer // buffer for leftover data from the remote
buffer []byte // buffer for the latest bytes
cache []byte // temporary cache for the inbound data
isDatagram bool // UDP protocol
opened bool // connection opened event fired
isEOF bool // whether the connection has reached EOF
Expand Down Expand Up @@ -290,6 +291,7 @@ func (c *conn) sendTo(buf []byte) error {
func (c *conn) resetBuffer() {
c.buffer = c.buffer[:0]
c.inboundBuffer.Reset()
c.inboundBuffer.Done()
}

func (c *conn) Read(p []byte) (n int, err error) {
Expand Down Expand Up @@ -325,22 +327,9 @@ func (c *conn) Next(n int) (buf []byte, err error) {
return
}

head, tail := c.inboundBuffer.Peek(n)
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer[:remaining])
c.buffer = c.buffer[remaining:]
return c.loop.cache.Bytes(), err
buf = bsPool.Get(n)
_, err = c.Read(buf)
return
}

func (c *conn) Peek(n int) (buf []byte, err error) {
Expand All @@ -359,25 +348,31 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
c.loop.cache.Write(tail)
buf = bsPool.Get(n)[:0]
buf = append(buf, head...)
buf = append(buf, tail...)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
return
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer[:remaining])
return c.loop.cache.Bytes(), err
buf = append(buf, c.buffer[:remaining]...)
c.cache = buf
return
}

func (c *conn) Discard(n int) (int, error) {
if len(c.cache) > 0 {
bsPool.Put(c.cache)
c.cache = nil
}

inBufferLen := c.inboundBuffer.Buffered()
tempBufferLen := len(c.buffer)
if inBufferLen+tempBufferLen < n || n <= 0 {
if totalLen := inBufferLen + len(c.buffer); n >= totalLen || n <= 0 {
c.resetBuffer()
return inBufferLen + tempBufferLen, nil
return totalLen, nil
}

if c.inboundBuffer.IsEmpty() {
c.buffer = c.buffer[n:]
return n, nil
Expand Down
45 changes: 21 additions & 24 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/panjf2000/gnet/v2/pkg/buffer/elastic"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
bsPool "github.com/panjf2000/gnet/v2/pkg/pool/byteslice"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)

Expand Down Expand Up @@ -54,6 +55,7 @@ type conn struct {
ctx any // user-defined context
loop *eventloop // owner event-loop
buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer
cache []byte // temporary cache for the inbound data
rawConn net.Conn // original connection
localAddr net.Addr // local server addr
remoteAddr net.Addr // remote addr
Expand Down Expand Up @@ -116,6 +118,7 @@ func newUDPConn(el *eventloop, pc net.PacketConn, localAddr, remoteAddr net.Addr
func (c *conn) resetBuffer() {
c.buffer.Reset()
c.inboundBuffer.Reset()
c.inboundBuffer.Done()
}

func (c *conn) Read(p []byte) (n int, err error) {
Expand Down Expand Up @@ -149,22 +152,10 @@ func (c *conn) Next(n int) (buf []byte, err error) {
c.buffer.B = c.buffer.B[n:]
return
}
head, tail := c.inboundBuffer.Peek(n)
defer c.inboundBuffer.Discard(n) //nolint:errcheck
c.loop.cache.Reset()
c.loop.cache.Write(head)
if len(head) == n {
return c.loop.cache.Bytes(), err
}
c.loop.cache.Write(tail)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer.B[:remaining])
c.buffer.B = c.buffer.B[remaining:]
return c.loop.cache.Bytes(), err
buf = bsPool.Get(n)
_, err = c.Read(buf)
return
}

func (c *conn) Peek(n int) (buf []byte, err error) {
Expand All @@ -181,25 +172,31 @@ func (c *conn) Peek(n int) (buf []byte, err error) {
if len(head) == n {
return head, err
}
c.loop.cache.Reset()
c.loop.cache.Write(head)
c.loop.cache.Write(tail)
buf = bsPool.Get(n)[:0]
buf = append(buf, head...)
buf = append(buf, tail...)
if inBufferLen >= n {
return c.loop.cache.Bytes(), err
return
}

remaining := n - inBufferLen
c.loop.cache.Write(c.buffer.B[:remaining])
return c.loop.cache.Bytes(), err
buf = append(buf, c.buffer.B[:remaining]...)
c.cache = buf
return
}

func (c *conn) Discard(n int) (int, error) {
if len(c.cache) > 0 {
bsPool.Put(c.cache)
c.cache = nil
}

inBufferLen := c.inboundBuffer.Buffered()
tempBufferLen := c.buffer.Len()
if inBufferLen+tempBufferLen < n || n <= 0 {
if totalLen := inBufferLen + c.buffer.Len(); n >= totalLen || n <= 0 {
c.resetBuffer()
return inBufferLen + tempBufferLen, nil
return totalLen, nil
}

if c.inboundBuffer.IsEmpty() {
c.buffer.B = c.buffer.B[n:]
return n, nil
Expand Down
2 changes: 0 additions & 2 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package gnet

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -39,7 +38,6 @@ import (
type eventloop struct {
listeners map[int]*listener // listeners
idx int // loop index in the engine loops list
cache bytes.Buffer // temporary buffer for scattered bytes
engine *engine // engine in loop
poller *netpoll.Poller // epoll or kqueue
buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB
Expand Down
2 changes: 0 additions & 2 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package gnet

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -31,7 +30,6 @@ type eventloop struct {
ch chan any // channel for event-loop
idx int // index of event-loop in event-loops
eng *engine // engine in loop
cache bytes.Buffer // temporary buffer for scattered bytes
connCount int32 // number of active connections in event-loop
connections map[*conn]struct{} // TCP connection map: fd -> conn
eventHandler EventHandler // user eventHandler
Expand Down
18 changes: 8 additions & 10 deletions pkg/buffer/linkedlist/linked_list_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func (b *node) len() int {

// Buffer is a linked list of node.
type Buffer struct {
bs [][]byte
head *node
tail *node
size int
Expand Down Expand Up @@ -123,19 +122,19 @@ func (llb *Buffer) Peek(maxBytes int) ([][]byte, error) {
} else if maxBytes > llb.Buffered() {
return nil, io.ErrShortBuffer
}
llb.bs = llb.bs[:0]
var bs [][]byte
var cum int
for iter := llb.head; iter != nil; iter = iter.next {
offset := iter.len()
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, iter.buf[:offset])
bs = append(bs, iter.buf[:offset])
if cum += offset; cum == maxBytes {
break
}
}
return llb.bs, nil
return bs, nil
}

// PeekWithBytes is like Peek but accepts [][]byte and puts them onto head.
Expand All @@ -145,17 +144,17 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) {
} else if maxBytes > llb.Buffered() {
return nil, io.ErrShortBuffer
}
llb.bs = llb.bs[:0]
var bss [][]byte
var cum int
for _, b := range bs {
if n := len(b); n > 0 {
offset := n
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, b[:offset])
bss = append(bss, b[:offset])
if cum += offset; cum == maxBytes {
return llb.bs, nil
return bss, nil
}
}
}
Expand All @@ -164,12 +163,12 @@ func (llb *Buffer) PeekWithBytes(maxBytes int, bs ...[]byte) ([][]byte, error) {
if cum+offset > maxBytes {
offset = maxBytes - cum
}
llb.bs = append(llb.bs, iter.buf[:offset])
bss = append(bss, iter.buf[:offset])
if cum += offset; cum == maxBytes {
break
}
}
return llb.bs, nil
return bss, nil
}

// Discard removes some nodes based on n bytes.
Expand Down Expand Up @@ -266,7 +265,6 @@ func (llb *Buffer) Reset() {
llb.tail = nil
llb.size = 0
llb.bytes = 0
llb.bs = nil
}

// pop returns and removes the head of l. If l is empty, it returns nil.
Expand Down

0 comments on commit 451f015

Please sign in to comment.