-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
94 lines (77 loc) · 1.42 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package task
import (
"sync"
)
type PoolFunc func(*Mission, ...interface{})
type PoolWeakFunc func(...interface{})
type poolParam struct {
f PoolFunc
m *Mission
arg []interface{}
}
type poolWeakParam struct {
f PoolWeakFunc
arg []interface{}
}
type Pool struct {
param_ch chan interface{}
m *Mission
cc_once sync.Once
}
func NewPool(m *Mission, cnt int) *Pool {
pch := make(chan interface{}, cnt)
p := &Pool{m: m, param_ch: pch}
for cnt > 0 {
go p.worker(p.m.New())
cnt--
}
return p
}
func (p *Pool) worker(m *Mission) {
defer m.Done()
for i := range p.param_ch {
switch param := i.(type) {
case poolParam:
select {
case <-p.m.RecvCancel():
param.m.Cancel()
default:
}
param.f(param.m, param.arg...)
case poolWeakParam:
select {
case <-p.m.RecvCancel():
default:
param.f(param.arg...)
}
}
}
}
func (p *Pool) cancel() {
p.m.Cancel()
close(p.param_ch)
}
func (p *Pool) Cancel() {
p.cc_once.Do(p.cancel)
}
func (p *Pool) Recv() <-chan struct{} {
return p.m.RecvDone()
}
func (p *Pool) Close() {
p.Cancel()
p.m.Done()
}
func (p *Pool) Do(f PoolFunc, m *Mission, args ...interface{}) {
select {
case <-p.m.RecvCancel():
m.Cancel()
f(m, args...)
case p.param_ch <- poolParam{f: f, m: m, arg: args}:
}
}
func (p *Pool) WeakDo(f PoolWeakFunc, args ...interface{}) {
select {
case <-p.m.RecvCancel():
case p.param_ch <- poolWeakParam{f: f, arg: args}:
}
}