forked from zephinzer/godev
-
Notifications
You must be signed in to change notification settings - Fork 0
/
execution.group.go
83 lines (76 loc) · 2.43 KB
/
execution.group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main
import (
"sync"
)
// ExecutionGroupCount is the running number of execution groups: Run
// increments it on entry and uses the new value to label that group's
// debug log lines, which helps tell the different groups apart.
// It starts at zero.
var ExecutionGroupCount int
// ExecutionGroup is a set of commands that Run starts in parallel
// and waits on as a unit
type ExecutionGroup struct {
// commands are the commands Run validates and starts, IsRunning
// inspects and Terminate interrupts
commands []*Command
// waitGroup is incremented by Run for every command it starts and
// decremented by handleCommandStatus; Run blocks on it until it is zero
waitGroup sync.WaitGroup
// logger receives the group's trace/debug/warn/error messages
logger *Logger
}
// IsRunning reports whether at least one command of the execution
// group is still running; it exists so the Runner can check on the group.
// It stops at the first command that reports itself as running.
func (executionGroup *ExecutionGroup) IsRunning() bool {
	commands := executionGroup.commands
	for i := 0; i < len(commands); i++ {
		if commands[i].IsRunning() {
			return true
		}
	}
	return false
}
// Run starts every valid command of the execution group in parallel and
// blocks until each started command has reported its exit status.
//
// Commands for which IsValid returns an error are logged and skipped: they
// are neither started nor waited on.
func (executionGroup *ExecutionGroup) Run() {
	ExecutionGroupCount++
	// Arguments of a deferred call are evaluated at the defer statement, so
	// the exit log carries the same group number as the start log.
	defer executionGroup.logger.Debugf("execution group[%v] exited", ExecutionGroupCount)
	executionGroup.logger.Debugf("execution group[%v] is starting...", ExecutionGroupCount)
	for _, command := range executionGroup.commands {
		if err := command.IsValid(); err != nil {
			executionGroup.logger.Error(err)
			continue
		}
		// Register the command before any goroutine that may call Done is
		// running, so the counter can never be decremented ahead of its Add.
		executionGroup.waitGroup.Add(1)
		// The command is handed over as an argument instead of being captured:
		// before Go 1.22 the loop variable is shared by all iterations, so a
		// captured `command` could already point at a later command when the
		// status arrives and the wrong command would be reported.
		go func(watched *Command, commandStatus *chan error) {
			// NOTE(review): this polls with select/default and therefore spins
			// while the command runs. The channel is re-read through the
			// pointer on every pass; a plain blocking receive would stop the
			// spinning but is only safe if the channel behind GetStatus exists
			// before Command.Run is called and is never replaced - confirm
			// against Command before changing this.
			for {
				select {
				case err := <-*commandStatus: // Command letting us know its done
					executionGroup.handleCommandStatus(watched, err)
					return
				default:
				}
			}
		}(command, command.GetStatus())
		executionGroup.logger.Tracef("command[%s] is starting", command.GetID())
		go command.Run()
	}
	executionGroup.logger.Tracef("waiting for commands to complete running...")
	executionGroup.waitGroup.Wait()
}
// Terminate interrupts every command of this execution group that is
// still running, by calling SendInterrupt on it; commands that are not
// running are left alone. It is used when the Runner receives a signal
// to start a new pipeline.
func (executionGroup *ExecutionGroup) Terminate() {
	logger := executionGroup.logger
	for _, cmd := range executionGroup.commands {
		if !cmd.IsRunning() {
			continue
		}
		logger.Tracef("sending SIGINT to command %v", cmd.GetID())
		cmd.SendInterrupt()
		logger.Tracef("SIGINT sent to command %v", cmd.GetID())
	}
}
// handleCommandStatus logs how command exited and marks it as finished on
// the group's wait group.
//
// A non-nil err is logged as a warning, a nil err as a debug message. Any
// panic raised while doing so is recovered and logged as a warning instead
// of propagating to the caller.
func (executionGroup *ExecutionGroup) handleCommandStatus(command *Command, err error) {
	defer func() {
		if r := recover(); r != nil {
			executionGroup.logger.Warn(r)
		}
	}()
	// Deferred after the recover handler so that it runs before it: Done is
	// called even when the logging below panics (otherwise the recovered
	// panic would leave Run blocked in Wait forever), and a panic from Done
	// itself is still caught by the handler above.
	defer executionGroup.waitGroup.Done()
	if err != nil {
		executionGroup.logger.Warnf("command[%s] exited with: %s", command.GetID(), err)
		return
	}
	executionGroup.logger.Debugf("command[%s] exited without error", command.GetID())
}