-
Notifications
You must be signed in to change notification settings - Fork 2
/
pooled_actuator.go
127 lines (105 loc) · 2.83 KB
/
pooled_actuator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package conexec
import (
"context"
"fmt"
"runtime"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
var (
// ErrorUsingActuator is the error when goroutine pool has exception
ErrorUsingActuator = fmt.Errorf("ErrorUsingActuator")
)
// GoroutinePool is the base routine pool interface
// User can use custom goroutine pool by implementing this interface
type GoroutinePool interface {
Submit(f func()) error
Release()
}
// PooledActuator is a actuator which has a worker pool
type PooledActuator struct {
timeout *time.Duration
workerNum int
pool GoroutinePool
initOnce sync.Once
}
// NewPooledActuator creates an PooledActuator instance
func NewPooledActuator(workerNum int, opt ...*Options) *PooledActuator {
c := &PooledActuator{
workerNum: workerNum,
}
setOptions(c, opt...)
return c
}
// WithPool will support for using custom goroutine pool
func (c *PooledActuator) WithPool(pool GoroutinePool) *PooledActuator {
newActuator := c.clone()
newActuator.pool = pool
return newActuator
}
// Exec is used to run tasks concurrently
func (c *PooledActuator) Exec(tasks ...Task) error {
return c.ExecWithContext(context.Background(), tasks...)
}
// ExecWithContext uses goroutine pool to run tasks concurrently
// Return nil when tasks are all completed successfully,
// or return error when some exception happen such as timeout
func (c *PooledActuator) ExecWithContext(ctx context.Context, tasks ...Task) error {
// ensure the actuator can init correctly
c.initOnce.Do(func() {
c.initPooledActuator()
})
if c.workerNum == -1 {
return ErrorUsingActuator
}
return execTasks(ctx, c, c.runWithPool, tasks...)
}
// GetTimeout return the timeout set before
func (c *PooledActuator) GetTimeout() *time.Duration {
return c.timeout
}
// Release will release the pool
func (c *PooledActuator) Release() {
if c.pool != nil {
c.pool.Release()
}
}
// initPooledActuator init the pooled actuator once while the runtime
// If the workerNum is zero or negative,
// default worker num will be used
func (c *PooledActuator) initPooledActuator() {
if c.pool != nil {
// just pass
c.workerNum = 1
return
}
if c.workerNum <= 0 {
c.workerNum = runtime.NumCPU() << 1
}
var err error
c.pool, err = ants.NewPool(c.workerNum)
if err != nil {
c.workerNum = -1
fmt.Println("initPooledActuator err")
}
}
// runWithPool used the goroutine pool to execute the tasks
func (c *PooledActuator) runWithPool(f func()) {
err := c.pool.Submit(f)
if err != nil {
fmt.Printf("submit task err:%s\n", err.Error())
}
}
// setTimeout sets the timeout
func (c *PooledActuator) setTimeout(timeout *time.Duration) {
c.timeout = timeout
}
// clone will clone this PooledActuator without goroutine pool
func (c *PooledActuator) clone() *PooledActuator {
return &PooledActuator{
timeout: c.timeout,
workerNum: c.workerNum,
initOnce: sync.Once{},
}
}