Skip to content

Commit 53b7d6a

Browse files
authored
Merge pull request #113 from rueian/fix-shutdown-deadlock-ring-full
fix: shutdown deadlock of pipe when its ring is full (#108)
2 parents 9153856 + b26c3ba commit 53b7d6a

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

pipe.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ func (p *pipe) _background() {
217217
}
218218
go func() {
219219
exit(p._backgroundWrite())
220+
for atomic.LoadInt32(&p.waits) != 0 {
221+
if _, _, ch := p.queue.NextWriteCmd(); ch == nil {
222+
runtime.Gosched()
223+
}
224+
}
220225
close(wait)
221226
}()
222227
{
@@ -228,7 +233,6 @@ func (p *pipe) _background() {
228233
atomic.AddInt32(&p.waits, -1)
229234
}()
230235
}
231-
<-wait
232236

233237
p.nsubs.Close()
234238
p.psubs.Close()
@@ -253,7 +257,6 @@ func (p *pipe) _background() {
253257
p.onInvalidations(nil)
254258
}
255259
for atomic.LoadInt32(&p.waits) != 0 {
256-
p.queue.NextWriteCmd()
257260
if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil {
258261
if multi == nil {
259262
multi = ones
@@ -269,6 +272,7 @@ func (p *pipe) _background() {
269272
runtime.Gosched()
270273
}
271274
}
275+
<-wait
272276
atomic.StoreInt32(&p.state, 4)
273277
}
274278

pipe_test.go

+27
Original file line numberDiff line numberDiff line change
@@ -2445,6 +2445,33 @@ func TestExitOnWriteMultiError(t *testing.T) {
24452445
}
24462446
}
24472447

2448+
func TestExitOnRingFullAndConnError(t *testing.T) {
2449+
p, mock, _, closeConn := setup(t, ClientOption{
2450+
RingScaleEachConn: 1,
2451+
})
2452+
p.background()
2453+
2454+
// fill the ring
2455+
for i := 0; i < len(p.queue.(*ring).store); i++ {
2456+
go func() {
2457+
if err := p.Do(context.Background(), cmds.NewCompleted([]string{"GET", "a"})).Error(); err != io.EOF && !strings.HasPrefix(err.Error(), "io:") {
2458+
t.Errorf("unexpected result, expected io err, got %v", err)
2459+
}
2460+
}()
2461+
}
2462+
// let writer loop over the ring
2463+
for i := 0; i < len(p.queue.(*ring).store); i++ {
2464+
mock.Expect("GET", "a")
2465+
}
2466+
2467+
time.Sleep(time.Second) // make sure the writer is waiting for the next write
2468+
closeConn()
2469+
2470+
if err := p.Do(context.Background(), cmds.NewCompleted([]string{"GET", "a"})).Error(); err != io.EOF && !strings.HasPrefix(err.Error(), "io:") {
2471+
t.Errorf("unexpected result, expected io err, got %v", err)
2472+
}
2473+
}
2474+
24482475
func TestExitAllGoroutineOnWriteError(t *testing.T) {
24492476
conn, mock, _, closeConn := setup(t, ClientOption{})
24502477

0 commit comments

Comments
 (0)