Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(boost): implement manifold func pool (#276) #282

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"cSpell.words": [
"Alloc",
"Assistable",
"Berthe",
"binaryheap",
"bodyclose",
"cenkalti",
Expand Down
8 changes: 7 additions & 1 deletion boost/ants-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package boost
import "github.com/snivilised/lorax/internal/ants"

type (
Option = ants.Option
IDGenerator = ants.IDGenerator
InputParam = ants.InputParam
Option = ants.Option
PoolFunc = ants.PoolFunc
TaskFunc = ants.TaskFunc
)

var (
WithDisablePurge = ants.WithDisablePurge
WithExpiryDuration = ants.WithExpiryDuration
WithGenerator = ants.WithGenerator
WithMaxBlockingTasks = ants.WithMaxBlockingTasks
WithNonblocking = ants.WithNonblocking
WithOptions = ants.WithOptions
WithOutput = ants.WithOutput
WithPanicHandler = ants.WithPanicHandler
WithPreAlloc = ants.WithPreAlloc
)
23 changes: 23 additions & 0 deletions boost/base-pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package boost

import (
"sync"
"sync/atomic"
)

type (
basePool[O any] struct {
wg *sync.WaitGroup
sequence int32
outputDupCh *Duplex[JobOutput[O]]
ending bool
}
)

func (p *basePool[O]) next() int32 {
return atomic.AddInt32(&p.sequence, int32(1))
}

func (p *basePool[O]) Observe() JobOutputStreamR[O] {
return p.outputDupCh.ReaderCh
}
12 changes: 5 additions & 7 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ const (
type (
Job[I any] struct {
ID string
Input I
SequenceNo int
Input I
}

JobOutput[O any] struct {
Payload O
ID string
SequenceNo int
Payload O
Error error
}

JobStream[I any] chan Job[I]
Expand Down Expand Up @@ -47,11 +50,6 @@ type (
PoolResultStreamR = <-chan *PoolResult
PoolResultStreamW = chan<- *PoolResult

// IDGenerator is a sequential unique id generator interface
IDGenerator interface {
Generate() string
}

// Next is a sequential unique id generator func type
Next func() string
)
Expand Down
6 changes: 3 additions & 3 deletions boost/examples/alpha/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func main() {

const NoW = 3

pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(inputCh ants.InputParam) {
n, _ := inputCh.(int)
pool, _ := boost.NewFuncPool[int, int](ctx, NoW, func(input ants.InputParam) {
n, _ := input.(int)
fmt.Printf("<--- (n: %v)🍒 \n", n)
time.Sleep(time.Second)
}, &wg, ants.WithNonblocking(false))
Expand All @@ -59,7 +59,7 @@ func main() {
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

fmt.Printf("pool with func, running workers number:%d\n",
fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion boost/examples/beta/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

fmt.Printf("task pool, running workers number:%d\n",
fmt.Printf("task pool, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
Expand Down
118 changes: 118 additions & 0 deletions boost/examples/gamma/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
)

// Demonstrates use of manifold func base worker pool where
// the client manifold func returns an output and an error. An
// output channel is created through which the client receives
// all generated outputs.

const (
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
OutputChTimeout = time.Second / 2 // do not use a value that is similar to interval
interval = time.Second / 10
)

func produce(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

// Only the producer (observable) knows when the workload is complete
// but clearly it has no idea when the worker-pool is complete. Initially,
// one might think that the worker-pool knows when work is complete
// but this is in correct. The pool only knows when the pool is dormant,
// not that no more jobs will be submitted.
// This poses a problem from the perspective of the consumer; it does
// not know when to exit its output processing loop.
// What this indicates to us is that the knowledge of end of workload is
// a combination of multiple events:
//
// 1) The producer knows when it will submit no more work
// 2) The pool knows when all it's workers are dormant
//
// A non deterministic way for the consumer to exit it's output processing
// loop, is to use a timeout. But what is a sensible value? Only the client
// knows this and even so, it can't really be sure no more outputs will
// arrive after the timeout; essentially its making an educated guess, which
// is not reliable.
//
for i, n := 0, 100; i < n; i++ {
_ = pool.Post(ctx, Param)
}

pool.EndWork(ctx, interval)
}

func consume(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

rch := pool.Observe()
for {
select {
case output, ok := <-rch:
if !ok {
return
} else {
fmt.Printf("🍒 payload: '%v', id: '%v', seq: '%v' (e: '%v')\n",
output.Payload, output.ID, output.SequenceNo, output.Error,
)
}
case <-time.After(OutputChTimeout):
fmt.Printf("⏱️ timeout!\n")
return
case <-ctx.Done():
fmt.Printf("❌ cancelled!\n")
return
}
}
}

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize),
)

defer pool.Release(ctx)

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go produce(ctx, pool, &wg) //nolint:wsl // pendant

wg.Add(1)
go consume(ctx, pool, &wg) //nolint:wsl // pendant

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool) FINISHED")
}
49 changes: 49 additions & 0 deletions boost/generic-pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package boost

import (
"context"

"github.com/snivilised/lorax/internal/ants"
)

// functionalPool
type functionalPool struct {
pool *ants.PoolWithFunc
}

func (p *functionalPool) Post(ctx context.Context, job InputParam) error {
return p.pool.Invoke(ctx, job)
}

func (p *functionalPool) Release(ctx context.Context) {
p.pool.Release(ctx)
}

func (p *functionalPool) Running() int {
return p.pool.Running()
}

func (p *functionalPool) Waiting() int {
return p.pool.Waiting()
}

// taskPool
type taskPool struct {
pool *ants.Pool
}

func (p *taskPool) Post(ctx context.Context, task TaskFunc) error {
return p.pool.Submit(ctx, task)
}

func (p *taskPool) Release(ctx context.Context) {
p.pool.Release(ctx)
}

func (p *taskPool) Running() int {
return p.pool.Running()
}

func (p *taskPool) Waiting() int {
return p.pool.Waiting()
}
7 changes: 4 additions & 3 deletions boost/id-generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package boost

import (
"fmt"
"sync/atomic"
)

type Sequential struct {
Format string
id int
id int32
}

func (g *Sequential) Generate() string {
g.id++
n := atomic.AddInt32(&g.id, int32(1))

return fmt.Sprintf(g.Format, g.id)
return fmt.Sprintf(g.Format, n)
}
15 changes: 15 additions & 0 deletions boost/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package boost

// withDefaults prepends boost withDefaults to the sequence of options
func withDefaults(options ...Option) []Option {
const (
noDefaults = 1
)
o := make([]Option, 0, len(options)+noDefaults)
o = append(o, WithGenerator(&Sequential{
Format: "ID:%v",
}))
o = append(o, options...)

return o
}
19 changes: 0 additions & 19 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package boost

import (
"sync"

"github.com/snivilised/lorax/internal/ants"
)

const (
// TODO: This is just temporary, channel size definition still needs to be
// fine tuned
Expand All @@ -29,19 +23,6 @@ type (
}

workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]

basePool struct {
wg *sync.WaitGroup
idGen IDGenerator
}

taskPool struct {
pool *ants.Pool
}

functionalPool struct {
pool *ants.PoolWithFunc
}
)

// Worker pool types:
Expand Down
6 changes: 6 additions & 0 deletions boost/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func demoPoolFunc(inputCh ants.InputParam) {
time.Sleep(time.Duration(n) * time.Millisecond)
}

func demoPoolManifoldFunc(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}

var stopLongRunningFunc int32

func longRunningFunc() {
Expand Down
Loading
Loading