Skip to content

Commit

Permalink
Correctly handle select on multiple channels in Queues
Browse files Browse the repository at this point in the history
There are a few places in FlushQueueWithContext which make an
incorrect assumption about how `select` on multiple channels works.

The problem is best expressed by looking at the following example:

```go
package main

import "fmt"

func main() {
    closedChan := make(chan struct{})
    close(closedChan)
    toClose := make(chan struct{})
    count := 0

    for {
        select {
        case <-closedChan:
            count++
            fmt.Println(count)
            if count == 2 {
                close(toClose)
            }
        case <-toClose:
            return
        }
    }
}
```

This PR double-checks that the contexts are closed outside of checking
if there is data in the dataChan. It also rationalises the WorkerPool
FlushWithContext because the previous implementation failed to handle
pausing correctly. This will probably fix the underlying problem in
 go-gitea#22145

Fix go-gitea#22145

Signed-off-by: Andrew Thornton <art27@cantab.net>
  • Loading branch information
zeripath committed Dec 15, 2022
1 parent 3243dbe commit 5afa604
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 57 deletions.
26 changes: 0 additions & 26 deletions modules/queue/queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error {
return q.FlushWithContext(ctx)
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
log.Trace("ChannelQueue: %d Flush", q.qid)
paused, _ := q.IsPausedIsResumed()
for {
select {
case <-paused:
return nil
case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
atomic.AddInt64(&q.numInQueue, -1)
case <-q.baseCtx.Done():
return q.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
}

// Shutdown processing from this queue
func (q *ChannelQueue) Shutdown() {
q.lock.Lock()
Expand Down
30 changes: 0 additions & 30 deletions modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"runtime/pprof"
"sync"
"sync/atomic"
"time"

"code.gitea.io/gitea/modules/container"
Expand Down Expand Up @@ -167,35 +166,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
return q.FlushWithContext(ctx)
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
paused, _ := q.IsPausedIsResumed()
for {
select {
case <-paused:
return nil
default:
}
select {
case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
atomic.AddInt64(&q.numInQueue, -1)
case <-q.baseCtx.Done():
return q.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}
}

// Shutdown processing from this queue
func (q *ChannelUniqueQueue) Shutdown() {
log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
Expand Down
44 changes: 43 additions & 1 deletion modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool {
return atomic.LoadInt64(&p.numInQueue) == 0
}

// contextError returns either ctx.Done(), the base context's error or nil
func (p *WorkerPool) contextError(ctx context.Context) error {
select {
case <-p.baseCtx.Done():
return p.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
log.Trace("WorkerPool: %d Flush", p.qid)
paused, _ := p.IsPausedIsResumed()
for {
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <-paused:
// Ensure that even if paused that the cancelled error is still sent
return p.contextError(ctx)
case <-p.baseCtx.Done():
return p.baseCtx.Err()
case <-ctx.Done():
return ctx.Err()
default:
}

select {
case data := <-p.dataChan:
case <-paused:
return p.contextError(ctx)
case data, ok := <-p.dataChan:
if !ok {
return nil
}
if unhandled := p.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", p.qid)
}
Expand All @@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) {
paused, _ := p.IsPausedIsResumed()
data := make([]Data, 0, p.batchLength)
for {
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <-paused:
log.Trace("Worker for Queue %d Pausing", p.qid)
Expand All @@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) {
log.Trace("Worker shutting down")
return
}
case <-ctx.Done():
if len(data) > 0 {
log.Trace("Handling: %d data, %v", len(data), data)
if unhandled := p.handle(data...); unhandled != nil {
log.Error("Unhandled Data in queue %d", p.qid)
}
atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
}
log.Trace("Worker shutting down")
return
default:
}

select {
case <-paused:
// go back around
Expand Down

0 comments on commit 5afa604

Please sign in to comment.