This repository has been archived by the owner on Feb 20, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
workers.go
250 lines (224 loc) · 6.19 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package workers
import (
"context"
"errors"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// defaultWatchSignals lists the signals that NewRunner watches in order to cancel a runner.
// SIGKILL is intentionally absent: the operating system never delivers it to the process,
// so registering it with signal.Notify has no effect.
var defaultWatchSignals = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
// Worker is the unit of work executed by a Runner.
//
// Work receives one input value and the channel on which results may be emitted, which lets
// workers be chained into a pipeline. The out channel is whatever the runner currently holds as
// its output; it is nil unless an output has been set (SetOut / InFrom), so implementations
// should only send on it when an output is wired up.
//
// Return nil to let the Runner keep processing. A non-nil error cancels the Runner's context and
// is reported to the caller through Stop/Wait.
type Worker interface {
Work(in interface{}, out chan<- interface{}) error
}
// Runner drives a Worker: it accepts inputs, dispatches them to the Worker and reports errors.
// The setter-style methods return the Runner so calls can be chained.
type Runner interface {
// BeforeFunc registers a hook that is executed before processing starts.
BeforeFunc(func(ctx context.Context) error) Runner
// AfterFunc registers a hook intended to run once the workers have stopped.
AfterFunc(func(ctx context.Context, err error) error) Runner
// SetDeadline bounds the runner's context with an absolute deadline.
SetDeadline(t time.Time) Runner
// SetTimeout records a timeout that is applied to the runner's context when it is started.
SetTimeout(duration time.Duration) Runner
// SetFollower marks the runner as a follower, i.e. one that does not close its own input channel.
SetFollower()
// Send queues a value for processing; it gives up if the runner's context is done.
Send(in interface{})
// InFrom wires the output of the given runners into this runner's input.
InFrom(w ...Runner) Runner
// SetOut sets the channel results are written to, unless one is already set.
SetOut(chan interface{})
// Start begins processing and returns the Runner.
Start() Runner
// Stop ends processing and returns the channel on which the resulting error is delivered.
Stop() chan error
// Wait waits for queued work to drain, stops the runner and returns its error, if any.
Wait() error
}
// runner is the package's Runner implementation.
type runner struct {
// ctx and cancel control the runner's lifetime; both are replaced when a deadline or timeout is applied.
ctx context.Context
cancel context.CancelFunc
// inChan carries values passed to Send (or produced by an upstream runner) to the dispatch loop.
inChan chan interface{}
// outChan is handed to the work function as its output; nil until SetOut is called.
outChan chan interface{}
// errChan delivers errors to callers of Stop/Wait; created with a buffer of one.
errChan chan error
// signalChan receives the OS signals registered in waitForSignal.
signalChan chan os.Signal
// limiter holds one token per in-flight work call, capping concurrency at its capacity.
limiter chan struct{}
// afterFunc, workFunc and beforeFunc are the registered hooks and the Worker's Work method.
afterFunc func(ctx context.Context, err error) error
workFunc func(in interface{}, out chan<- interface{}) error
beforeFunc func(ctx context.Context) error
// timeout is applied to ctx in startWork when greater than zero.
timeout time.Duration
// NOTE(review): deadline is not referenced anywhere in the visible code.
deadline time.Duration
// isLeader is true when this runner owns (and therefore closes) inChan; cleared by SetFollower.
isLeader bool
// NOTE(review): stopCalled is not referenced anywhere in the visible code.
stopCalled bool
// numWorkers is the concurrency requested at construction.
numWorkers int64
// lock guards the fields mutated by SetFollower, SetDeadline and SetTimeout.
lock *sync.RWMutex
// wg tracks the dispatch goroutine started by startWork.
wg *sync.WaitGroup
// done makes closing inChan in Stop happen at most once.
done *sync.Once
// once makes error reporting/cancellation from a failing work call happen at most once.
once *sync.Once
}
// NewRunner builds a Runner that executes w.Work with at most numWorkers calls in flight.
//
// ctx is the parent context; the runner derives its own cancellable context from it.
// numWorkers sizes both the input buffer and the concurrency limiter. Values below one are
// treated as one: a negative size would panic when the channels are created, and a size of zero
// would leave the limiter unbuffered, which blocks the dispatch loop on its first item.
//
// The returned runner starts as a leader (it owns its input channel) and is cancelled when one
// of defaultWatchSignals is received.
func NewRunner(ctx context.Context, w Worker, numWorkers int64) Runner {
	if numWorkers < 1 {
		numWorkers = 1
	}

	runnerCtx, runnerCancel := context.WithCancel(ctx)
	r := &runner{
		ctx:        runnerCtx,
		cancel:     runnerCancel,
		inChan:     make(chan interface{}, numWorkers),
		outChan:    nil,
		errChan:    make(chan error, 1),
		signalChan: make(chan os.Signal, 1),
		limiter:    make(chan struct{}, numWorkers),
		// Default hooks are no-ops: the after-hook passes the error through unchanged.
		afterFunc:  func(ctx context.Context, err error) error { return err },
		workFunc:   w.Work,
		beforeFunc: func(ctx context.Context) error { return nil },
		numWorkers: numWorkers,
		isLeader:   true,
		lock:       new(sync.RWMutex),
		wg:         new(sync.WaitGroup),
		once:       new(sync.Once),
		done:       new(sync.Once),
	}
	r.waitForSignal(defaultWatchSignals...)
	return r
}
// Send hands in to the runner for processing.
// It blocks until the value is accepted on the input channel or the runner's context is done,
// in which case the value is dropped.
func (r *runner) Send(in interface{}) {
	cancelled := r.ctx.Done()
	select {
	case r.inChan <- in:
		// queued for the dispatch loop
	case <-cancelled:
		// runner is shutting down; discard the value
	}
}
// InFrom makes this runner consume the output of the given runners.
// The runner is marked as a follower and its input channel is offered to each upstream runner
// as that runner's output. Returns the receiver for chaining.
func (r *runner) InFrom(w ...Runner) Runner {
	r.SetFollower()
	for i := range w {
		upstream := w[i]
		upstream.SetOut(r.inChan)
	}
	return r
}
// SetFollower marks the runner as a follower, so that Stop will not close its input channel.
func (r *runner) SetFollower() {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.isLeader = false
}
// Start kicks off processing via startWork and returns the receiver for chaining.
func (r *runner) Start() Runner {
	var self Runner = r
	r.startWork()
	return self
}
// BeforeFunc registers f as the hook that runs before the runner starts processing.
// Passing nil restores the no-op hook instead of storing a nil function, which would panic
// when the hook is invoked at start-up. Returns the receiver for chaining.
func (r *runner) BeforeFunc(f func(ctx context.Context) error) Runner {
	if f == nil {
		f = func(context.Context) error { return nil }
	}
	r.beforeFunc = f
	return r
}
// AfterFunc registers f as the hook intended to run once the workers have stopped.
// Passing nil restores the pass-through hook (which returns the error it is given) instead of
// storing a nil function. Returns the receiver for chaining.
func (r *runner) AfterFunc(f func(ctx context.Context, err error) error) Runner {
	if f == nil {
		f = func(_ context.Context, err error) error { return err }
	}
	r.afterFunc = f
	return r
}
// SetOut assigns c as the runner's output channel.
// The first assignment wins: once an output channel is set, later calls are ignored.
func (r *runner) SetOut(c chan interface{}) {
	if r.outChan == nil {
		r.outChan = c
	}
}
// SetDeadline bounds the runner's context with the absolute deadline t.
// The runner's context and cancel function are replaced by ones derived from the current
// context; expiry is observable through the context's Done channel (see IsDone).
// Returns the receiver for chaining.
func (r *runner) SetDeadline(t time.Time) Runner {
	r.lock.Lock()
	defer r.lock.Unlock()

	bounded, cancel := context.WithDeadline(r.ctx, t)
	r.ctx = bounded
	r.cancel = cancel
	return r
}
// SetTimeout records how long the runner may run.
// The value is only stored here; it is applied to the runner's context when work is started.
// Returns the receiver for chaining.
func (r *runner) SetTimeout(duration time.Duration) Runner {
	r.lock.Lock()
	r.timeout = duration
	r.lock.Unlock()
	return r
}
// Wait blocks until queued work has drained, stops the runner and returns its error.
// An error that is (or wraps) context.Canceled is treated as a normal shutdown and reported as nil.
// !!Should only be called when certain nothing will send to worker.
func (r *runner) Wait() error {
	r.waitForDrain()

	err := <-r.Stop()
	if err == nil || errors.Is(err, context.Canceled) {
		return nil
	}
	return err
}
// Stop ends processing and returns the channel on which the runner's error is delivered.
// On the first call, a leader runner closes its input channel; followers leave it open.
// Receiving from the returned channel blocks until an error value is published.
// !!Should only be called when certain nothing will send to worker.
func (r *runner) Stop() chan error {
	closeInput := func() {
		if r.inChan == nil || !r.isLeader {
			return
		}
		close(r.inChan)
	}
	r.done.Do(closeInput)
	return r.errChan
}
// IsDone exposes the Done channel of the runner's current context, which is closed once that
// context is cancelled or expires.
func (r *runner) IsDone() <-chan struct{} {
	current := r.ctx
	return current.Done()
}
// waitForSignal cancels the runner when one of the given signals is received.
//
// The signals are registered synchronously, so a signal arriving right after this call returns
// is not missed. A background goroutine then waits for either a signal or the cancellation of
// the context the runner held when this method was called; in both cases the registration is
// removed with signal.Stop and the goroutine exits, rather than lingering for the lifetime of
// the process.
func (r *runner) waitForSignal(signals ...os.Signal) {
	signal.Notify(r.signalChan, signals...)
	ctx := r.ctx

	go func() {
		defer signal.Stop(r.signalChan)

		select {
		case <-r.signalChan:
			// Read cancel under the lock because SetDeadline replaces it under the same lock.
			r.lock.RLock()
			cancel := r.cancel
			r.lock.RUnlock()
			if cancel != nil {
				cancel()
			}
		case <-ctx.Done():
			// Runner already cancelled; nothing left to watch for.
		}
	}()
}
// waitForDrain blocks until no work calls are in flight (the limiter holds no tokens) and the
// input channel has no buffered values.
//
// The state is polled with a short sleep between checks so that waiting does not spin a CPU
// core. The check is a snapshot of two channel lengths and is only meaningful when nothing is
// still sending to the runner.
func (r *runner) waitForDrain() {
	const pollInterval = time.Millisecond

	for len(r.limiter) > 0 || len(r.inChan) > 0 {
		time.Sleep(pollInterval)
	}
}
// startWork runs the before-hook and then launches the dispatch loop in a goroutine.
//
// If the before-hook fails, its error is published on errChan and nothing is started.
// Otherwise the configured timeout (if any) is applied to the runner's context and a goroutine
// reads from inChan until that channel is closed, running the work function for each value with
// concurrency capped by the limiter.
//
// The first error returned by the work function is recorded and cancels the runner's context;
// later errors are ignored. The loop itself keeps consuming until inChan is closed.
//
// On exit the goroutine waits for in-flight work, passes the recorded error (nil when there was
// none) through the after-hook, publishes the result on errChan exactly once, and then closes
// outChan if one is set. Publishing once means the send cannot block on the one-slot buffer, so
// closing outChan is never held up by an unread error.
func (r *runner) startWork() {
	if err := r.beforeFunc(r.ctx); err != nil {
		r.errChan <- err
		return
	}
	if r.timeout > 0 {
		r.ctx, r.cancel = context.WithTimeout(r.ctx, r.timeout)
	}

	r.wg.Add(1)
	go func() {
		var (
			workerWG sync.WaitGroup
			// firstErr is written at most once (guarded by r.once) by a work goroutine and read
			// only after workerWG.Wait, so no further synchronisation is needed.
			firstErr error
		)

		defer func() {
			workerWG.Wait()

			result := firstErr
			if r.afterFunc != nil {
				result = r.afterFunc(r.ctx, result)
			}
			r.errChan <- result

			if r.outChan != nil {
				close(r.outChan)
			}
			r.wg.Done()
		}()

		for in := range r.inChan {
			input := in
			// Acquire a limiter slot before spawning so at most cap(limiter) calls run at once.
			r.limiter <- struct{}{}
			workerWG.Add(1)
			go func() {
				defer func() {
					<-r.limiter
					workerWG.Done()
				}()
				if err := r.workFunc(input, r.outChan); err != nil {
					r.once.Do(func() {
						firstErr = err
						r.cancel()
					})
				}
			}()
		}
	}()
}