Skip to content

Commit

Permalink
ref(ants,boost): make pool size an option (#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 7, 2024
1 parent 3e318c6 commit d4c1a83
Show file tree
Hide file tree
Showing 20 changed files with 100 additions and 61 deletions.
1 change: 1 addition & 0 deletions boost/ants-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ var (
WithOutput = ants.WithOutput
WithPanicHandler = ants.WithPanicHandler
WithPreAlloc = ants.WithPreAlloc
WithSize = ants.WithSize
)
8 changes: 5 additions & 3 deletions boost/examples/fp-throttled/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/snivilised/lorax/boost"
"github.com/snivilised/lorax/internal/ants"
)

// Demonstrates that when all workers are engaged and the pool is at capacity,
Expand Down Expand Up @@ -45,11 +44,14 @@ func main() {

const NoW = 3

pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(input ants.InputParam) {
pool, _ := boost.NewFuncPool[int, int](ctx, func(input boost.InputParam) {
n, _ := input.(int)
fmt.Printf("<--- (n: %v)🍒 \n", n)
time.Sleep(time.Second)
}, &wg, ants.WithNonblocking(false))
}, &wg,
boost.WithSize(NoW),
boost.WithNonblocking(false),
)

defer pool.Release(ctx)

Expand Down
7 changes: 4 additions & 3 deletions boost/examples/mf-all-output-consumed-by-range/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ func main() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
ctx, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithSize(AntsSize),
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)

Expand All @@ -37,10 +38,10 @@ func main() {
}

wg.Add(1)
go produce(ctx, pool, &wg)
go produce(ctx, pool, &wg)

wg.Add(1)
go consume(ctx, pool, &wg)
go consume(ctx, pool, &wg)

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
Expand Down
3 changes: 2 additions & 1 deletion boost/examples/mf-err-missing-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ func main() {

fmt.Printf("⏱️ timeout on send, missing consumer: '%.2f's\n", TimeoutOnSend.Seconds())
pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
ctx, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithSize(AntsSize),
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)

Expand Down
3 changes: 2 additions & 1 deletion boost/examples/mf-err-timeout-on-send/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ func main() {

fmt.Printf("⏱️ timeout on send: '%.2f's\n", TimeoutOnSend.Seconds())
pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
ctx, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithSize(AntsSize),
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)

Expand Down
3 changes: 2 additions & 1 deletion boost/examples/mf-input-injected-via-chan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ func main() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
ctx, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithSize(AntsSize),
boost.WithInput(InputChSize),
boost.WithOutput(OutputChSize, CheckCloseInterval, TimeoutOnSend),
)
Expand Down
5 changes: 4 additions & 1 deletion boost/examples/tp-throttled/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ func main() {

const NoW = 3

pool, _ := boost.NewTaskPool[int, int](ctx, NoW, &wg)
pool, _ := boost.NewTaskPool[int, int](ctx,
&wg,
boost.WithSize(NoW),
)

defer pool.Release(ctx)

Expand Down
13 changes: 10 additions & 3 deletions boost/options.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package boost

import (
"runtime"
)

// withDefaults prepends boost withDefaults to the sequence of options
func withDefaults(options ...Option) []Option {
const (
noDefaults = 1
)
o := make([]Option, 0, len(options)+noDefaults)
o = append(o, WithGenerator(&Sequential{
Format: "ID:%08d",
}))
o = append(o,
WithGenerator(&Sequential{
Format: "ID:%08d",
}),
WithSize(uint(runtime.NumCPU())),
)
o = append(o, options...)

return o
Expand Down
6 changes: 3 additions & 3 deletions boost/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ok
"github.com/snivilised/lorax/internal/ants"
"github.com/snivilised/lorax/boost"
)

const (
Expand Down Expand Up @@ -39,7 +39,7 @@ func demoFunc() {
time.Sleep(time.Duration(BenchParam) * time.Millisecond)
}

func demoPoolFunc(inputCh ants.InputParam) {
func demoPoolFunc(inputCh boost.InputParam) {
n, _ := inputCh.(int)
time.Sleep(time.Duration(n) * time.Millisecond)
}
Expand All @@ -60,7 +60,7 @@ func longRunningFunc() {

var stopLongRunningPoolFunc int32

func longRunningPoolFunc(arg ants.InputParam) {
func longRunningPoolFunc(arg boost.InputParam) {
if ch, ok := arg.(chan struct{}); ok {
<-ch
return
Expand Down
5 changes: 2 additions & 3 deletions boost/worker-pool-func-manifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type ManifoldFuncPool[I, O any] struct {

// NewManifoldFuncPool creates a new manifold function based worker pool.
func NewManifoldFuncPool[I, O any](ctx context.Context,
size int,
mf ManifoldFunc[I, O],
wg *sync.WaitGroup,
options ...Option,
Expand All @@ -42,7 +41,7 @@ func NewManifoldFuncPool[I, O any](ctx context.Context,
wi = fromOutputInfo(o, oi)
}

pool, err := ants.NewPoolWithFunc(ctx, size, func(input ants.InputParam) {
pool, err := ants.NewPoolWithFunc(ctx, func(input InputParam) {
manifoldFuncResponse(ctx, mf, input, wi)
}, ants.WithOptions(*o))

Expand Down Expand Up @@ -131,7 +130,7 @@ func (p *ManifoldFuncPool[I, O]) Conclude(ctx context.Context) {

func manifoldFuncResponse[I, O any](ctx context.Context,
mf ManifoldFunc[I, O],
input ants.InputParam,
input InputParam,
wi *outputInfoW[O],
) {
if job, ok := input.(Job[I]); ok {
Expand Down
15 changes: 10 additions & 5 deletions boost/worker-pool-func-manifold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, demoPoolManifoldFunc, &wg,
ctx, demoPoolManifoldFunc, &wg,
boost.WithSize(AntsSize),
boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend),
)

Expand Down Expand Up @@ -93,7 +94,8 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, demoPoolManifoldFunc, &wg,
ctx, demoPoolManifoldFunc, &wg,
boost.WithSize(AntsSize),
)

defer pool.Release(ctx)
Expand All @@ -120,7 +122,8 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, demoPoolManifoldFunc, &wg,
ctx, demoPoolManifoldFunc, &wg,
boost.WithSize(AntsSize),
boost.WithInput(InputBufferSize),
boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend),
)
Expand Down Expand Up @@ -153,7 +156,8 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, demoPoolManifoldFunc, &wg,
ctx, demoPoolManifoldFunc, &wg,
boost.WithSize(AntsSize),
)

defer pool.Release(ctx)
Expand Down Expand Up @@ -197,7 +201,8 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, demoPoolManifoldFunc, &wg,
ctx, demoPoolManifoldFunc, &wg,
boost.WithSize(AntsSize),
boost.WithInput(InputBufferSize),
boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend),
)
Expand Down
3 changes: 1 addition & 2 deletions boost/worker-pool-func.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ type FuncPool[I, O any] struct {
// NewFuncPool creates a new worker pool using the native ants interface; ie
// new jobs are submitted with Submit(task TaskFunc)
func NewFuncPool[I, O any](ctx context.Context,
size int,
pf ants.PoolFunc,
wg *sync.WaitGroup,
options ...Option,
Expand All @@ -26,7 +25,7 @@ func NewFuncPool[I, O any](ctx context.Context,
// allocated for each job, but this is not necessarily
// the case, because each worker has its own job queue.
//
pool, err := ants.NewPoolWithFunc(ctx, size, pf, withDefaults(options...)...)
pool, err := ants.NewPoolWithFunc(ctx, pf, withDefaults(options...)...)

return &FuncPool[I, O]{
basePool: basePool[I, O]{
Expand Down
11 changes: 8 additions & 3 deletions boost/worker-pool-func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ var _ = Describe("WorkerPoolFunc", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg)
pool, err := boost.NewFuncPool[int, int](ctx, demoPoolFunc, &wg,
boost.WithSize(AntsSize),
)

defer pool.Release(ctx)

Expand All @@ -46,7 +48,9 @@ var _ = Describe("WorkerPoolFunc", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg)
pool, err := boost.NewFuncPool[int, int](ctx, demoPoolFunc, &wg,
boost.WithSize(AntsSize),
)

defer pool.Release(ctx)

Expand Down Expand Up @@ -77,7 +81,8 @@ var _ = Describe("WorkerPoolFunc", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewFuncPool[int, int](ctx, PoolSize, longRunningPoolFunc, &wg,
pool, err := boost.NewFuncPool[int, int](ctx, longRunningPoolFunc, &wg,
boost.WithSize(PoolSize),
boost.WithMaxBlockingTasks(1),
)

Expand Down
3 changes: 1 addition & 2 deletions boost/worker-pool-task.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,10 @@ type TaskPool[I, O any] struct {
// NewTaskPool creates a new worker pool using the native ants interface; ie
// new jobs are submitted with Submit(task TaskFunc)
func NewTaskPool[I, O any](ctx context.Context,
size int,
wg *sync.WaitGroup,
options ...Option,
) (*TaskPool[I, O], error) {
pool, err := ants.NewPool(ctx, size, withDefaults(options...)...)
pool, err := ants.NewPool(ctx, withDefaults(options...)...)

return &TaskPool[I, O]{
basePool: basePool[I, O]{
Expand Down
6 changes: 4 additions & 2 deletions boost/worker-pool-task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ var _ = Describe("WorkerPoolTask", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewTaskPool[int, int](ctx, PoolSize, &wg,
pool, err := boost.NewTaskPool[int, int](ctx, &wg,
boost.WithSize(PoolSize),
boost.WithNonblocking(true),
)
defer pool.Release(ctx)
Expand Down Expand Up @@ -66,7 +67,8 @@ var _ = Describe("WorkerPoolTask", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewTaskPool[int, int](ctx, PoolSize, &wg,
pool, err := boost.NewTaskPool[int, int](ctx, &wg,
boost.WithSize(PoolSize),
boost.WithMaxBlockingTasks(1),
)
Expect(err).To(Succeed(), "create TimingPool failed")
Expand Down
19 changes: 13 additions & 6 deletions internal/ants/ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
. "github.com/onsi/ginkgo/v2" //nolint:revive // ok
. "github.com/onsi/gomega" //nolint:revive // ok

"github.com/snivilised/lorax/boost"
"github.com/snivilised/lorax/internal/ants"
)

Expand All @@ -21,8 +22,12 @@ var _ = Describe("Ants", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

poolSize := 10
pool, err := ants.NewPool(ctx, poolSize, ants.WithNonblocking(true))
const poolSize = 10

pool, err := ants.NewPool(ctx,
ants.WithSize(poolSize),
ants.WithNonblocking(true),
)
Expect(err).To(Succeed(), "create TimingPool failed")

defer pool.Release(ctx)
Expand Down Expand Up @@ -62,8 +67,10 @@ var _ = Describe("Ants", func() {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

poolSize := 10
pool, err := ants.NewPool(ctx, poolSize, ants.WithMaxBlockingTasks(1))
const poolSize = 10
pool, err := ants.NewPool(ctx,
ants.WithMaxBlockingTasks(1),
)
Expect(err).To(Succeed(), "create TimingPool failed")

defer pool.Release(ctx)
Expand Down Expand Up @@ -122,7 +129,7 @@ var _ = Describe("Ants", func() {
defer cancel()

var wg sync.WaitGroup
pool, _ := ants.NewPoolWithFunc(ctx, AntsSize, func(i ants.InputParam) {
pool, _ := ants.NewPoolWithFunc(ctx, func(i boost.InputParam) {
demoPoolFunc(i)
wg.Done()
})
Expand All @@ -148,7 +155,7 @@ var _ = Describe("Ants", func() {
defer cancel()

var wg sync.WaitGroup
pool, _ := ants.NewPoolWithFunc(ctx, AntsSize, func(i ants.InputParam) {
pool, _ := ants.NewPoolWithFunc(ctx, func(i boost.InputParam) {
demoPoolFunc(i)
wg.Done()
}, ants.WithPreAlloc(true))
Expand Down
Loading

0 comments on commit d4c1a83

Please sign in to comment.