Skip to content

Commit

Permalink
ref(boost); fix up channel type defs (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Sep 25, 2023
1 parent 1a4b397 commit 8de6983
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 79 deletions.
26 changes: 23 additions & 3 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,20 @@ type (
JobStreamR[I any] <-chan Job[I]
JobStreamW[I any] chan<- Job[I]

OutputStream[O any] chan JobOutput[O]
OutputStreamR[O any] <-chan JobOutput[O]
OutputStreamW[O any] chan<- JobOutput[O]
JobOutputStream[O any] chan JobOutput[O]
JobOutputStreamR[O any] <-chan JobOutput[O]
JobOutputStreamW[O any] chan<- JobOutput[O]

// Duplex represents a channel with multiple views, to be used
// by clients that need to hand out different ends of the same
// channel to different entities.
Duplex[T any] struct {
Channel chan T
ReaderCh <-chan T
WriterCh chan<- T
}

DuplexJobOutput[O any] Duplex[JobOutput[O]]

CancelWorkSignal struct{}
CancelStream = chan CancelWorkSignal
Expand All @@ -42,3 +53,12 @@ type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)
func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error) {
return f(j)
}

// NewDuplex creates a new instance of a Duplex with all members populated
func NewDuplex[T any](channel chan T) *Duplex[T] {
return &Duplex[T]{
Channel: channel,
ReaderCh: channel,
WriterCh: channel,
}
}
3 changes: 1 addition & 2 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ type (
finishedStreamW = chan<- *workerFinishedResult

workerWrapper[I any, O any] struct {
cancelChOut CancelStreamW
core *worker[I, O]
core *worker[I, O]
}

workersCollection[I, O any] map[workerID]*workerWrapper[I, O]
Expand Down
16 changes: 9 additions & 7 deletions boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,22 @@ func (p *WorkerPool[I, O]) composeID() workerID {
func (p *WorkerPool[I, O]) Start(
parentContext context.Context,
parentCancel context.CancelFunc,
outputsChOut OutputStream[O],
outputsChOut chan<- JobOutput[O],
) {
p.run(parentContext, parentCancel, p.outputChTimeout, p.private.workersJobsCh, outputsChOut)
p.run(parentContext,
parentCancel,
p.outputChTimeout,
p.private.workersJobsCh,
outputsChOut,
)
}

func (p *WorkerPool[I, O]) run(
parentContext context.Context,
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
forwardChOut JobStreamW[I],
outputsChOut OutputStream[O],
outputsChOut JobOutputStreamW[O],
) {
result := &PoolResult{}
defer func(r *PoolResult) {
Expand Down Expand Up @@ -202,11 +207,9 @@ func (p *WorkerPool[I, O]) spawn(
parentCancel context.CancelFunc,
outputChTimeout time.Duration,
jobsChIn JobStreamR[I],
outputsChOut OutputStream[O],
outputsChOut JobOutputStreamW[O],
finishedChOut finishedStreamW,
) {
cancelCh := make(CancelStream, 1)

w := &workerWrapper[I, O]{
core: &worker[I, O]{
id: p.composeID(),
Expand All @@ -215,7 +218,6 @@ func (p *WorkerPool[I, O]) spawn(
outputsChOut: outputsChOut,
finishedChOut: finishedChOut,
},
cancelChOut: cancelCh, // TODO: this is not used, so delete
}

p.private.pool[w.core.id] = w
Expand Down
99 changes: 53 additions & 46 deletions boost/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ func (f TerminatorFunc[I, O]) After(parentContext context.Context,
const (
JobChSize = 10
OutputsChSize = 10
ScalingFactor = 1
)

func scale(t time.Duration) time.Duration {
return t / ScalingFactor
}

var defaults = struct {
noOfWorkers int
producerInterval time.Duration
Expand Down Expand Up @@ -77,15 +82,17 @@ var (
// type TestInputStream = chan boost.Job[TestJobInput]
//

type TestInput struct {
Recipient string
}
type TestJobInput = boost.Job[TestInput]
type TestInputStream = chan boost.Job[TestJobInput]
type (
TestInput struct {
Recipient string
}

type TestOutput string
type TestJobOutput = boost.JobOutput[TestOutput]
type TestOutputStream = chan boost.JobOutput[TestOutput]
TestJobInput = boost.Job[TestInput]
TestInputStream = boost.JobStream[TestJobInput]
TestOutput string
TestJobOutput = boost.JobOutput[TestOutput]
TestJobOutputStream = boost.JobOutputStream[TestOutput]
)

var greeter = func(j TestJobInput) (TestJobOutput, error) {
r := rand.Intn(1000) + 1 //nolint:gosec // trivial
Expand All @@ -102,23 +109,23 @@ var greeter = func(j TestJobInput) (TestJobOutput, error) {
}

type pipeline[I, O any] struct {
wgan boost.WaitGroupAn
sequence int
outputsCh chan boost.JobOutput[O]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, O]
pool *boost.WorkerPool[I, O]
consumer *helpers.Consumer[O]
cancel TerminatorFunc[I, O]
stop TerminatorFunc[I, O]
wgan boost.WaitGroupAn
sequence int
outputsDup *boost.Duplex[boost.JobOutput[O]]
provider helpers.ProviderFunc[I]
producer *helpers.Producer[I, O]
pool *boost.WorkerPool[I, O]
consumer *helpers.Consumer[O]
cancel TerminatorFunc[I, O]
stop TerminatorFunc[I, O]
}

func start[I, O any](outputsCh chan boost.JobOutput[O]) *pipeline[I, O] {
func start[I, O any](outputsDupCh *boost.Duplex[boost.JobOutput[O]]) *pipeline[I, O] {
pipe := &pipeline[I, O]{
wgan: boost.NewAnnotatedWaitGroup("🍂 pipeline"),
outputsCh: outputsCh,
stop: noOp,
cancel: noOp,
wgan: boost.NewAnnotatedWaitGroup("🍂 pipeline"),
outputsDup: outputsDupCh,
stop: noOp,
cancel: noOp,
}

return pipe
Expand Down Expand Up @@ -147,7 +154,6 @@ func (p *pipeline[I, O]) produce(parentContext context.Context,
delay,
)
}

p.producer = helpers.StartProducer[I, O](
parentContext,
p.wgan,
Expand Down Expand Up @@ -177,13 +183,13 @@ func (p *pipeline[I, O]) process(parentContext context.Context,

p.wgan.Add(1, p.pool.RoutineName)

go p.pool.Start(parentContext, parentCancel, p.outputsCh)
go p.pool.Start(parentContext, parentCancel, p.outputsDup.WriterCh)
}

func (p *pipeline[I, O]) consume(parentContext context.Context, interval time.Duration) {
p.consumer = helpers.StartConsumer(parentContext,
p.wgan,
p.outputsCh,
p.outputsDup.ReaderCh,
interval,
)

Expand Down Expand Up @@ -226,7 +232,7 @@ var (
) {
Expect(result.Error).Error().To(BeNil())
Expect(pipe.producer.Count).To(Equal(pipe.consumer.Count))
Eventually(parentContext, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed())
Eventually(parentContext, pipe.outputsDup.Channel).WithTimeout(time.Second * 5).Should(BeClosed())
Eventually(parentContext, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())
}
assertCancelled assertFunc = func(parentContext context.Context,
Expand Down Expand Up @@ -302,15 +308,16 @@ var _ = Describe("WorkerPool", func() {
func(specContext SpecContext, entry *poolTE) {
defer leaktest.Check(GinkgoT())()

outputCh := lo.TernaryF(entry.outputsChSize > 0,
func() TestOutputStream {
return make(TestOutputStream, entry.outputsChSize)
outputDup := lo.TernaryF(entry.outputsChSize > 0,
func() *boost.Duplex[boost.JobOutput[TestOutput]] {
return boost.NewDuplex(make(TestJobOutputStream, entry.outputsChSize))
},
func() TestOutputStream {
return nil
func() *boost.Duplex[boost.JobOutput[TestOutput]] {
return &boost.Duplex[boost.JobOutput[TestOutput]]{}
},
)
pipe := start[TestInput, TestOutput](outputCh)

pipe := start[TestInput, TestOutput](outputDup)

defer func() {
if counter, ok := (pipe.wgan).(boost.AnnotatedWgCounter); ok {
Expand Down Expand Up @@ -340,7 +347,7 @@ var _ = Describe("WorkerPool", func() {
greeter,
)

if outputCh != nil {
if outputDup.Channel != nil {
By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(parentContext, lo.Ternary(entry.intervals.consumer > 0,
entry.intervals.consumer, defaults.consumerInterval,
Expand All @@ -366,8 +373,8 @@ var _ = Describe("WorkerPool", func() {
should: "receive and process all",
outputsChSize: OutputsChSize,
intervals: durations{
finishAfter: time.Second / 5,
outputChTimeout: time.Second,
finishAfter: scale(time.Second / 5),
outputChTimeout: scale(time.Second),
},
finish: finishWithStop,
summarise: summariseWithConsumer,
Expand All @@ -379,9 +386,9 @@ var _ = Describe("WorkerPool", func() {
should: "timeout on error",
now: 2,
intervals: durations{
consumer: 1, // time.Second * 8,
finishAfter: time.Second * 2,
outputChTimeout: time.Second * 1,
consumer: scale(1), // time.Second * 8,
finishAfter: scale(time.Second * 2),
outputChTimeout: scale(time.Second * 1),
},
outputsChSize: OutputsChSize,
finish: finishWithCancel,
Expand All @@ -395,8 +402,8 @@ var _ = Describe("WorkerPool", func() {
now: 16,
outputsChSize: OutputsChSize,
intervals: durations{
finishAfter: time.Second / 5,
outputChTimeout: time.Second,
finishAfter: scale(time.Second / 5),
outputChTimeout: scale(time.Second),
},
finish: finishWithStop,
summarise: summariseWithConsumer,
Expand All @@ -408,8 +415,8 @@ var _ = Describe("WorkerPool", func() {
should: "receive and process all without sending output, no error",
outputsChSize: 0,
intervals: durations{
finishAfter: time.Second / 5,
outputChTimeout: time.Second / 10,
finishAfter: scale(time.Second / 5),
outputChTimeout: scale(time.Second / 10),
},
finish: finishWithStop,
summarise: summariseWithoutConsumer,
Expand All @@ -421,8 +428,8 @@ var _ = Describe("WorkerPool", func() {
should: "receive and process all without sending output, no error",
outputsChSize: 0,
intervals: durations{
finishAfter: time.Second / 5,
outputChTimeout: time.Second / 10,
finishAfter: scale(time.Second / 5),
outputChTimeout: scale(time.Second / 10),
},
finish: finishWithCancel,
summarise: summariseWithoutConsumer,
Expand All @@ -435,8 +442,8 @@ var _ = Describe("WorkerPool", func() {
now: 2,
outputsChSize: OutputsChSize, // 1
intervals: durations{
finishAfter: time.Second / 5,
outputChTimeout: time.Second / 1000,
finishAfter: scale(time.Second / 5),
outputChTimeout: scale(time.Second / 1000),
},
finish: finishWithStop,
summarise: summariseWithConsumer,
Expand Down
2 changes: 1 addition & 1 deletion boost/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type worker[I any, O any] struct {
id workerID
exec ExecutiveFunc[I, O]
jobsChIn JobStreamR[I]
outputsChOut OutputStream[O]
outputsChOut JobOutputStreamW[O]
finishedChOut finishedStreamW
}

Expand Down
4 changes: 2 additions & 2 deletions internal/helpers/test-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ type Consumer[O any] struct {
quitter boost.AnnotatedWgQuitter
RoutineName boost.GoRoutineName
interval time.Duration
OutputsChIn boost.OutputStreamR[O]
OutputsChIn boost.JobOutputStreamR[O]
Count int
}

func StartConsumer[O any](
parentContext context.Context,
quitter boost.AnnotatedWgQuitter,
outputsChIn boost.OutputStreamR[O],
outputsChIn boost.JobOutputStreamR[O],
interval time.Duration,
) *Consumer[O] {
consumer := &Consumer[O]{
Expand Down
Loading

0 comments on commit 8de6983

Please sign in to comment.