diff --git a/pipe.go b/pipe.go index af1dffa1..9607a569 100644 --- a/pipe.go +++ b/pipe.go @@ -217,6 +217,11 @@ func (p *pipe) _background() { } go func() { exit(p._backgroundWrite()) + for atomic.LoadInt32(&p.waits) != 0 { + if _, _, ch := p.queue.NextWriteCmd(); ch == nil { + runtime.Gosched() + } + } close(wait) }() { @@ -228,7 +233,6 @@ func (p *pipe) _background() { atomic.AddInt32(&p.waits, -1) }() } - <-wait p.nsubs.Close() p.psubs.Close() @@ -253,7 +257,6 @@ func (p *pipe) _background() { p.onInvalidations(nil) } for atomic.LoadInt32(&p.waits) != 0 { - p.queue.NextWriteCmd() if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil { if multi == nil { multi = ones @@ -269,6 +272,7 @@ func (p *pipe) _background() { runtime.Gosched() } } + <-wait atomic.StoreInt32(&p.state, 4) } diff --git a/pipe_test.go b/pipe_test.go index c8026bd9..af15b4c8 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -2445,6 +2445,33 @@ func TestExitOnWriteMultiError(t *testing.T) { } } +func TestExitOnRingFullAndConnError(t *testing.T) { + p, mock, _, closeConn := setup(t, ClientOption{ + RingScaleEachConn: 1, + }) + p.background() + + // fill the ring + for i := 0; i < len(p.queue.(*ring).store); i++ { + go func() { + if err := p.Do(context.Background(), cmds.NewCompleted([]string{"GET", "a"})).Error(); err != io.EOF && !strings.HasPrefix(err.Error(), "io:") { + t.Errorf("unexpected result, expected io err, got %v", err) + } + }() + } + // let writer loop over the ring + for i := 0; i < len(p.queue.(*ring).store); i++ { + mock.Expect("GET", "a") + } + + time.Sleep(time.Second) // make sure the writer is waiting for the next write + closeConn() + + if err := p.Do(context.Background(), cmds.NewCompleted([]string{"GET", "a"})).Error(); err != io.EOF && !strings.HasPrefix(err.Error(), "io:") { + t.Errorf("unexpected result, expected io err, got %v", err) + } +} + func TestExitAllGoroutineOnWriteError(t *testing.T) { conn, mock, _, closeConn := setup(t, ClientOption{})