From db9017a8a5229b2a720103ef78806ffdaa38e2e6 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Sat, 12 Feb 2022 20:31:31 -0700
Subject: [PATCH] broker: allow the discard goroutine to die

See embedded comments; when acks == 0, we can now free up a few more
resources. Notably I am aiming to use this improvement in an upcoming
commit.
---
 pkg/kgo/broker.go | 37 ++++++++++++++++++++++++++++++++++---
 1 file changed, 34 insertions(+), 3 deletions(-)

diff --git a/pkg/kgo/broker.go b/pkg/kgo/broker.go
index ed309ebf..64bf8a32 100644
--- a/pkg/kgo/broker.go
+++ b/pkg/kgo/broker.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"math"
 	"net"
+	"os"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -1202,10 +1203,15 @@ func (cxn *brokerCxn) waitResp(pr promisedResp) {
 // (5) we set a read deadline *after* the size bytes are read, and only if the
 // client has not yet closed.
 func (cxn *brokerCxn) discard() {
-	defer cxn.die()
+	var firstTimeout bool
+	defer func() {
+		if !firstTimeout { // see below
+			cxn.die()
+		}
+	}()
 
 	discardBuf := make([]byte, 256)
-	for {
+	for i := 0; ; i++ {
 		var (
 			nread      int
 			err        error
@@ -1216,10 +1222,35 @@ func (cxn *brokerCxn) discard() {
 			readDone = make(chan struct{})
 		)
 
-		cxn.conn.SetReadDeadline(time.Time{})
+
+		// On all but the first request, we use no deadline. We could
+		// be hanging reading while we wait for more produce requests.
+		// We know we are talking to azure when i > 0 and we should not
+		// quit this goroutine.
+		//
+		// However, on the *first* produce request, we know that we are
+		// writing *right now*. We can deadline our read side with
+		// ample overhead, and if this first read hits the deadline,
+		// then we can quit this discard / read goroutine with no
+		// problems.
+		//
+		// We choose 3x our timeouts:
+		// - first we cover the write, connTimeoutOverhead + produceTimeout
+		// - then we cover the read, connTimeoutOverhead
+		// - then we throw in another connTimeoutOverhead just to be sure
+		//
+		deadline := time.Time{}
+		if i == 0 {
+			deadline = time.Now().Add(3*cxn.cl.cfg.requestTimeoutOverhead + cxn.cl.cfg.produceTimeout)
+		}
+		cxn.conn.SetReadDeadline(deadline)
+
 		go func() {
			defer close(readDone)
 			if nread, err = io.ReadFull(cxn.conn, discardBuf[:4]); err != nil {
+				if i == 0 && errors.Is(err, os.ErrDeadlineExceeded) {
+					firstTimeout = true
+				}
 				return
 			}
 			deadlineMu.Lock()
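
The pattern this patch introduces can be sketched standalone: bound only
the first read with a deadline, and treat os.ErrDeadlineExceeded on that
first read as a clean signal to stop discarding rather than as a fatal
connection error. Below is a minimal sketch, not franz-go's API: the
names discardLoop and firstReadTimeout are hypothetical, and net.Pipe
stands in for a broker that, like Kafka, never answers acks == 0 produce
requests.

// A minimal sketch of the patch's first-read deadline technique. The
// names discardLoop and firstReadTimeout are hypothetical; franz-go's
// real logic lives in (*brokerCxn).discard.
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"time"
)

// discardLoop reads and throws away responses. Only iteration 0 has a
// read deadline: if it expires, the peer evidently does not respond to
// acks == 0 produce requests, and the loop exits cleanly. Any read
// success proves the peer responds, so all later reads block with no
// deadline and the goroutine stays alive.
func discardLoop(conn net.Conn, firstReadTimeout time.Duration) error {
	buf := make([]byte, 4)
	for i := 0; ; i++ {
		deadline := time.Time{} // zero time: no deadline
		if i == 0 {
			deadline = time.Now().Add(firstReadTimeout)
		}
		conn.SetReadDeadline(deadline)

		if _, err := io.ReadFull(conn, buf); err != nil {
			if i == 0 && errors.Is(err, os.ErrDeadlineExceeded) {
				return nil // clean exit: the peer never responded
			}
			return err // any other error means a dead connection
		}
		// A size prefix arrived; a real discard loop would now read
		// and drop the body, then loop again with no deadline.
	}
}

func main() {
	client, _ := net.Pipe() // the server end never writes, like Kafka with acks == 0
	fmt.Println(discardLoop(client, 100*time.Millisecond)) // prints <nil> after ~100ms
}

A successful first read proves the peer does respond (as Azure Event
Hubs does to acks == 0 produce requests), which is why the patched code
keeps the goroutine alive once i > 0.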