-
Notifications
You must be signed in to change notification settings - Fork 1
/
gotask.go
139 lines (104 loc) · 3.19 KB
/
gotask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package gotask
import (
"context"
"errors"
"math"
"sync"
"time"
)
// kvPair associates a string key with a value of any type.
// WhenAll uses it to pass keyed items through its channels.
type kvPair struct {
// key identifies the item; WhenAll fills it with the job's map key.
key string
// value is the payload carried alongside key; receivers type-assert it
// back to the concrete type they expect.
value interface{}
}
func getDurationValueOrDefault(timeout []time.Duration) time.Duration {
if len(timeout) == 0 {
return time.Duration(math.MaxInt64)
} else {
return time.Second * timeout[0]
}
}
// Await runs do on its own goroutine and blocks until it finishes or the
// optional timeout elapses, in the spirit of awaiting a .NET task.
//
// Parameters:
//   - do: the work to execute. A nil do yields an "Argument Exception" error.
//   - timeout: optional; only the first value is used and it is interpreted as
//     a number of seconds (see getDurationValueOrDefault). When omitted, Await
//     waits for do without a practical time limit.
//
// Returns the value and the error produced by do. If the timeout elapses
// first it returns nil and a "Timeout Exception" error; do keeps running in
// the background in that case because a goroutine cannot be interrupted, but
// it is able to finish and exit without blocking.
func Await(do func() (interface{}, error), timeout ...time.Duration) (interface{}, error) {
	if do == nil {
		return nil, errors.New("Argument Exception")
	}

	type outcome struct {
		value interface{}
		err   error
	}

	// Capacity 1 lets the goroutine always deliver its outcome and terminate,
	// even when the caller has already given up because of the timeout.
	done := make(chan outcome, 1)
	go func() {
		value, err := do()
		done <- outcome{value: value, err: err}
	}()

	// An explicit timer (instead of time.After) can be released as soon as
	// do completes, which matters for the very long default timeout.
	timer := time.NewTimer(getDurationValueOrDefault(timeout))
	defer timer.Stop()

	select {
	case o := <-done:
		// Value and error travel together so a failure of do is reported
		// to the caller instead of being dropped.
		return o.value, o.err
	case <-timer.C:
		return nil, errors.New("Timeout Exception")
	}
}
// WhenAll runs every task in jobsSlice on a pool of worker goroutines and
// gathers their outcomes.
//
// Parameters:
//   - jobsSlice: the tasks to run, keyed by a caller-chosen name. A nil map
//     yields an "Argument Exception" error under the empty key; a nil task
//     yields an "Argument Exception" error under that task's key.
//   - numberOfWorkers: upper bound on the number of tasks running at the same
//     time. Values that are zero, negative, or larger than the number of
//     tasks are replaced by the number of tasks.
//   - timeout: optional; only the first value is used and it is interpreted as
//     a number of seconds (see getDurationValueOrDefault). When omitted,
//     WhenAll waits for all tasks without a practical time limit.
//
// Returns two maps keyed by task name: the values of the tasks that succeeded
// and the errors of the tasks that failed. If the timeout elapses before every
// task has reported, WhenAll returns what has been gathered so far plus a
// "Timeout Exception" error under the empty key. Tasks that were not started
// yet are skipped; tasks already running cannot be interrupted and finish in
// the background, their outcome being discarded.
func WhenAll(jobsSlice map[string]func() (interface{}, error), numberOfWorkers int, timeout ...time.Duration) (map[string]interface{}, map[string]error) {
	results := make(map[string]interface{})
	errs := make(map[string]error)

	// Parameter validation and correction.
	if jobsSlice == nil {
		errs[""] = errors.New("Argument Exception")
		return results, errs
	}
	jobCount := len(jobsSlice)
	if jobCount == 0 {
		return results, errs
	}
	// A non-positive worker count would start no workers at all, so it is
	// treated like "one worker per job", as is a count exceeding the jobs.
	if numberOfWorkers <= 0 || numberOfWorkers > jobCount {
		numberOfWorkers = jobCount
	}

	ctx, cancel := context.WithTimeout(context.Background(), getDurationValueOrDefault(timeout))
	defer cancel()

	// Job queue: filled completely up front and closed, so workers simply
	// range over it until it is empty.
	jobsChannel := make(chan kvPair, jobCount)
	for key, function := range jobsSlice {
		jobsChannel <- kvPair{key, function}
	}
	close(jobsChannel)

	type outcome struct {
		key   string
		value interface{}
		err   error
	}
	// One slot per job: a worker never blocks when reporting, even if the
	// caller has already returned because of the timeout.
	outcomes := make(chan outcome, jobCount)

	var workers sync.WaitGroup
	workers.Add(numberOfWorkers)
	for i := 0; i < numberOfWorkers; i++ {
		go func() {
			defer workers.Done()
			for jobPair := range jobsChannel {
				// Stop picking up new jobs once the deadline has passed
				// or WhenAll has returned.
				if ctx.Err() != nil {
					return
				}
				work := jobPair.value.(func() (interface{}, error))
				if work == nil {
					outcomes <- outcome{key: jobPair.key, err: errors.New("Argument Exception")}
					continue
				}
				value, err := work()
				outcomes <- outcome{key: jobPair.key, value: value, err: err}
			}
		}()
	}

	// finished is closed once every worker has exited.
	finished := make(chan struct{})
	go func() {
		workers.Wait()
		close(finished)
	}()

	// Wait for whichever comes first: all work done, or the deadline.
	select {
	case <-finished:
	case <-ctx.Done():
	}

	// Reduce: collect everything reported so far without blocking. Only this
	// goroutine touches the result maps, so no locking is required.
	received := 0
	for draining := true; draining; {
		select {
		case o := <-outcomes:
			received++
			if o.err != nil {
				errs[o.key] = o.err
			} else {
				results[o.key] = o.value
			}
		default:
			draining = false
		}
	}

	// Fewer outcomes than jobs means the deadline cut the run short.
	if received < jobCount {
		errs[""] = errors.New("Timeout Exception")
	}
	return results, errs
}