-
Notifications
You must be signed in to change notification settings - Fork 503
/
Copy pathprocesses.go
149 lines (135 loc) · 4.05 KB
/
processes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package processes
import (
"context"
"sync"
"sync/atomic"
"github.com/docker/buildx/build"
"github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/util/ioset"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Process provides methods to control a process.
type Process struct {
inEnd *ioset.Forwarder
invokeConfig *pb.InvokeConfig
errCh chan error
processCancel func()
serveIOCancel func()
}
// ForwardIO forwards process's io to the specified reader/writer.
// Optionally specify ioCancelCallback which will be called when
// the process closes the specified IO. This will be useful for additional cleanup.
func (p *Process) ForwardIO(in *ioset.In, ioCancelCallback func()) {
p.inEnd.SetIn(in)
if f := p.serveIOCancel; f != nil {
f()
}
p.serveIOCancel = ioCancelCallback
}
// Done returns a channel where error or nil will be sent
// when the process exits.
// TODO: change this to Wait()
func (p *Process) Done() <-chan error {
return p.errCh
}
// Manager manages a set of proceses.
type Manager struct {
container atomic.Value
processes sync.Map
}
// NewManager creates and returns a Manager.
func NewManager() *Manager {
return &Manager{}
}
// Get returns the specified process.
func (m *Manager) Get(id string) (*Process, bool) {
v, ok := m.processes.Load(id)
if !ok {
return nil, false
}
return v.(*Process), true
}
// CancelRunningProcesses cancels execution of all running processes.
func (m *Manager) CancelRunningProcesses() {
var funcs []func()
m.processes.Range(func(key, value any) bool {
funcs = append(funcs, value.(*Process).processCancel)
m.processes.Delete(key)
return true
})
for _, f := range funcs {
f()
}
}
// ListProcesses lists all running processes.
func (m *Manager) ListProcesses() (res []*pb.ProcessInfo) {
m.processes.Range(func(key, value any) bool {
res = append(res, &pb.ProcessInfo{
ProcessID: key.(string),
InvokeConfig: value.(*Process).invokeConfig,
})
return true
})
return res
}
// DeleteProcess deletes the specified process.
func (m *Manager) DeleteProcess(id string) error {
p, ok := m.processes.LoadAndDelete(id)
if !ok {
return errors.Errorf("unknown process %q", id)
}
p.(*Process).processCancel()
return nil
}
// StartProcess starts a process in the container.
// When a container isn't available (i.e. first time invoking or the container has exited) or cfg.Rollback is set,
// this method will start a new container and run the process in it. Otherwise, this method starts a new process in the
// existing container.
func (m *Manager) StartProcess(pid string, resultCtx *build.ResultHandle, cfg *pb.InvokeConfig) (*Process, error) {
// Get the target result to invoke a container from
var ctr *build.Container
if a := m.container.Load(); a != nil {
ctr = a.(*build.Container)
}
if cfg.Rollback || ctr == nil || ctr.IsUnavailable() {
go m.CancelRunningProcesses()
// (Re)create a new container if this is rollback or first time to invoke a process.
if ctr != nil {
go ctr.Cancel() // Finish the existing container
}
var err error
ctr, err = build.NewContainer(context.TODO(), resultCtx, cfg)
if err != nil {
return nil, errors.Errorf("failed to create container %v", err)
}
m.container.Store(ctr)
}
// [client(ForwardIO)] <-forwarder(switchable)-> [out] <-pipe-> [in] <- [process]
in, out := ioset.Pipe()
f := ioset.NewForwarder()
f.PropagateStdinClose = false
f.SetOut(&out)
// Register process
ctx, cancel := context.WithCancel(context.TODO())
var cancelOnce sync.Once
processCancelFunc := func() { cancelOnce.Do(func() { cancel(); f.Close(); in.Close(); out.Close() }) }
p := &Process{
inEnd: f,
invokeConfig: cfg,
processCancel: processCancelFunc,
errCh: make(chan error),
}
m.processes.Store(pid, p)
go func() {
var err error
if err = ctr.Exec(ctx, cfg, in.Stdin, in.Stdout, in.Stderr); err != nil {
logrus.Errorf("failed to exec process: %v", err)
}
logrus.Debugf("finished process %s %v", pid, cfg.Entrypoint)
m.processes.Delete(pid)
processCancelFunc()
p.errCh <- err
}()
return p, nil
}