Skip to content

Commit

Permalink
fix: trigger write event only when flush with read throttled
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Jan 10, 2024
1 parent 8b331dd commit a9d6975
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 6 deletions.
13 changes: 8 additions & 5 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (c *connection) waitRead(n int) (err error) {
goto CLEANUP
}
// wait full n
for c.inputBuffer.Len() < n {
for c.inputBuffer.Len() < n && err == nil {
switch c.status(closing) {
case poller:
err = Exception(ErrEOF, "wait read")
Expand All @@ -424,9 +424,6 @@ func (c *connection) waitRead(n int) (err error) {
default:
err = <-c.readTrigger
}
if err != nil {
goto CLEANUP
}
}
CLEANUP:
atomic.StoreInt64(&c.waitReadSize, 0)
Expand Down Expand Up @@ -506,7 +503,13 @@ func (c *connection) flush() error {
if c.outputBuffer.IsEmpty() {
return nil
}
err = c.operator.Control(PollR2RW)
if c.operator.getMode() == ophup {
// triggered read throttled, so here shouldn't trigger read event again
err = c.operator.Control(PollHup2W)
} else {
err = c.operator.Control(PollR2RW)
}
c.operator.done()
if err != nil {
return Exception(err, "when flush")
}
Expand Down
4 changes: 3 additions & 1 deletion poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,7 @@ const (
// PollR2Hup is used to remove the readable monitor of FDOperator.
PollR2Hup PollEvent = 0x9
// PollHup2R is used to add the readable monitor of FDOperator, generally used with PollR2Hup.
PollHup2R PollEvent = 0x10
PollHup2R PollEvent = 0xA
// PollHup2W is used to add the writeable monitor of FDOperator.
PollHup2W PollEvent = 0xB
)
3 changes: 3 additions & 0 deletions poll_default_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
case PollHup2R:
operator.setMode(opread)
evs[0].Filter, evs[0].Flags = syscall.EVFILT_READ, syscall.EV_ADD|syscall.EV_ENABLE
case PollHup2W:
operator.setMode(opwrite)
evs[0].Filter, evs[0].Flags = syscall.EVFILT_WRITE, syscall.EV_ADD|syscall.EV_ENABLE
}
_, err := syscall.Kevent(p.fd, evs, nil, nil)
return err
Expand Down
3 changes: 3 additions & 0 deletions poll_default_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
case PollHup2R:
operator.setMode(opread)
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
case PollHup2W:
operator.setMode(opwrite)
op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
}
return EpollCtl(p.fd, op, operator.FD, &evt)
}

0 comments on commit a9d6975

Please sign in to comment.