forked from kata-containers/runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.go
144 lines (118 loc) · 2.51 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"sync"
"time"
"github.com/pkg/errors"
)
// Tunables for the sandbox monitor.
const (
// defaultCheckInterval is the check period that newMonitor assigns to a
// freshly created monitor.
defaultCheckInterval = 1 * time.Second
// watcherChannelSize is the buffer capacity of every channel handed out
// by newWatcher.
watcherChannelSize = 128
)
// monitor periodically checks a sandbox's hypervisor and agent and reports
// check failures to the channels registered through newWatcher.
type monitor struct {
// The embedded mutex is held by newWatcher, notify and stop while they
// read or modify watchers and running.
sync.Mutex
// sandbox is the sandbox whose hypervisor and agent are checked.
sandbox *Sandbox
// checkInterval is the period of the ticker driving the checks.
checkInterval time.Duration
// watchers holds the channels returned by newWatcher; notify sends to
// them and stop closes them.
watchers []chan error
// wg tracks the background checking goroutine; stop waits on it.
wg sync.WaitGroup
// running is true from the moment the background goroutine is started
// until stop resets it.
running bool
// stopCh is written by stop to ask the background goroutine to exit.
stopCh chan bool
}
// newMonitor builds an idle monitor for sandbox s. The monitor uses
// defaultCheckInterval as its check period and owns a stop channel with room
// for a single pending stop request. No goroutine is started here.
func newMonitor(s *Sandbox) *monitor {
	mon := new(monitor)
	mon.sandbox = s
	mon.checkInterval = defaultCheckInterval
	mon.stopCh = make(chan bool, 1)
	return mon
}
// newWatcher registers and returns a new buffered error channel on which the
// caller receives the errors passed to notify. The first registration (or the
// first one after a stop) also launches the background goroutine that runs
// the hypervisor and agent checks on every tick of checkInterval.
//
// The returned error is always nil.
func (m *monitor) newWatcher() (chan error, error) {
	m.Lock()
	defer m.Unlock()

	ch := make(chan error, watcherChannelSize)
	m.watchers = append(m.watchers, ch)

	// The checking goroutine already exists: registering was all there was to do.
	if m.running {
		return ch, nil
	}

	m.running = true
	m.wg.Add(1)

	// Background loop: run the checks on every tick until stop asks us to exit.
	go func() {
		defer m.wg.Done()

		ticker := time.NewTicker(m.checkInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				m.watchHypervisor()
				m.watchAgent()
			case <-m.stopCh:
				return
			}
		}
	}()

	return ch, nil
}
// notify marks the sandbox agent as dead and then offers err to every
// registered watcher channel.
//
// The agent is marked dead on every call, before the lock is taken and
// regardless of whether the monitor is running. Nothing is sent when the
// monitor is not running.
//
// Delivery never blocks: if a watcher's buffer is full the message is dropped
// for that watcher and a warning is logged. A watcher is not supposed to close
// its channel, but if one did, the resulting panic is recovered per channel so
// that the remaining watchers still receive the notification.
func (m *monitor) notify(err error) {
	m.sandbox.agent.markDead()

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	// deliver performs one non-blocking send. The recover is scoped to a
	// single channel so that one misbehaving watcher cannot abort the loop.
	deliver := func(c chan error) {
		defer func() {
			if x := recover(); x != nil {
				virtLog.Warnf("watcher closed channel: %v", x)
			}
		}()

		select {
		case c <- err:
		default:
			// Dropping is preferred over stalling the monitor; the messages
			// already buffered for this watcher are kept.
			virtLog.WithField("channel-size", watcherChannelSize).Warnf("watcher channel is full, throw notify message")
		}
	}

	for _, c := range m.watchers {
		deliver(c)
	}
}
// stop asks the background checking goroutine to exit, closes every watcher
// channel, resets the monitor to its idle state and finally waits for the
// goroutine to finish. Calling stop on a monitor that is not running only
// performs the wait.
//
// A watcher is not supposed to close its channel, but if one did, the panic
// raised by the second close is recovered per channel so that the remaining
// watcher channels are still closed instead of being left open.
func (m *monitor) stop() {
	// Registered first so that it runs last, i.e. after the lock has been
	// released: the goroutine may need the lock (through notify) before it
	// gets back to its select and sees the stop request.
	defer m.wg.Wait()

	m.Lock()
	defer m.Unlock()

	if !m.running {
		return
	}

	// newMonitor gives stopCh a one-slot buffer, so this send normally
	// completes without waiting for the goroutine to be ready to receive.
	m.stopCh <- true

	// Return to the idle state whatever happens while closing the channels.
	defer func() {
		m.watchers = nil
		m.running = false
	}()

	// closeWatcher closes a single channel; the recover is scoped to that
	// channel so one already-closed watcher cannot abort the loop.
	closeWatcher := func(c chan error) {
		defer func() {
			if x := recover(); x != nil {
				virtLog.Warnf("watcher closed channel: %v", x)
			}
		}()
		close(c)
	}

	for _, c := range m.watchers {
		closeWatcher(c)
	}
}
// watchAgent runs one agent check and, if the check fails, forwards the
// wrapped error to notify. A successful check does nothing.
func (m *monitor) watchAgent() {
	checkErr := m.sandbox.agent.check()
	if checkErr == nil {
		return
	}

	// TODO: define and export error types
	m.notify(errors.Wrapf(checkErr, "failed to ping agent"))
}
// watchHypervisor runs one hypervisor check. On failure it forwards the
// wrapped error to notify and returns the original, unwrapped error; on
// success it returns nil.
func (m *monitor) watchHypervisor() error {
	checkErr := m.sandbox.hypervisor.check()
	if checkErr == nil {
		return nil
	}

	m.notify(errors.Wrapf(checkErr, "failed to ping hypervisor process"))
	return checkErr
}