diff --git a/conn.go b/conn.go index d58ca15..cdb2224 100644 --- a/conn.go +++ b/conn.go @@ -174,7 +174,12 @@ func (c *conn) reader(ctx context.Context) { discardMU := el.Value.(connMarshalerUnmarshaler) if err = resp3.Unmarshal(c.br, discardMU.unmarshalInto, c.rOpts); err != nil { - break + // Ignore RESP errors. + if !errors.As(err, &resp3.SimpleError{}) && !errors.As(err, &resp3.BlobError{}) { + break + } + + err = nil } discardList.Remove(el) diff --git a/conn_test.go b/conn_test.go index 52d76bc..73abce0 100644 --- a/conn_test.go +++ b/conn_test.go @@ -268,6 +268,96 @@ func TestConnDeadlineExceeded(t *T) { t.Logf("successes:%d timeouts:%d closed:%d", numSuccesses, numTimeouts, numClosed) }) + t.Run("error", func(t *T) { + const pe, ps, n = 5, 5, 100 + const initTimeout = time.Second + conn := dial() + defer conn.Close() + ctx := testCtx(t) + + var wg sync.WaitGroup + wg.Add(pe + ps) + + var numTimeouts, numClosed uint64 + + for i := 0; i < pe; i++ { + go func() { + timeout := initTimeout + defer wg.Done() + for i := 0; i < n; i++ { + if err := ctx.Err(); err != nil { + panic(err) + } + + innerCtx, cancel := context.WithTimeout(ctx, mkTimeout(timeout)) + into := "" + err := conn.Do(innerCtx, Cmd(&into, "EVAL", "return redis.error_reply('NOTOK')", "0")) + cancel() + + // We want to see a timeout that happens while the + // the next goroutine in the other loop is already running. + timeout /= 2 + + if !assert.NotNil(t, err) { + return + } + + if !errors.As(err, &resp3.SimpleError{}) { + isTimeout := errors.Is(err, context.DeadlineExceeded) + isClosed := errors.Is(err, proc.ErrClosed) || + errors.Is(err, net.ErrClosed) + if !assert.True(t, isTimeout || isClosed, "err:%v", err) { + return + } + if isTimeout { + atomic.AddUint64(&numTimeouts, 1) + } + if isClosed { + atomic.AddUint64(&numClosed, 1) + } + return + } + } + }() + } + + for i := 0; i < ps; i++ { + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + if err := ctx.Err(); err != nil { + panic(err) + } + + into := "" + err := conn.Do(ctx, Cmd(&into, "EVAL", "return redis.status_reply('OK')", "0")) + + if err != nil { + // If we see a RESP error value then the results from + // this command and the previous one have been mixed + // up. + if !assert.False(t, errors.As(err, &resp3.SimpleError{})) { + return + } + + isTimeout := errors.Is(err, context.DeadlineExceeded) + isClosed := errors.Is(err, proc.ErrClosed) || + errors.Is(err, net.ErrClosed) + if !assert.True(t, isTimeout || isClosed, "err:%v", err) { + return + } + } else if !assert.Equal(t, "OK", into) { + return + } + } + }() + } + + wg.Wait() + assert.NotZero(t, numTimeouts, "number of timeouts") + t.Logf("timeouts:%d closed:%d", numTimeouts, numClosed) + }) + t.Run("pubsub", func(t *T) { const n = 100