Skip to content

Commit

Permalink
ref(boost): apply channel refactorings (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Sep 1, 2023
1 parent 6ee37d8 commit 7034927
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@ type privateWpInfo[I, O any] struct {
type WorkerPool[I, O any] struct {
private privateWpInfo[I, O]
exec ExecutiveFunc[I, O]
RoutineName GoRoutineName
noWorkers int
SourceJobsChIn JobStreamR[I]

WaitAQ AnnotatedWgAQ
sourceJobsChIn JobStream[I]
RoutineName GoRoutineName
WaitAQ AnnotatedWgAQ
}

type NewWorkerPoolParams[I, O any] struct {
NoWorkers int
Exec ExecutiveFunc[I, O]
JobsCh chan Job[I]
JobsCh JobStream[I]
CancelCh CancelStream
WaitAQ AnnotatedWgAQ
}
Expand All @@ -66,14 +65,14 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
wp := &WorkerPool[I, O]{
private: privateWpInfo[I, O]{
pool: make(workersCollection[I, O], noWorkers),
workersJobsCh: make(chan Job[I], noWorkers),
workersJobsCh: make(JobStream[I], noWorkers),
finishedCh: make(FinishedStream, noWorkers),
cancelCh: params.CancelCh,
},
exec: params.Exec,
RoutineName: GoRoutineName("🧊 worker pool"),
noWorkers: noWorkers,
SourceJobsChIn: params.JobsCh,
sourceJobsChIn: params.JobsCh,

WaitAQ: params.WaitAQ,
}
Expand Down Expand Up @@ -124,7 +123,7 @@ func (p *WorkerPool[I, O]) run(

running = false

case job, ok := <-p.SourceJobsChIn:
case job, ok := <-p.sourceJobsChIn:
if ok {
fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - new job received\n",
len(p.private.pool),
Expand All @@ -146,11 +145,11 @@ func (p *WorkerPool[I, O]) run(
}
} else {
// ⚠️ This close is essential. Since the pool acts as a bridge between
// 2 channels (p.SourceJobsChIn and p.private.workersJobsCh), when the
// producer closes p.SourceJobsChIn, we need to delegate that closure
// to p.private.workersJobsCh, otherwise we end up in a deadlock.
// 2 channels (p.sourceJobsChIn and p.private.workersJobsCh/forwardChOut),
// when the producer closes p.sourceJobsChIn, we need to delegate that
// closure to forwardChOut, otherwise we end up in a deadlock.
//
close(p.private.workersJobsCh)
close(forwardChOut)
fmt.Printf("===> 🚀 WorkerPool.run(source jobs chan closed) 🟥🟥🟥\n")
running = false
}
Expand Down

0 comments on commit 7034927

Please sign in to comment.