Skip to content

Commit

Permalink
feat(ants,boost): add client context (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed May 30, 2024
1 parent f541404 commit c2ea173
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 61 deletions.
67 changes: 67 additions & 0 deletions boost/examples/alpha/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
"github.com/snivilised/lorax/internal/ants"
)

// Demonstrates that when all workers are engaged and the pool is at capacity,
// new incoming jobs are blocked, until a worker becomes free. The invoked function
// takes a second to complete. The PRE and POST indicators reflect this:
//
// PRE: <--- (n: 0) [13:56:22] 🍋
// => running: '0')
// POST: <--- (n: 0) [13:56:22] 🍊
// PRE: <--- (n: 1) [13:56:22] 🍋
// => running: '1')
// POST: <--- (n: 1) [13:56:22] 🍊
// PRE: <--- (n: 2) [13:56:22] 🍋
// => running: '2')
// POST: <--- (n: 2) [13:56:22] 🍊
// PRE: <--- (n: 3) [13:56:22] 🍋
// => running: '3')
// <--- (n: 2)🍒
// <--- (n: 1)🍒
// <--- (n: 0)🍒
// <--- (n: 3)🍒
// POST: <--- (n: 3) [13:56:23] 🍊
//
// Considering the above, whilst the pool is not at capacity, each new submission is
// executed immediately, as a new worker can be allocated to those jobs (n=0..2).
// Once the pool has reached capacity (n=3), the PRE is blocked, because its corresponding
// POST doesn't happen until a second later; this illustrates the blocking.
//

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const NoW = 3

pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(inputCh ants.InputParam) {
n, _ := inputCh.(int)
fmt.Printf("<--- (n: %v)🍒 \n", n)
time.Sleep(time.Second)
}, &wg, ants.WithNonblocking(false))

defer pool.Release()

for i := 0; i < 30; i++ { // producer
fmt.Printf("PRE: <--- (n: %v) [%v] 🍋 \n", i, time.Now().Format(time.TimeOnly))
_ = pool.Post(i)
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

fmt.Printf("pool with func, running workers number:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (func-pool) FINISHED")
}
64 changes: 64 additions & 0 deletions boost/examples/beta/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
)

// Demonstrates that when all workers are engaged and the pool is at capacity,
// new incoming jobs are blocked, until a worker becomes free. The invoked function
// takes a second to complete. The PRE and POST indicators reflect this:
//
// PRE: <--- (n: 0) [13:41:49] 🍋
// POST: <--- (n: 0) [13:41:49] 🍊
// PRE: <--- (n: 1) [13:41:49] 🍋
// POST: <--- (n: 1) [13:41:49] 🍊
// PRE: <--- (n: 2) [13:41:49] 🍋
// POST: <--- (n: 2) [13:41:49] 🍊
// PRE: <--- (n: 3) [13:41:49] 🍋
// => running: '3')
// <--- (n: 2)🍒
// => running: '3')
// <--- (n: 1)🍒
// => running: '3')
// <--- (n: 0)🍒
// POST: <--- (n: 3) [13:41:50] 🍊
//
// Considering the above, whilst the pool is not at capacity, each new submission is
// executed immediately, as a new worker can be allocated to those jobs (n=0..2).
// Once the pool has reached capacity (n=3), the PRE is blocked, because its corresponding
// POST doesn't happen until a second later; this illustrates the blocking.
//

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const NoW = 3

pool, _ := boost.NewTaskPool[int, int](ctx, NoW, &wg)

defer pool.Release()

for i := 0; i < 30; i++ { // producer
fmt.Printf("PRE: <--- (n: %v) [%v] 🍋 \n", i, time.Now().Format(time.TimeOnly))
_ = pool.Post(func() {
fmt.Printf("=> running: '%v')\n", pool.Running())
fmt.Printf("<--- (n: %v)🍒 \n", i)
time.Sleep(time.Second)
})
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

fmt.Printf("task pool, running workers number:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (task-pool) FINISHED")
}
2 changes: 2 additions & 0 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boost

import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
Expand Down Expand Up @@ -31,6 +32,7 @@ type (
workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]

basePool struct {
ctx context.Context
idGen IDGenerator
wg *sync.WaitGroup
}
Expand Down
7 changes: 5 additions & 2 deletions boost/worker-pool-func.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boost

import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
Expand All @@ -14,7 +15,8 @@ type WorkerPoolInvoker[I, O any] struct {

// NewFuncPool creates a new worker pool using the native ants interface; ie
// new jobs are submitted with Submit(task TaskFunc)
func NewFuncPool[I, O any](size int,
func NewFuncPool[I, O any](ctx context.Context,
size int,
pf ants.PoolFunc,
wg *sync.WaitGroup,
options ...Option,
Expand All @@ -24,13 +26,14 @@ func NewFuncPool[I, O any](size int,
// allocated for each job, but this is not necessarily
// the case, because each worker has its own job queue.
//
pool, err := ants.NewPoolWithFunc(size, func(i ants.InputParam) {
pool, err := ants.NewPoolWithFunc(ctx, size, func(i ants.InputParam) {
defer wg.Done()
pf(i)
}, options...)

return &WorkerPoolInvoker[I, O]{
basePool: basePool{
ctx: ctx,
idGen: &Sequential{},
wg: wg,
},
Expand Down
38 changes: 36 additions & 2 deletions boost/worker-pool-func_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package boost_test

import (
"context"
"sync"
"time"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok

"github.com/snivilised/lorax/boost"
"github.com/snivilised/lorax/internal/ants"
)

var _ = Describe("WorkerPoolFunc", func() {
Context("ants", func() {
It("should: not fail", func() {
It("should: not fail", func(specCtx SpecContext) {
// TestNonblockingSubmit
var wg sync.WaitGroup

pool, err := boost.NewFuncPool[int, int](AntsSize, demoPoolFunc, &wg)
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg)

defer pool.Release()

Expand All @@ -30,5 +36,33 @@ var _ = Describe("WorkerPoolFunc", func() {

Expect(err).To(Succeed())
})

When("shorter", func() {
It("should: not fail", func(specCtx SpecContext) {
// TestNonblockingSubmit
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewFuncPool[int, int](ctx, 3, func(inputCh ants.InputParam) {
n, _ := inputCh.(int)
time.Sleep(time.Duration(n) * time.Millisecond)
}, &wg)

defer pool.Release()

for i := 0; i < 10; i++ { // producer
_ = pool.Post(Param)
}
wg.Wait()
GinkgoWriter.Printf("pool with func, running workers number:%d\n",
pool.Running(),
)
ShowMemStats()

Expect(err).To(Succeed())
})
})
})
})
7 changes: 5 additions & 2 deletions boost/worker-pool-manifold.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boost

import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
Expand All @@ -14,19 +15,21 @@ type WorkerPoolManifold[I, O any] struct {
sourceJobsChIn JobStream[I]
}

func NewManifoldPool[I, O any](size int,
func NewManifoldPool[I, O any](ctx context.Context,
size int,
pf ants.PoolFunc,
wg *sync.WaitGroup,
options ...Option,
) (*WorkerPoolManifold[I, O], error) {
pool, err := ants.NewPoolWithFunc(size, func(i ants.InputParam) {
pool, err := ants.NewPoolWithFunc(ctx, size, func(i ants.InputParam) {
defer wg.Done()

pf(i)
}, options...)

return &WorkerPoolManifold[I, O]{
basePool: basePool{
ctx: ctx,
idGen: &Sequential{},
wg: wg,
},
Expand Down
15 changes: 11 additions & 4 deletions boost/worker-pool-submitter.go → boost/worker-pool-task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package boost
// ManifoldE: func[I, O any]() O, error
//
import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
Expand All @@ -34,16 +35,18 @@ type WorkerPool[I, O any] struct {
sourceJobsChIn JobStream[I]
}

// NewSubmitterPool creates a new worker pool using the native ants interface; ie
// NewTaskPool creates a new worker pool using the native ants interface; ie
// new jobs are submitted with Submit(task TaskFunc)
func NewSubmitterPool[I, O any](size int,
func NewTaskPool[I, O any](ctx context.Context,
size int,
wg *sync.WaitGroup,
options ...Option,
) (*WorkerPool[I, O], error) {
pool, err := ants.NewPool(size, options...)
pool, err := ants.NewPool(ctx, size, options...)

return &WorkerPool[I, O]{
basePool: basePool{
ctx: ctx,
idGen: &Sequential{},
wg: wg,
},
Expand All @@ -56,13 +59,17 @@ func NewSubmitterPool[I, O any](size int,
func (p *WorkerPool[I, O]) Post(task ants.TaskFunc) error {
p.wg.Add(1)

return p.pool.Submit(func() {
return p.pool.Submit(p.ctx, func() {
defer p.wg.Done()

task()
})
}

func (p *WorkerPool[I, O]) Running() int {
return p.pool.Running()
}

func (p *WorkerPool[I, O]) Release() {
p.pool.Release()
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boost_test

import (
"context"
"sync"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
Expand All @@ -10,13 +11,16 @@ import (
"github.com/snivilised/lorax/internal/ants"
)

var _ = Describe("WorkerPoolSubmitter", func() {
var _ = Describe("WorkerPoolTask", func() {
Context("ants", func() {
It("should: not fail", func() {
It("should: not fail", func(specCtx SpecContext) {
// TestNonblockingSubmit
var wg sync.WaitGroup

pool, err := boost.NewSubmitterPool[int, int](PoolSize, &wg,
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

pool, err := boost.NewTaskPool[int, int](ctx, PoolSize, &wg,
ants.WithNonblocking(true),
)
defer pool.Release()
Expand Down
Loading

0 comments on commit c2ea173

Please sign in to comment.