/*
Copyright 2022 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"sync"
"time"
spec "knative.dev/security-guard/pkg/apis/guard/v1alpha1"
guardKubeMgr "knative.dev/security-guard/pkg/guard-kubemgr"
pi "knative.dev/security-guard/pkg/pluginterfaces"
)
const (
pileMergeLimit = uint32(1000)
numSamplesLimit = uint32(1000000)
pileLearnMinTime = 30 * time.Second // 30sec
guardianPersistMinTime = 5 * 60 * time.Second // 5min
)
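// How these constants are used (see learnAndPersistGuardian below):
// pileMergeLimit bounds how large a pile may grow before it must be learned,
// numSamplesLimit caps the guardian's NumSamples counter, and the two
// durations rate-limit learning and KubeApi persistence respectively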
// A cached record kept by guard-service for each deployed service
type serviceRecord struct {
ns string // namespace of the deployed service
sid string // name of the deployed service
cmFlag bool // indicate if the deployed service uses a ConfigMap (or CRD)
guardianSpec *spec.GuardianSpec // a copy of the cached deployed service Guardian (RO - no mutex needed)
pile spec.SessionDataPile // the deployed service Pile (RW - protected with recordMutex)
pileLastLearn time.Time // Last time we learned
guardianLastPersist time.Time // Last time we stored the guardian
guardianPersistCounter uint // number of times the guardian was persisted
guardianLearnCounter uint // number of times the guardian was learned
pileMergeCounter uint // number of times piles were merged
recordMutex sync.Mutex // protect access to the record
alerts uint // num of alerts
deleted bool // mark that record was deleted
}
// service cache maintaining a cached record per deployed service
type services struct {
kmgr guardKubeMgr.KubeMgrInterface // KubeMgr to access KubeApi during cache misses
mutex sync.Mutex // protect access to cache map and to namespaces map
cache map[string]*serviceRecord // the cache
namespaces map[string]bool // list of namespaces to watch for changes in ConfigMaps and CRDs
records []*serviceRecord // list of records to periodically process learn and store during tick()
lastCreatedRecords time.Time // last time we created the records
}
// determine the cacheKey from its components
func serviceKey(ns string, sid string, cmFlag bool) string {
service := sid + "." + ns
if cmFlag {
service += ".cm"
}
return service
}
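// For example, with illustrative namespace and service names:
//
//	serviceKey("default", "myapp", true)  // "myapp.default.cm"
//	serviceKey("default", "myapp", false) // "myapp.default"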
func newServices() *services {
s := new(services)
s.cache = make(map[string]*serviceRecord, 64)
s.namespaces = make(map[string]bool, 4)
s.kmgr = guardKubeMgr.NewKubeMgr()
return s
}
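// A minimal usage sketch (an assumption based on this file alone; the real
// wiring lives elsewhere in guard-service, and the names below are illustrative):
//
//	s := newServices()
//	s.start()                                  // init KubeMgr configs
//	r := s.get("default", "myapp", true)       // fetch or create a cached record
//	s.mergeAndLearnAndPersistGuardian(r, pile) // pile: a *spec.SessionDataPile, presumably reported by guard-gate
//	s.tick()                                   // periodic background work (see tick below)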
func (s *services) start() {
// can't be tested due to KubeMgr
s.kmgr.InitConfigs()
}
func (s *services) createRecords() {
if time.Since(s.lastCreatedRecords) < guardianPersistMinTime {
// no need to build the list until it is time to have a fresh look at all records
return
}
s.lastCreatedRecords = time.Now()
s.mutex.Lock()
defer s.mutex.Unlock()
// Assign more work to be done now and in future ticks
s.records = make([]*serviceRecord, len(s.cache))
i := 0
for _, r := range s.cache {
s.records[i] = r
i++
}
}
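// rebuild the work list and reset every record's learn/persist timestamps to
// the epoch, forcing all records to be learned and persisted on upcoming
// ticks (presumably used to flush state, e.g. during shutdown)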
func (s *services) flushTickerRecords() {
s.mutex.Lock()
defer s.mutex.Unlock()
// Assign more work to be done now and in future ticks
s.records = make([]*serviceRecord, len(s.cache))
i := 0
for _, r := range s.cache {
s.records[i] = r
r.pileLastLearn = time.UnixMicro(0)
r.guardianLastPersist = time.UnixMicro(0)
i++
}
}
// Periodical background work to ensure:
// 1. Small unused piles are eventually learned
// 2. Learned unused guardians are eventually stored using KubeApi
// In an unrealistic worst case, ~1K ticks (1000 seconds, i.e. ~17 minutes)
// may be needed to persist all records (assuming all are waiting to be persisted)
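// A sketch of how tick() is presumably driven (the estimate above implies a
// ~1 second tick period; the actual ticker lives outside this file):
//
//	ticker := time.NewTicker(time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		s.tick()
//	}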
func (s *services) tick() {
// Tick should not include any asynchronous work
// Move all asynchronous work (e.g. KubeApi work) to go routines
// process up to 100 records per tick, learning and persisting those that are due
numRecordsToProcess := len(s.records)
if numRecordsToProcess == 0 {
// May loop over some ~10K service records
s.createRecords()
return
}
if numRecordsToProcess > 100 {
numRecordsToProcess = 100
}
// find records to learn and persist
// May loop and learn up to 100 service records + may persist ~10
i := 0 // i is the index of the record to learn
persistCounter := 0 // number of records we persisted
for ; i < numRecordsToProcess; i++ {
r := s.records[i]
if !r.deleted {
if s.learnAndPersistGuardian(r) {
persistCounter++
if persistCounter > 10 {
i++
break
}
}
}
}
// remove the records we processed
s.records = s.records[i:]
}
// delete from cache (the record is also marked deleted so tick() skips it)
func (s *services) delete(ns string, sid string, cmFlag bool) {
service := serviceKey(ns, sid, cmFlag)
s.mutex.Lock()
defer s.mutex.Unlock()
if r, ok := s.cache[service]; ok {
r.deleted = true
}
delete(s.cache, service)
pi.Log.Debugf("deleteSession %s", service)
}
// get from cache or from KubeApi (or get a default Guardian)
// if new namespace, start watching this namespace for changes in guardians
func (s *services) get(ns string, sid string, cmFlag bool) *serviceRecord {
service := serviceKey(ns, sid, cmFlag)
s.mutex.Lock()
// check if the namespace is already known
_, knownNamespace := s.namespaces[ns]
if !knownNamespace {
s.namespaces[ns] = true
}
// try to get from cache
record := s.cache[service]
s.mutex.Unlock()
// Must unlock s.mutex before s.kmgr.Watch, s.kmgr.GetGuardian, s.set
// watch any unknown namespace
if !knownNamespace {
go s.kmgr.Watch(ns, cmFlag, s.update)
}
if record == nil {
// not cached, get from kubeApi or create a default and add to cache
record = s.set(ns, sid, cmFlag, s.kmgr.GetGuardian(ns, sid, cmFlag, true))
}
// record is never nil here
return record
}
// set to cache
// caller ensures that guardianSpec is never nil
func (s *services) set(ns string, sid string, cmFlag bool, guardianSpec *spec.GuardianSpec) *serviceRecord {
// we have a new guardianSpec from update() or from get()
if guardianSpec.Learned != nil {
guardianSpec.Learned.Prepare()
}
service := serviceKey(ns, sid, cmFlag)
s.mutex.Lock()
defer s.mutex.Unlock()
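// concurrent get() misses may race to call set(); reusing an existing record
// preserves its pile, counters and timestamps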
record, exists := s.cache[service]
if !exists {
record = new(serviceRecord)
record.pile.Clear()
// start at the epoch (rather than time.Now()) so a fresh record is learned
// and persisted promptly by the next ticks
record.pileLastLearn = time.UnixMicro(0)
record.guardianLastPersist = time.UnixMicro(0)
record.ns = ns
record.sid = sid
record.cmFlag = cmFlag
s.cache[service] = record
}
record.guardianSpec = guardianSpec
return record
}
// update cache
// delete if guardianSpec is nil, set otherwise
func (s *services) update(ns string, sid string, cmFlag bool, guardianSpec *spec.GuardianSpec) {
pi.Log.Debugf("Update cache using watch: %s.%s", ns, sid)
if guardianSpec == nil {
s.delete(ns, sid, cmFlag)
} else {
s.set(ns, sid, cmFlag, guardianSpec)
}
}
// update the record pile by merging a new pile
func (s *services) mergeAndLearnAndPersistGuardian(record *serviceRecord, pile *spec.SessionDataPile) {
if pile != nil && pile.Count > 0 {
// Must not hold recordMutex when calling s.learnAndPersistGuardian
record.pile.Merge(pile)
record.pileMergeCounter++
}
s.learnAndPersistGuardian(record)
}
// Update the guardian using the pile
// Persist the guardian using KubeApi
// Return true if the guardian was due for persistence, false otherwise
func (s *services) learnAndPersistGuardian(record *serviceRecord) bool {
var shouldPersist bool
var shouldLearn bool
if record.guardianSpec.Learned == nil {
// Our first guardian
record.guardianSpec.Learned = new(spec.SessionDataConfig)
shouldPersist = true
shouldLearn = true
} else {
// we already have criteria - do we need to learn again?
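// learn when any of the following holds: the pile is large enough
// (pileMergeLimit), the guardian is still immature relative to the pile
// (NumSamples < 10x the pile count), or pileLearnMinTime has passed since
// the last learn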
if record.pile.Count >= pileMergeLimit || record.guardianSpec.NumSamples < record.pile.Count*10 || time.Since(record.pileLastLearn) >= pileLearnMinTime {
shouldLearn = true
}
// we already have criteria - do we need to persist?
if time.Since(record.guardianLastPersist) > guardianPersistMinTime {
shouldPersist = true
}
}
if shouldLearn && record.pile.Count > 0 {
// ok, let's learn
record.guardianSpec.Learned.Learn(&record.pile)
record.guardianSpec.NumSamples += record.pile.Count
if record.guardianSpec.NumSamples > numSamplesLimit {
record.guardianSpec.NumSamples = numSamplesLimit
}
record.pileLastLearn = time.Now()
record.pile.Clear()
record.guardianLearnCounter++
// Must not hold record.recordMutex when calling s.persistGuardian
}
if shouldPersist && record.guardianLastPersist.Before(record.pileLastLearn) {
// update the kubeApi record
record.guardianLastPersist = time.Now()
record.guardianPersistCounter++
go s.persistGuardian(record)
}
return shouldPersist
}
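// persist the guardian to KubeApi; invoked as a goroutine (see
// learnAndPersistGuardian) so a slow KubeApi round trip never blocks tick()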
func (s *services) persistGuardian(record *serviceRecord) {
if err := s.kmgr.Set(record.ns, record.sid, record.cmFlag, record.guardianSpec); err != nil {
pi.Log.Infof("Failed to update KubeApi with new config %s.%s: %v", record.ns, record.sid, err)
} else {
pi.Log.Debugf("Update KubeApi with new config %s.%s", record.ns, record.sid)
}
}
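// delete a pod of the deployed service via KubeApi (presumably invoked when
// guard decides a misbehaving pod should be removed)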
func (s *services) deletePod(record *serviceRecord, podname string) {
s.kmgr.DeletePod(record.ns, podname)
}