Commit
ref(boost): minor refactorings (#68)
plastikfan committed Sep 2, 2023
1 parent 7034927 commit cf56b0a
Showing 5 changed files with 90 additions and 86 deletions.
51 changes: 24 additions & 27 deletions boost/boost-public-api.go
@@ -4,36 +4,33 @@ const (
MaxWorkers = 100
)

type Job[I any] struct {
ID string
Input I
SequenceNo int
}
type (
Job[I any] struct {
ID string
Input I
SequenceNo int
}

JobOutput[O any] struct {
Payload O
}

JobStream[I any] chan Job[I]
JobStreamR[I any] <-chan Job[I]
JobStreamW[I any] chan<- Job[I]

OutputStream[O any] chan JobOutput[O]
OutputStreamR[O any] <-chan JobOutput[O]
OutputStreamW[O any] chan<- JobOutput[O]

CancelWorkSignal struct{}
CancelStream = chan CancelWorkSignal
CancelStreamR = <-chan CancelWorkSignal
CancelStreamW = chan<- CancelWorkSignal
)

type ExecutiveFunc[I, O any] func(j Job[I]) (JobOutput[O], error)

func (f ExecutiveFunc[I, O]) Invoke(j Job[I]) (JobOutput[O], error) {
return f(j)
}

type JobOutput[O any] struct {
Payload O
}

type JobStream[I any] chan Job[I]
type JobStreamR[I any] <-chan Job[I]
type JobStreamW[I any] chan<- Job[I]

type OutputStream[O any] chan JobOutput[O]
type OutputStreamR[O any] <-chan JobOutput[O]
type OutputStreamW[O any] chan<- JobOutput[O]

type CancelWorkSignal struct{}
type CancelStream = chan CancelWorkSignal
type CancelStreamR = <-chan CancelWorkSignal
type CancelStreamW = chan<- CancelWorkSignal

type WorkerID string
type FinishedStream = chan WorkerID
type FinishedStreamR = <-chan WorkerID
type FinishedStreamW = chan<- WorkerID
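
For orientation, here is a minimal sketch of how these public types compose. It is illustrative only: the import path and the greet function are assumptions, not part of this commit.

package main

import (
	"fmt"

	"github.com/snivilised/lorax/boost" // import path assumed for illustration
)

// greet is a hypothetical ExecutiveFunc: it maps a Job[string] to a
// JobOutput[string], the shape a worker invokes for every job it receives.
var greet boost.ExecutiveFunc[string, string] = func(j boost.Job[string]) (boost.JobOutput[string], error) {
	return boost.JobOutput[string]{
		Payload: fmt.Sprintf("hello %v (id: %v, seq: %v)", j.Input, j.ID, j.SequenceNo),
	}, nil
}

func main() {
	// Invoke just calls the function; the pool does this once per received job.
	out, err := greet.Invoke(boost.Job[string]{ID: "job-1", Input: "world", SequenceNo: 1})
	if err == nil {
		fmt.Println(out.Payload)
	}
}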
17 changes: 12 additions & 5 deletions boost/pool-defs-internal.go
@@ -7,9 +7,16 @@ const (
DefaultChSize = 100
)

type workerWrapper[I any, O any] struct {
cancelChOut chan<- CancelWorkSignal
core *worker[I, O]
}
type (
workerID string
finishedStream = chan workerID
finishedStreamR = <-chan workerID
finishedStreamW = chan<- workerID

type workersCollection[I, O any] map[WorkerID]*workerWrapper[I, O]
workerWrapper[I any, O any] struct {
cancelChOut CancelStreamW
core *worker[I, O]
}

workersCollection[I, O any] map[workerID]*workerWrapper[I, O]
)
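
The unexported workerID/finishedStream mirrors keep worker identity off the public API surface. A tiny package-internal sketch of the intent (the id and the usage are made up for illustration):

// Sketch: pool bookkeeping with the unexported types above.
func sketchPoolBookkeeping() {
	pool := make(workersCollection[string, string])
	finished := make(finishedStream, DefaultChSize)

	id := workerID("WORKER-ID-0")
	pool[id] = &workerWrapper[string, string]{} // register a worker

	finished <- id           // a worker announces completion (finishedStreamW side)
	delete(pool, <-finished) // the pool drains the R side and forgets the worker
}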
47 changes: 18 additions & 29 deletions boost/worker-pool.go
@@ -8,7 +8,7 @@ import (
"github.com/google/uuid"
)

// privateWpInfo contains any state that needs to be mutated in a non-concurrent manner
// privateWpInfo (dmz!) contains any state that needs to be mutated in a non-concurrent manner
// and therefore should be exclusively accessed by a single goroutine. Actually, due to
// our ability to compose functionality with channels as opposed to shared state, the
// pool does not contain any state that is accessed directly or indirectly from other
@@ -32,7 +32,7 @@ import (
type privateWpInfo[I, O any] struct {
pool workersCollection[I, O]
workersJobsCh chan Job[I]
finishedCh FinishedStream
finishedCh finishedStream
cancelCh CancelStream
}

@@ -66,7 +66,7 @@ func NewWorkerPool[I, O any](params *NewWorkerPoolParams[I, O]) *WorkerPool[I, O
private: privateWpInfo[I, O]{
pool: make(workersCollection[I, O], noWorkers),
workersJobsCh: make(JobStream[I], noWorkers),
finishedCh: make(FinishedStream, noWorkers),
finishedCh: make(finishedStream, noWorkers),
cancelCh: params.CancelCh,
},
exec: params.Exec,
@@ -86,12 +86,12 @@ var eyeballs = []string{
"❤️", "💙", "💚", "💜", "💛", "🤍", "💖", "💗", "💝",
}

func (p *WorkerPool[I, O]) composeID() WorkerID {
func (p *WorkerPool[I, O]) composeID() workerID {
n := len(p.private.pool)
index := (n) % len(eyeballs)
emoji := eyeballs[index]

return WorkerID(fmt.Sprintf("(%v)WORKER-ID-%v:%v", emoji, n, uuid.NewString()))
return workerID(fmt.Sprintf("(%v)WORKER-ID-%v:%v", emoji, n, uuid.NewString()))
}

func (p *WorkerPool[I, O]) Start(
@@ -119,10 +119,11 @@ func (p *WorkerPool[I, O]) run(
for running := true; running; {
select {
case <-ctx.Done():
fmt.Println("===> 🧊 WorkerPool.run - done received ☢️☢️☢️")

running = false

close(forwardChOut) // ⚠️ This is new
fmt.Println("===> 🧊 WorkerPool.run (source jobs chan closed) - done received ☢️☢️☢️")

case job, ok := <-p.sourceJobsChIn:
if ok {
fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - new job received\n",
@@ -139,6 +140,9 @@
job.SequenceNo,
)
case <-ctx.Done():
running = false

close(forwardChOut) // ⚠️ This is new
fmt.Printf("===> 🧊 (#workers: '%v') WorkerPool.run - done received ☢️☢️☢️\n",
len(p.private.pool),
)
@@ -149,9 +153,9 @@
// when the producer closes p.sourceJobsChIn, we need to delegate that
// closure to forwardChOut, otherwise we end up in a deadlock.
//
running = false
close(forwardChOut)
fmt.Printf("===> 🚀 WorkerPool.run(source jobs chan closed) 🟥🟥🟥\n")
running = false
}
}
}
Expand All @@ -171,7 +175,7 @@ func (p *WorkerPool[I, O]) spawn(
ctx context.Context,
jobsChIn JobStreamR[I],
outputsChOut OutputStream[O],
finishedChOut FinishedStreamW,
finishedChOut finishedStreamW,
) {
cancelCh := make(CancelStream, 1)

@@ -182,7 +186,6 @@
jobsChIn: jobsChIn,
outputsChOut: outputsChOut,
finishedChOut: finishedChOut,
cancelChIn: cancelCh,
},
cancelChOut: cancelCh,
}
@@ -192,7 +195,7 @@
fmt.Printf("===> 🧊 WorkerPool.spawned new worker: '%v' 🎀🎀🎀\n", w.core.id)
}

func (p *WorkerPool[I, O]) drain(finishedChIn FinishedStreamR) {
func (p *WorkerPool[I, O]) drain(finishedChIn finishedStreamR) {
fmt.Printf(
"!!!! 🧊 WorkerPool.drain - waiting for remaining workers: %v (#GRs: %v); 🧊🧊🧊 \n",
len(p.private.pool), runtime.NumGoroutine(),
@@ -205,7 +208,7 @@ func (p *WorkerPool[I, O]) drain(finishedChIn FinishedStreamR) {
// run loop, which ends that loop and then enters the drain phase. When this happens,
// you can't reuse that same done channel as it will immediately return the value
// already handled. This has the effect of short-circuiting this loop, meaning that
// workerID := <-finishedChIn never has a chance to be selected and the drain loop
// wid := <-finishedChIn never has a chance to be selected and the drain loop
// exits early. The end result of which means that the p.private.pool collection is
// never depleted.
//
@@ -237,29 +240,15 @@ func (p *WorkerPool[I, O]) drain(finishedChIn FinishedStreamR) {
// If a goroutine outlives its context or keeps references to closed Done() channels,
// it might not behave as expected.
//
workerID := <-finishedChIn
delete(p.private.pool, workerID)
wid := <-finishedChIn
delete(p.private.pool, wid)

if len(p.private.pool) == 0 {
running = false
}

fmt.Printf("!!!! 🧊 WorkerPool.drain - worker(%v) finished, remaining: '%v' 🟥\n",
workerID, len(p.private.pool),
wid, len(p.private.pool),
)
}
}

func (p *WorkerPool[I, O]) cancelWorkers() {
// Perhaps we can replace this with another broadcast mechanism, such as sync.Cond.
//
n := len(p.private.pool)
for k, w := range p.private.pool {
fmt.Printf("===> 🧊 cancelling worker '%v' of %v 📛📛📛... \n", k, n)
// shouldn't need to be preemptable because it is a buffered single-item channel
// which should only ever be accessed by the work pool GR and therefore should
// never be in a position where it's competing to send on that channel
//
w.cancelChOut <- CancelWorkSignal{}
}
}
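
The delegated-closure remark in run is the crux of this change: the goroutine that forwards jobs downstream owns forwardChOut, so it must close that channel both when the producer closes the source and on cancellation, otherwise workers ranging over it block forever. A standalone sketch of that pattern, under those assumptions (not the pool code itself):

package sketch

import "context"

// forward relays src into dst and owns dst's lifetime: dst is closed when
// the producer closes src, or when ctx is cancelled. Omitting either close
// path is exactly the deadlock the comment in run warns about.
func forward[T any](ctx context.Context, src <-chan T, dst chan<- T) {
	defer close(dst)

	for {
		select {
		case <-ctx.Done():
			return
		case v, ok := <-src:
			if !ok {
				return // src closed by producer; delegate closure to dst
			}
			select {
			case <-ctx.Done():
				return // don't block on dst if cancellation arrives mid-send
			case dst <- v:
			}
		}
	}
}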
45 changes: 26 additions & 19 deletions boost/worker-pool_test.go
@@ -195,11 +195,15 @@ var (
passthruContext contextFunc = func(ctx context.Context) (context.Context, context.CancelFunc) {
return ctx, nil
}
assertCounts assertFunc = func(ctx context.Context, pipe TestPipeline) {
assertWithConsumerCounts assertFunc = func(ctx context.Context, pipe TestPipeline) {
Expect(pipe.producer.Count).To(Equal(pipe.consumer.Count))
Eventually(ctx, pipe.outputsCh).WithTimeout(time.Second * 5).Should(BeClosed())
Eventually(ctx, pipe.producer.JobsCh).WithTimeout(time.Second * 5).Should(BeClosed())
}
assertAbort assertFunc = func(ctx context.Context, pipe TestPipeline) {
// tbd ...
}

summariseWithConsumer summariseFunc = func(pipe TestPipeline) {
fmt.Printf("<--- orpheus(alpha) finished Counts >>> (Producer: '%v', Consumer: '%v'). 🎯🎯🎯\n",
pipe.producer.Count,
@@ -220,6 +224,7 @@ type poolTE struct {
outputsChSize int
after time.Duration
context contextFunc
outputTimeout time.Duration
finish finishFunc
summarise summariseFunc
assert assertFunc
@@ -230,15 +235,15 @@ var _ = Describe("WorkerPool", func() {
func(ctxSpec SpecContext, entry *poolTE) {
defer leaktest.Check(GinkgoT())()

oc := lo.TernaryF(entry.outputsChSize > 0,
outputCh := lo.TernaryF(entry.outputsChSize > 0,
func() TestOutputStream {
return make(TestOutputStream, entry.outputsChSize)
},
func() TestOutputStream {
return nil
},
)
pipe := start[TestInput, TestOutput](oc)
pipe := start[TestInput, TestOutput](outputCh)

defer func() {
if counter, ok := (pipe.wgan).(boost.AnnotatedWgCounter); ok {
@@ -261,7 +266,7 @@ var _ = Describe("WorkerPool", func() {
now := lo.Ternary(entry.now > 0, entry.now, DefaultNoWorkers)
pipe.process(ctx, now, greeter)

if oc != nil {
if outputCh != nil {
By("👾 WAIT-GROUP ADD(consumer)")
pipe.consume(ctx)
}
@@ -287,7 +292,7 @@ var _ = Describe("WorkerPool", func() {
context: passthruContext,
finish: finishWithStop,
summarise: summariseWithConsumer,
assert: assertCounts,
assert: assertWithConsumerCounts,
}),

Entry(nil, &poolTE{
Expand All @@ -301,35 +306,37 @@ var _ = Describe("WorkerPool", func() {
}),

Entry(nil, &poolTE{
given: "finish by stop and no output",
given: "finish by stop and high no of workers",
should: "receive and process all",
now: 16,
outputsChSize: OutputsChSize,
after: time.Second / 5,
context: passthruContext,
finish: finishWithStop,
summarise: summariseWithConsumer,
assert: assertWithConsumerCounts,
}),

Entry(nil, &poolTE{
given: "finish by stop and no output",
should: "cancel parent and abort",
outputsChSize: 0,
after: time.Second / 5,
context: passthruContext,
outputTimeout: time.Second / 10,
finish: finishWithStop,
summarise: summariseWithoutConsumer,
assert: assertAbort,
}),

Entry(nil, &poolTE{
given: "finish by cancel and no output",
should: "receive and process all",
should: "cancel parent and abort",
outputsChSize: 0,
after: time.Second / 5,
context: context.WithCancel,
finish: finishWithCancel,
summarise: summariseWithoutConsumer,
}),

Entry(nil, &poolTE{
given: "finish by stop and high no of workers",
should: "receive and process all",
now: 16,
outputsChSize: OutputsChSize,
after: time.Second / 5,
context: passthruContext,
finish: finishWithStop,
summarise: summariseWithConsumer,
assert: assertCounts,
}),
)
})
16 changes: 10 additions & 6 deletions boost/worker.go
@@ -6,18 +6,22 @@ import (
)

type worker[I any, O any] struct {
id WorkerID
id workerID
exec ExecutiveFunc[I, O]
jobsChIn JobStreamR[I]
outputsChOut OutputStream[O]
finishedChOut FinishedStreamW

// this might be better replaced with a broadcast mechanism such as sync.Cond
//
cancelChIn CancelStreamR
finishedChOut finishedStreamW
}

func (w *worker[I, O]) run(ctx context.Context) {
// we also need to receive the cancel function associated with this context.
// Should we allow each worker to create its own child context that
// contains the timeout, or do we share the same pool-allocated context
// across all the workers?
//
// Could we use a condition event to signal cancellation? That way it becomes
// easy to test for, as long as the test has access to the event/condition.
//
defer func() {
w.finishedChOut <- w.id // ⚠️ non-pre-emptive send, but this should be ok
fmt.Printf(" <--- 🚀 worker.run(%v) (SENT FINISHED). 🚀🚀🚀\n", w.id)
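
On the sync.Cond musing above: a broadcast-style cancellation could look like the speculative sketch below; nothing in this commit implements it. Note that in Go, closing a channel gives the same one-to-many wake-up and, unlike sync.Cond, composes with select alongside ctx.Done().

package sketch

import "sync"

// cancelCond is a speculative broadcast-cancellation primitive built on
// sync.Cond, as the worker comments muse about.
type cancelCond struct {
	mu        sync.Mutex
	cancelled bool
	cond      *sync.Cond
}

func newCancelCond() *cancelCond {
	c := &cancelCond{}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// cancelAll flips the flag under the lock, then wakes every waiting worker.
func (c *cancelCond) cancelAll() {
	c.mu.Lock()
	c.cancelled = true
	c.mu.Unlock()
	c.cond.Broadcast()
}

// wait blocks a worker until cancellation has been broadcast; sync.Cond
// requires the lock to be held around the Wait loop.
func (c *cancelCond) wait() {
	c.mu.Lock()
	for !c.cancelled {
		c.cond.Wait()
	}
	c.mu.Unlock()
}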
