Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: clamp the event-loops in ET mode to avaoid starving #599

Merged
merged 2 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
// 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF.
err = el.write(c)
default:
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken

Check warning on line 53 in connection_bsd.go

View check run for this annotation

Codecov / codecov/patch

connection_bsd.go#L53

Added line #L53 was not covered by tests
err = el.close(c, io.EOF)
}
}
Expand Down
8 changes: 4 additions & 4 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the connection directly.
if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error
if ev&(netpoll.ErrEvents|unix.EPOLLRDHUP) != 0 && ev&netpoll.ReadWriteEvents == 0 {
c.outboundBuffer.Release() // don't bother to write to a connection that is already broken

Check warning on line 33 in connection_linux.go

View check run for this annotation

Codecov / codecov/patch

connection_linux.go#L33

Added line #L33 was not covered by tests
return el.close(c, io.EOF)
}
// Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority
Expand All @@ -43,14 +43,14 @@
// to the remote first and then close the connection.
//
// We perform eventloop.write for EPOLLOUT because it can take good care of either case.
if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 {
if ev&(netpoll.WriteEvents|netpoll.ErrEvents) != 0 {
if err := el.write(c); err != nil {
return err
}
}
// Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in
// the socket buffer.
if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 {
if ev&(netpoll.ReadEvents|netpoll.ErrEvents) != 0 {
if err := el.read(c); err != nil {
return err
}
Expand Down
41 changes: 36 additions & 5 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,18 @@
return el.handleAction(c, action)
}

func (el *eventloop) read0(itf interface{}) error {
return el.read(itf.(*conn))

Check warning on line 118 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}

const maxBytesTransferET = 1 << 22

func (el *eventloop) read(c *conn) error {
if !c.opened {
return nil
}

var recv int
isET := el.engine.opts.EdgeTriggeredIO
loop:
n, err := unix.Read(c.fd, el.buffer)
Expand All @@ -131,6 +138,7 @@
}
return el.close(c, os.NewSyscallError("read", err))
}
recv += n

c.buffer = el.buffer[:n]
action := el.eventHandler.OnTraffic(c)
Expand All @@ -144,13 +152,25 @@
_, _ = c.inboundBuffer.Write(c.buffer)
c.buffer = c.buffer[:0]

if isET || c.isEOF {
if c.isEOF || (isET && recv < maxBytesTransferET) {
goto loop
}

// To prevent infinite reading in ET mode and starving other events,
// we need to set up threshold for the maximum read bytes per connection
// on each event-loop. If the threshold is reached and there are still
// unread data in the socket buffer, we must issue another read event manually.
if isET && n == len(el.buffer) {
return el.poller.Trigger(queue.LowPriority, el.read0, c)

Check warning on line 164 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L164

Added line #L164 was not covered by tests
}

return nil
}

func (el *eventloop) write0(itf interface{}) error {
return el.write(itf.(*conn))

Check warning on line 171 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L170-L171

Added lines #L170 - L171 were not covered by tests
}

// The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs.
const iovMax = 1024

Expand All @@ -161,8 +181,9 @@

isET := el.engine.opts.EdgeTriggeredIO
var (
n int
err error
n int
sent int
err error
)
loop:
iov, _ := c.outboundBuffer.Peek(-1)
Expand All @@ -182,14 +203,24 @@
default:
return el.close(c, os.NewSyscallError("write", err))
}
if isET && !c.outboundBuffer.IsEmpty() {
sent += n

if isET && !c.outboundBuffer.IsEmpty() && sent < maxBytesTransferET {
goto loop
}

// All data have been sent, it's no need to monitor the writable events for LT mode,
// remove the writable event from poller to help the future event-loops if necessary.
if !isET && c.outboundBuffer.IsEmpty() {
_ = el.poller.ModRead(&c.pollAttachment, false)
return el.poller.ModRead(&c.pollAttachment, false)
}

// To prevent infinite writing in ET mode and starving other events,
// we need to set up threshold for the maximum write bytes per connection
// on each event-loop. If the threshold is reached and there are still
// pending data to write, we must issue another write event manually.
if isET && !c.outboundBuffer.IsEmpty() {
return el.poller.Trigger(queue.HighPriority, el.write0, c)

Check warning on line 223 in eventloop_unix.go

View check run for this annotation

Codecov / codecov/patch

eventloop_unix.go#L223

Added line #L223 was not covered by tests
}

return nil
Expand Down
30 changes: 15 additions & 15 deletions eventloop_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var (
func BenchmarkGC4El100k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 100000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 100000)
b.Run("Run-4-eventloop-100000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -62,7 +62,7 @@ func BenchmarkGC4El100k(b *testing.B) {
func BenchmarkGC4El200k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 200000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 200000)
b.Run("Run-4-eventloop-200000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand All @@ -76,7 +76,7 @@ func BenchmarkGC4El200k(b *testing.B) {
func BenchmarkGC4El500k(b *testing.B) {
oldGc := debug.SetGCPercent(-1)

ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 500000)
ts1 := benchServeGC(b, "tcp", ":0", true, 4, 500000)
b.Run("Run-4-eventloop-500000", func(b *testing.B) {
for i := 0; i < b.N; i++ {
runtime.GC()
Expand Down Expand Up @@ -146,73 +146,73 @@ func TestServeGC(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 10000)
testServeGC(t, "tcp", ":0", true, true, 1, 10000)
})
t.Run("1-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 100000)
testServeGC(t, "tcp", ":0", true, true, 1, 100000)
})
t.Run("1-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 1, 1000000)
testServeGC(t, "tcp", ":0", true, true, 1, 1000000)
})
t.Run("2-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 10000)
testServeGC(t, "tcp", ":0", true, true, 2, 10000)
})
t.Run("2-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 100000)
testServeGC(t, "tcp", ":0", true, true, 2, 100000)
})
t.Run("2-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 2, 1000000)
testServeGC(t, "tcp", ":0", true, true, 2, 1000000)
})
t.Run("4-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 10000)
testServeGC(t, "tcp", ":0", true, true, 4, 10000)
})
t.Run("4-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 100000)
testServeGC(t, "tcp", ":0", true, true, 4, 100000)
})
t.Run("4-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 4, 1000000)
testServeGC(t, "tcp", ":0", true, true, 4, 1000000)
})
t.Run("16-loop-10000", func(t *testing.T) {
if testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 10000)
testServeGC(t, "tcp", ":0", true, true, 16, 10000)
})
t.Run("16-loop-100000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 100000)
testServeGC(t, "tcp", ":0", true, true, 16, 100000)
})
t.Run("16-loop-1000000", func(t *testing.T) {
if !testBigGC {
t.Skipf("Skip when testBigGC=%t", testBigGC)
}
testServeGC(t, "tcp", ":9000", true, true, 16, 1000000)
testServeGC(t, "tcp", ":0", true, true, 16, 1000000)
})
})
}
Expand Down
11 changes: 8 additions & 3 deletions internal/netpoll/defs_poller_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ const (
MinPollEventsCap = 32
// MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time.
MaxAsyncTasksAtOneTime = 256
// ErrEvents represents exceptional events that are not read/write, like socket being closed,
// reading/writing from/to a closed socket, etc.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
// ReadEvents represents readable events that are polled by epoll.
ReadEvents = unix.EPOLLIN | unix.EPOLLPRI
// WriteEvents represents writeable events that are polled by epoll.
WriteEvents = unix.EPOLLOUT
// ReadWriteEvents represents both readable and writeable events.
ReadWriteEvents = ReadEvents | WriteEvents
// ErrEvents represents exceptional events that occurred on the local side.
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP
)

type eventList struct {
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,57 +193,51 @@ func (p *Poller) Polling(callback PollEventHandler) error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = writeEvents
var ev uint32 = WriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl add",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readEvents
var ev uint32 = ReadEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
}

// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev uint32 = readWriteEvents
var ev uint32 = ReadWriteEvents
if edgeTriggered {
ev |= unix.EPOLLET
ev |= unix.EPOLLET | unix.EPOLLRDHUP
}
return os.NewSyscallError("epoll_ctl mod",
unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev}))
Expand Down
26 changes: 10 additions & 16 deletions internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,18 +195,12 @@ func (p *Poller) Polling() error {
}
}

const (
readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP
writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP
readWriteEvents = readEvents | writeEvents
)

// AddReadWrite registers the given file-descriptor with readable and writable events to the poller.
func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readWriteEvents
ev.events = ReadWriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -215,9 +209,9 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error {
// AddRead registers the given file-descriptor with readable event to the poller.
func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readEvents
ev.events = ReadEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -226,9 +220,9 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error {
// AddWrite registers the given file-descriptor with writable event to the poller.
func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = writeEvents
ev.events = WriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev))
Expand All @@ -237,9 +231,9 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error {
// ModRead renews the given file-descriptor with readable event in the poller.
func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readEvents
ev.events = ReadEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
Expand All @@ -248,9 +242,9 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error {
// ModReadWrite renews the given file-descriptor with readable and writable events in the poller.
func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error {
var ev epollevent
ev.events = readWriteEvents
ev.events = ReadWriteEvents
if edgeTriggered {
ev.events |= unix.EPOLLET
ev.events |= unix.EPOLLET | unix.EPOLLRDHUP
}
convertPollAttachment(unsafe.Pointer(&ev.data), pa)
return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))
Expand Down
Loading