Skip to content

Commit

Permalink
ref(boost): change client stream type defs (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Aug 30, 2023
1 parent ce8767c commit 441f4a3
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
2 changes: 1 addition & 1 deletion boost/worker-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// and hence should only ever be accessed by the worker pool GR in contrast to all the
// other members of WorkerPool. This is an experimental pattern, the purpose of which
// is the clearly indicate what state can be accessed in different concurrency contexts,
// to ensure future updates can be applied with minimal cognitive overload.
// to ensure future updates can be appliethe d with minimal cognitive overload.
//
// There is another purpose for privateWpInfo and that is to do with "confinement" as
// described on page 86 of CiG. The aim here is to use "lexical confinement" for
Expand Down
40 changes: 27 additions & 13 deletions boost/worker-pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,34 @@ var (
testMain = boost.GoRoutineName("👾 test-main")
)

type TestJobInput struct {
// When defining client side channel types, the rule should be, when creating
// a derivative of the boost type, the client should not introduce a new type,
// rather, they should introduce an alias to the boost type. So we should never
// do:
// type TestInputStream chan boost.Job[TestJobInput]
//
// because we are referring to a boost type. Instead we should define
//
// type TestInputStream = chan boost.Job[TestJobInput]
//

type TestInput struct {
Recipient string
}
type TestJobInput = boost.Job[TestInput]
type TestInputStream = chan boost.Job[TestJobInput]

type TestJobOutput string
type TestOutputStream chan boost.JobOutput[TestJobOutput]
type TestOutput string
type TestJobOutput = boost.JobOutput[TestOutput]
type TestOutputStream = chan boost.JobOutput[TestOutput]

var greeter = func(j boost.Job[TestJobInput]) (boost.JobOutput[TestJobOutput], error) {
var greeter = func(j TestJobInput) (TestJobOutput, error) {
r := rand.Intn(1000) + 1 //nolint:gosec // trivial
delay := time.Millisecond * time.Duration(r)
time.Sleep(delay)

result := boost.JobOutput[TestJobOutput]{
Payload: TestJobOutput(fmt.Sprintf(" ---> 🍉🍉🍉 [Seq: %v] Hello: '%v'",
result := TestJobOutput{
Payload: TestOutput(fmt.Sprintf(" ---> 🍉🍉🍉 [Seq: %v] Hello: '%v'",
j.SequenceNo, j.Input.Recipient,
)),
}
Expand Down Expand Up @@ -150,7 +164,7 @@ func (p *pipeline[I, O]) consume(ctx context.Context) {
p.wgan.Add(1, p.consumer.RoutineName)
}

type TestPipeline *pipeline[TestJobInput, TestJobOutput]
type TestPipeline *pipeline[TestInput, TestOutput]
type assertFunc func(ctx context.Context, pipe TestPipeline)
type contextFunc func(ctx context.Context) (context.Context, context.CancelFunc)
type finishFunc func(
Expand Down Expand Up @@ -217,14 +231,14 @@ var _ = Describe("WorkerPool", func() {
defer leaktest.Check(GinkgoT())()

oc := lo.TernaryF(entry.outputsChSize > 0,
func() chan boost.JobOutput[TestJobOutput] {
return make(chan boost.JobOutput[TestJobOutput], entry.outputsChSize)
func() TestOutputStream {
return make(TestOutputStream, entry.outputsChSize)
},
func() chan boost.JobOutput[TestJobOutput] {
func() TestOutputStream {
return nil
},
)
pipe := start[TestJobInput, TestJobOutput](oc)
pipe := start[TestInput, TestOutput](oc)

defer func() {
if counter, ok := (pipe.wgan).(boost.AnnotatedWgCounter); ok {
Expand All @@ -235,9 +249,9 @@ var _ = Describe("WorkerPool", func() {
ctx, cancel := entry.context(ctxSpec)

By("👾 WAIT-GROUP ADD(producer)")
provider := func() TestJobInput {
provider := func() TestInput {
recipient := rand.Intn(len(audience)) //nolint:gosec // trivial
return TestJobInput{
return TestInput{
Recipient: audience[recipient],
}
}
Expand Down

0 comments on commit 441f4a3

Please sign in to comment.