-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecutor.go
84 lines (64 loc) · 2.07 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package executor
import (
"github.com/romana/rlog"
)
// TAggregator is the fan-in point for the status reports of all workers
// launched by a TFramedExecutor.
type TAggregator struct {
// Count is the number of workers that are still being tracked. The
// aggregator loop closes Channel once it observes Count at zero.
Count int
// Channel carries worker status descriptors from the per-worker listeners
// to the aggregator loop. It is created unbuffered by MRun.
Channel chan *TWorkerDescriptor
}
// TFramedExecutor runs a set of service workers and waits for all of them
// to finish.
type TFramedExecutor struct {
// SW is the collection of service workers iterated by MRun; each element
// exposes a Service and receives a Container when it is launched.
SW TSLServiceWorker
// Aggregator is (re)created on every call to MRun.
Aggregator *TAggregator
// chJobs is not referenced by the code in this file.
// NOTE(review): possibly a leftover — confirm before removing.
chJobs []*TChannelContainer
}
// MRun starts every configured service that is not already running, launches
// a runner and a listener goroutine per service worker, and then blocks in the
// aggregator loop until every launched worker has reported StatusDone.
//
// It returns the first error reported by a service's MStart, or nil once all
// workers are done.
//
// Aggregator.Count is only ever modified from the goroutine executing MRun
// (here and in mListenAggregator), so no extra synchronisation is needed.
//
// NOTE(review): when MStart fails part-way through, the goroutines launched
// for earlier workers keep running with nobody receiving from the aggregator
// channel. This is unchanged from the previous behaviour — confirm whether
// callers need a cleanup path.
func (psJob *TFramedExecutor) MRun() (err error) {
	psJob.Aggregator = &TAggregator{Channel: make(chan *TWorkerDescriptor)}

	// Iterate over the services that should be started.
	for _, sSW := range psJob.SW {
		if !sSW.Service.MIsRunning() {
			rlog.Debugf("Service '%s' is not running. Starting",
				sSW.Service.MGetName())
			if err = sSW.Service.MStart(); err != nil {
				return err
			}
		}
		rlog.Debugf("Service '%s' started; Running the job", sSW.Service.MGetName())

		// NOTE(review): sSW is the range variable; if TSLServiceWorker holds
		// values rather than pointers this assignment only affects the copy —
		// verify against the type's declaration.
		sSW.Container = &TChannelContainer{sSW.Service.MGetName(), make(chan bool)}

		// Register the worker before its goroutines exist so the aggregator
		// can never observe a zero count while a worker is still pending.
		psJob.Aggregator.Count++
		rlog.Tracef(5, "Amount of workers is increased to: %d by %s",
			psJob.Aggregator.Count, sSW.Container.Name)

		go sSW.runner()
		go psJob.listener(sSW.Container)
	}

	psJob.mListenAggregator()
	rlog.Trace(6, "Job well done")
	return nil
}

// mListenAggregator consumes worker descriptors from the aggregator channel
// and logs each status change. Every StatusDone descriptor decrements
// Aggregator.Count; when the last one arrives the channel is closed, which
// ends the loop. Closing here is safe because StatusDone is the final message
// a listener sends.
//
// If no workers were registered the channel is closed immediately, since
// nothing would ever send on it and the loop would block forever.
func (psJob *TFramedExecutor) mListenAggregator() {
	if psJob.Aggregator.Count == 0 {
		close(psJob.Aggregator.Channel)
		rlog.Trace(7, "Aggregator channel closed")
		return
	}

	for pswDescriptor := range psJob.Aggregator.Channel {
		rlog.Tracef(5, "Channels to keep track of: %d", psJob.Aggregator.Count)
		switch pswDescriptor.Status {
		case StatusRunning:
			rlog.Debugf("Worker for '%s' is running", pswDescriptor.Name)
		case StatusDone:
			rlog.Warnf("Worker for '%s' stopped working", pswDescriptor.Name)
			psJob.Aggregator.Count--
			rlog.Tracef(5, "Amount of workers is decreased to: %d by %s",
				psJob.Aggregator.Count, pswDescriptor.Name)
			if psJob.Aggregator.Count == 0 {
				// Last worker finished: no sender is left, so the channel
				// can be closed without risking a send-on-closed panic.
				close(psJob.Aggregator.Channel)
				rlog.Trace(7, "Aggregator channel closed")
			}
		default:
			rlog.Errorf("%d is unknow status for worker '%s'",
				pswDescriptor.Status, pswDescriptor.Name)
		}
	}
}

// listener forwards activity of a single worker to the aggregator: every
// value received on the worker's container channel is reported as
// StatusRunning, and once that channel is closed a final StatusDone is sent.
// It does not touch Aggregator.Count; the counting is done by the goroutine
// that runs MRun.
func (psJob *TFramedExecutor) listener(psContainer *TChannelContainer) {
	for range psContainer.Channel {
		psJob.Aggregator.Channel <- &TWorkerDescriptor{Name: psContainer.Name, Status: StatusRunning}
	}
	psJob.Aggregator.Channel <- &TWorkerDescriptor{Name: psContainer.Name, Status: StatusDone}
}