diff --git a/boost/boost-public-api.go b/boost/boost-public-api.go index db8f572..9ff957d 100644 --- a/boost/boost-public-api.go +++ b/boost/boost-public-api.go @@ -19,9 +19,20 @@ type ( JobStreamR[I any] <-chan Job[I] JobStreamW[I any] chan<- Job[I] - OutputStream[O any] chan JobOutput[O] - OutputStreamR[O any] <-chan JobOutput[O] - OutputStreamW[O any] chan<- JobOutput[O] + JobOutputStream[O any] chan JobOutput[O] + JobOutputStreamR[O any] <-chan JobOutput[O] + JobOutputStreamW[O any] chan<- JobOutput[O] + + // Duplex represents a channel with multiple views, to be used + // by clients that need to hand out different ends of the same + // channel to different entities. + Duplex[T any] struct { + Channel chan T + ReaderCh <-chan T + WriterCh chan<- T + } + + DuplexJobOutput[O any] Duplex[JobOutput[O]] CancelWorkSignal struct{} CancelStream = chan CancelWorkSignal @@ -42,3 +53,12 @@ type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error) func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error) { return f(j) } + +// NewDuplex creates a new instance of a Duplex with all members populated +func NewDuplex[T any](channel chan T) *Duplex[T] { + return &Duplex[T]{ + Channel: channel, + ReaderCh: channel, + WriterCh: channel, + } +} diff --git a/boost/pool-defs-internal.go b/boost/pool-defs-internal.go index d61f11c..72a0283 100644 --- a/boost/pool-defs-internal.go +++ b/boost/pool-defs-internal.go @@ -19,8 +19,7 @@ type ( finishedStreamW = chan<- *workerFinishedResult workerWrapper[I any, O any] struct { - cancelChOut CancelStreamW - core *worker[I, O] + core *worker[I, O] } workersCollection[I, O any] map[workerID]*workerWrapper[I, O] diff --git a/boost/worker-pool.go b/boost/worker-pool.go index 4c7a317..0248c00 100644 --- a/boost/worker-pool.go +++ b/boost/worker-pool.go @@ -105,9 +105,14 @@ func (p *WorkerPool[I, O]) composeID() workerID { func (p *WorkerPool[I, O]) Start( parentContext context.Context, parentCancel context.CancelFunc, - outputsChOut OutputStream[O], + outputsChOut chan<- JobOutput[O], ) { - p.run(parentContext, parentCancel, p.outputChTimeout, p.private.workersJobsCh, outputsChOut) + p.run(parentContext, + parentCancel, + p.outputChTimeout, + p.private.workersJobsCh, + outputsChOut, + ) } func (p *WorkerPool[I, O]) run( @@ -115,7 +120,7 @@ func (p *WorkerPool[I, O]) run( parentCancel context.CancelFunc, outputChTimeout time.Duration, forwardChOut JobStreamW[I], - outputsChOut OutputStream[O], + outputsChOut JobOutputStreamW[O], ) { result := &PoolResult{} defer func(r *PoolResult) { @@ -202,11 +207,9 @@ func (p *WorkerPool[I, O]) spawn( parentCancel context.CancelFunc, outputChTimeout time.Duration, jobsChIn JobStreamR[I], - outputsChOut OutputStream[O], + outputsChOut JobOutputStreamW[O], finishedChOut finishedStreamW, ) { - cancelCh := make(CancelStream, 1) - w := &workerWrapper[I, O]{ core: &worker[I, O]{ id: p.composeID(), @@ -215,7 +218,6 @@ func (p *WorkerPool[I, O]) spawn( outputsChOut: outputsChOut, finishedChOut: finishedChOut, }, - cancelChOut: cancelCh, // TODO: this is not used, so delete } p.private.pool[w.core.id] = w diff --git a/boost/worker-pool_test.go b/boost/worker-pool_test.go index a2ff779..20c9d0a 100644 --- a/boost/worker-pool_test.go +++ b/boost/worker-pool_test.go @@ -32,8 +32,13 @@ func (f TerminatorFunc[I, O]) After(parentContext context.Context, const ( JobChSize = 10 OutputsChSize = 10 + ScalingFactor = 1 ) +func scale(t time.Duration) time.Duration { + return t / ScalingFactor +} + var defaults = struct { noOfWorkers int producerInterval time.Duration @@ -77,15 +82,17 @@ var ( // type TestInputStream = chan boost.Job[TestJobInput] // -type TestInput struct { - Recipient string -} -type TestJobInput = boost.Job[TestInput] -type TestInputStream = chan boost.Job[TestJobInput] +type ( + TestInput struct { + Recipient string + } -type TestOutput string -type TestJobOutput = boost.JobOutput[TestOutput] -type TestOutputStream = chan boost.JobOutput[TestOutput] + TestJobInput = boost.Job[TestInput] + TestInputStream = boost.JobStream[TestJobInput] + TestOutput string + TestJobOutput = boost.JobOutput[TestOutput] + TestJobOutputStream = boost.JobOutputStream[TestOutput] +) var greeter = func(j TestJobInput) (TestJobOutput, error) { r := rand.Intn(1000) + 1 //nolint:gosec // trivial @@ -102,23 +109,23 @@ var greeter = func(j TestJobInput) (TestJobOutput, error) { } type pipeline[I, O any] struct { - wgan boost.WaitGroupAn - sequence int - outputsCh chan boost.JobOutput[O] - provider helpers.ProviderFunc[I] - producer *helpers.Producer[I, O] - pool *boost.WorkerPool[I, O] - consumer *helpers.Consumer[O] - cancel TerminatorFunc[I, O] - stop TerminatorFunc[I, O] + wgan boost.WaitGroupAn + sequence int + outputsDup *boost.Duplex[boost.JobOutput[O]] + provider helpers.ProviderFunc[I] + producer *helpers.Producer[I, O] + pool *boost.WorkerPool[I, O] + consumer *helpers.Consumer[O] + cancel TerminatorFunc[I, O] + stop TerminatorFunc[I, O] } -func start[I, O any](outputsCh chan boost.JobOutput[O]) *pipeline[I, O] { +func start[I, O any](outputsDupCh *boost.Duplex[boost.JobOutput[O]]) *pipeline[I, O] { pipe := &pipeline[I, O]{ - wgan: boost.NewAnnotatedWaitGroup("🍂 pipeline"), - outputsCh: outputsCh, - stop: noOp, - cancel: noOp, + wgan: boost.NewAnnotatedWaitGroup("🍂 pipeline"), + outputsDup: outputsDupCh, + stop: noOp, + cancel: noOp, } return pipe @@ -147,7 +154,6 @@ func (p *pipeline[I, O]) produce(parentContext context.Context, delay, ) } - p.producer = helpers.StartProducer[I, O]( parentContext, p.wgan, @@ -177,13 +183,13 @@ func (p *pipeline[I, O]) process(parentContext context.Context, p.wgan.Add(1, p.pool.RoutineName) - go p.pool.Start(parentContext, parentCancel, p.outputsCh) + go p.pool.Start(parentContext, parentCancel, p.outputsDup.WriterCh) } func (p *pipeline[I, O]) consume(parentContext context.Context, interval time.Duration) { p.consumer = helpers.StartConsumer(parentContext, p.wgan, - p.outputsCh, + p.outputsDup.ReaderCh, interval, ) @@ -226,7 +232,7 @@ var ( ) { Expect(result.Error).Error().To(BeNil()) Expect(pipe.producer.Count).To(Equal(pipe.consumer.Count)) - Eventually(parentContext, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed()) + Eventually(parentContext, pipe.outputsDup.Channel).WithTimeout(time.Second * 5).Should(BeClosed()) Eventually(parentContext, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed()) } assertCancelled assertFunc = func(parentContext context.Context, @@ -302,15 +308,16 @@ var _ = Describe("WorkerPool", func() { func(specContext SpecContext, entry *poolTE) { defer leaktest.Check(GinkgoT())() - outputCh := lo.TernaryF(entry.outputsChSize > 0, - func() TestOutputStream { - return make(TestOutputStream, entry.outputsChSize) + outputDup := lo.TernaryF(entry.outputsChSize > 0, + func() *boost.Duplex[boost.JobOutput[TestOutput]] { + return boost.NewDuplex(make(TestJobOutputStream, entry.outputsChSize)) }, - func() TestOutputStream { - return nil + func() *boost.Duplex[boost.JobOutput[TestOutput]] { + return &boost.Duplex[boost.JobOutput[TestOutput]]{} }, ) - pipe := start[TestInput, TestOutput](outputCh) + + pipe := start[TestInput, TestOutput](outputDup) defer func() { if counter, ok := (pipe.wgan).(boost.AnnotatedWgCounter); ok { @@ -340,7 +347,7 @@ var _ = Describe("WorkerPool", func() { greeter, ) - if outputCh != nil { + if outputDup.Channel != nil { By("👾 WAIT-GROUP ADD(consumer)") pipe.consume(parentContext, lo.Ternary(entry.intervals.consumer > 0, entry.intervals.consumer, defaults.consumerInterval, @@ -366,8 +373,8 @@ var _ = Describe("WorkerPool", func() { should: "receive and process all", outputsChSize: OutputsChSize, intervals: durations{ - finishAfter: time.Second / 5, - outputChTimeout: time.Second, + finishAfter: scale(time.Second / 5), + outputChTimeout: scale(time.Second), }, finish: finishWithStop, summarise: summariseWithConsumer, @@ -379,9 +386,9 @@ var _ = Describe("WorkerPool", func() { should: "timeout on error", now: 2, intervals: durations{ - consumer: 1, // time.Second * 8, - finishAfter: time.Second * 2, - outputChTimeout: time.Second * 1, + consumer: scale(1), // time.Second * 8, + finishAfter: scale(time.Second * 2), + outputChTimeout: scale(time.Second * 1), }, outputsChSize: OutputsChSize, finish: finishWithCancel, @@ -395,8 +402,8 @@ var _ = Describe("WorkerPool", func() { now: 16, outputsChSize: OutputsChSize, intervals: durations{ - finishAfter: time.Second / 5, - outputChTimeout: time.Second, + finishAfter: scale(time.Second / 5), + outputChTimeout: scale(time.Second), }, finish: finishWithStop, summarise: summariseWithConsumer, @@ -408,8 +415,8 @@ var _ = Describe("WorkerPool", func() { should: "receive and process all without sending output, no error", outputsChSize: 0, intervals: durations{ - finishAfter: time.Second / 5, - outputChTimeout: time.Second / 10, + finishAfter: scale(time.Second / 5), + outputChTimeout: scale(time.Second / 10), }, finish: finishWithStop, summarise: summariseWithoutConsumer, @@ -421,8 +428,8 @@ var _ = Describe("WorkerPool", func() { should: "receive and process all without sending output, no error", outputsChSize: 0, intervals: durations{ - finishAfter: time.Second / 5, - outputChTimeout: time.Second / 10, + finishAfter: scale(time.Second / 5), + outputChTimeout: scale(time.Second / 10), }, finish: finishWithCancel, summarise: summariseWithoutConsumer, @@ -435,8 +442,8 @@ var _ = Describe("WorkerPool", func() { now: 2, outputsChSize: OutputsChSize, // 1 intervals: durations{ - finishAfter: time.Second / 5, - outputChTimeout: time.Second / 1000, + finishAfter: scale(time.Second / 5), + outputChTimeout: scale(time.Second / 1000), }, finish: finishWithStop, summarise: summariseWithConsumer, diff --git a/boost/worker.go b/boost/worker.go index e2602b5..125808b 100644 --- a/boost/worker.go +++ b/boost/worker.go @@ -11,7 +11,7 @@ type worker[I any, O any] struct { id workerID exec ExecutiveFunc[I, O] jobsChIn JobStreamR[I] - outputsChOut OutputStream[O] + outputsChOut JobOutputStreamW[O] finishedChOut finishedStreamW } diff --git a/internal/helpers/test-consumer.go b/internal/helpers/test-consumer.go index 5bfdca5..695f2f2 100644 --- a/internal/helpers/test-consumer.go +++ b/internal/helpers/test-consumer.go @@ -12,14 +12,14 @@ type Consumer[O any] struct { quitter boost.AnnotatedWgQuitter RoutineName boost.GoRoutineName interval time.Duration - OutputsChIn boost.OutputStreamR[O] + OutputsChIn boost.JobOutputStreamR[O] Count int } func StartConsumer[O any]( parentContext context.Context, quitter boost.AnnotatedWgQuitter, - outputsChIn boost.OutputStreamR[O], + outputsChIn boost.JobOutputStreamR[O], interval time.Duration, ) *Consumer[O] { consumer := &Consumer[O]{ diff --git a/internal/helpers/test-producer.go b/internal/helpers/test-producer.go index 45ee8e4..3d2c740 100644 --- a/internal/helpers/test-producer.go +++ b/internal/helpers/test-producer.go @@ -10,19 +10,19 @@ import ( ) type termination string -type terminationStream chan termination +type terminationDuplex *boost.Duplex[termination] type ProviderFunc[I any] func() I type Producer[I, O any] struct { - quitter boost.AnnotatedWgQuitter - RoutineName boost.GoRoutineName - sequenceNo int - provider ProviderFunc[I] - interval time.Duration - terminateCh terminationStream - JobsCh boost.JobStream[I] - Count int + quitter boost.AnnotatedWgQuitter + RoutineName boost.GoRoutineName + sequenceNo int + provider ProviderFunc[I] + interval time.Duration + terminateDup terminationDuplex + JobsCh boost.JobStream[I] + Count int } // The producer owns the Jobs channel as it knows when to close it. This producer is @@ -40,13 +40,14 @@ func StartProducer[I, O any]( } producer := Producer[I, O]{ - quitter: quitter, - RoutineName: boost.GoRoutineName("✨ producer"), - provider: provider, - interval: interval, - terminateCh: make(terminationStream), - JobsCh: make(boost.JobStream[I], capacity), + quitter: quitter, + RoutineName: boost.GoRoutineName("✨ producer"), + provider: provider, + interval: interval, + terminateDup: boost.NewDuplex(make(chan termination)), + JobsCh: make(boost.JobStream[I], capacity), } + go producer.run(parentContext) return &producer @@ -68,7 +69,7 @@ func (p *Producer[I, O]) run(parentContext context.Context) { fmt.Println(">>>> ✨ producer.run - done received ⛔⛔⛔") - case <-p.terminateCh: + case <-p.terminateDup.ReaderCh: running = false fmt.Printf(">>>> ✨ producer.run - termination detected (running: %v)\n", running) @@ -116,8 +117,8 @@ func (p *Producer[I, O]) item(parentContext context.Context) bool { func (p *Producer[I, O]) Stop() { fmt.Println(">>>> 🧲 producer terminating ...") - p.terminateCh <- termination("done") - close(p.terminateCh) + p.terminateDup.WriterCh <- termination("done") + close(p.terminateDup.Channel) } // StopProducerAfter, run in a new go routine