Skip to content

Commit

Permalink
broker: allow the discard goroutine to die
Browse files Browse the repository at this point in the history
See embedded comments; when acks == 0, we can now free up a few more
resources. Notably I am aiming to use this improvement in an upcoming
commit.
  • Loading branch information
twmb committed Feb 13, 2022
1 parent cd53e97 commit db9017a
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"math"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1202,10 +1203,15 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) {
// (5) we set a read deadline *after* the size bytes are read, and only if the
// client has not yet closed.
func (cxn *brokerCxn) discard() {
defer cxn.die()
var firstTimeout bool
defer func() {
if !firstTimeout { // see below
cxn.die()
}
}()

discardBuf := make([]byte, 256)
for {
for i := 0; ; i++ {
var (
nread int
err error
Expand All @@ -1216,10 +1222,35 @@ func (cxn *brokerCxn) discard() {

readDone = make(chan struct{})
)
cxn.conn.SetReadDeadline(time.Time{})

// On all but the first request, we use no deadline. We could
// be hanging reading while we wait for more produce requests.
// We know we are talking to azure when i > 0 and we should not
// quit this goroutine.
//
// However, on the *first* produce request, we know that we are
// writing *right now*. We can deadline our read side with
// ample overhead, and if this first read hits the deadline,
// then we can quit this discard / read goroutine with no
// problems.
//
// We choose 3x our timeouts:
// - first we cover the write, connTimeoutOverhead + produceTimeout
// - then we cover the read, connTimeoutOverhead
// - then we throw in another connTimeoutOverhead just to be sure
//
deadline := time.Time{}
if i == 0 {
deadline = time.Now().Add(3*cxn.cl.cfg.requestTimeoutOverhead + cxn.cl.cfg.produceTimeout)
}
cxn.conn.SetReadDeadline(deadline)

go func() {
defer close(readDone)
if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil {
if i == 0 && errors.Is(err, os.ErrDeadlineExceeded) {
firstTimeout = true
}
return
}
deadlineMu.Lock()
Expand Down

0 comments on commit db9017a

Please sign in to comment.