-
Notifications
You must be signed in to change notification settings - Fork 25
/
node_pool_manager.go
533 lines (451 loc) · 16.2 KB
/
node_pool_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
package updatestrategy
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/cenkalti/backoff"
	log "github.com/sirupsen/logrus"
	"github.com/zalando-incubator/cluster-lifecycle-manager/api"
	apiErrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/pkg/api/v1"
	policy "k8s.io/client-go/pkg/apis/policy/v1beta1"
)
const (
	// mirrorPodAnnotation marks mirror (static) pods; such pods are treated
	// as non-evictable by isEvictablePod.
	mirrorPodAnnotation = "kubernetes.io/config.mirror"
	// multiplePDBsErrMsg is the error message substring matched by
	// isMultiplePDBsErr to detect pods covered by more than one PDB.
	multiplePDBsErrMsg = "This pod has more than one PodDisruptionBudget"
	// maxConflictRetries bounds the retries of conflicting node updates in
	// TaintNode.
	maxConflictRetries = 50
	// podEvictionHeadroom is the extra time we wait to catch situations when the pod is ignoring SIGTERM and
	// is killed with SIGKILL after TerminationGracePeriodSeconds
	// Same headroom as the cluster-autoscaler:
	// https://github.com/kubernetes/autoscaler/blob/cluster-autoscaler-1.2.2/cluster-autoscaler/core/scale_down.go#L77
	podEvictionHeadroom = 30 * time.Second
)
// NodePoolManager defines an interface for managing node pools when performing
// update operations.
type NodePoolManager interface {
	// GetPool returns the current state of the node pool described by nodePool.
	GetPool(nodePool *api.NodePool) (*NodePool, error)
	// LabelNode sets a label on the Kubernetes node object unless it is
	// already set to the given value.
	LabelNode(node *Node, labelKey, labelValue string) error
	// TaintNode sets (or updates) a taint on the Kubernetes node object.
	TaintNode(node *Node, taintKey, taintValue string, effect v1.TaintEffect) error
	// ScalePool scales the node pool to the given number of replicas,
	// draining nodes before termination on scale down.
	ScalePool(ctx context.Context, nodePool *api.NodePool, replicas int) error
	// TerminateNode drains and terminates a node, optionally decrementing
	// the pool's desired size.
	TerminateNode(ctx context.Context, node *Node, decrementDesired bool) error
	// CordonNode marks a node unschedulable.
	CordonNode(node *Node) error
}
// KubernetesNodePoolManager defines a node pool manager which uses the
// Kubernetes API along with a node pool provider backend to manage node pools.
type KubernetesNodePoolManager struct {
	kube            kubernetes.Interface     // Kubernetes API client
	backend         ProviderNodePoolsBackend // provider backend for the pool's instances (e.g. ASG)
	logger          *log.Entry               // structured logger for all operations
	maxEvictTimeout time.Duration            // max time to retry pod evictions before force-deleting pods
}
// NewKubernetesNodePoolManager initializes a new Kubernetes NodePool manager
// which can manage single node pools based on the nodes registered in the
// Kubernetes API and the related NodePoolBackend for those nodes e.g.
// ASGNodePool.
func NewKubernetesNodePoolManager(logger *log.Entry, kubeClient kubernetes.Interface, poolBackend ProviderNodePoolsBackend, maxEvictTimeout time.Duration) *KubernetesNodePoolManager {
	manager := &KubernetesNodePoolManager{
		kube:            kubeClient,
		backend:         poolBackend,
		logger:          logger,
		maxEvictTimeout: maxEvictTimeout,
	}
	return manager
}
// GetPool gets the current node Pool from the node pool backend and attaches
// the Kubernetes node object name and labels to the corresponding nodes.
func (m *KubernetesNodePoolManager) GetPool(nodePoolDesc *api.NodePool) (*NodePool, error) {
	nodePool, err := m.backend.Get(nodePoolDesc)
	if err != nil {
		return nil, err
	}

	// TODO: labelselector based on nodePool name. Can't do it yet because of how we create node pools in CLM
	// https://github.com/zalando-incubator/cluster-lifecycle-manager/issues/226
	nodeList, err := m.kube.CoreV1().Nodes().List(metav1.ListOptions{})
	if err != nil {
		return nil, err
	}

	// index the Kubernetes node objects by provider ID so that the backend
	// nodes can be matched up with their API representation.
	nodesByProviderID := make(map[string]v1.Node, len(nodeList.Items))
	for _, kubeNode := range nodeList.Items {
		nodesByProviderID[kubeNode.Spec.ProviderID] = kubeNode
	}

	merged := make([]*Node, 0, len(nodePool.Nodes))
	for _, backendNode := range nodePool.Nodes {
		kubeNode, ok := nodesByProviderID[backendNode.ProviderID]
		if !ok {
			// a node unknown to Kubernetes is not considered ready and is
			// therefore left out of the resulting node list.
			continue
		}

		merged = append(merged, &Node{
			ProviderID:      backendNode.ProviderID,
			FailureDomain:   backendNode.FailureDomain,
			Generation:      backendNode.Generation,
			Ready:           backendNode.Ready,
			Name:            kubeNode.Name,
			Labels:          kubeNode.Labels,
			Taints:          kubeNode.Spec.Taints,
			Cordoned:        kubeNode.Spec.Unschedulable,
			VolumesAttached: len(kubeNode.Status.VolumesAttached) > 0,
		})
		// TODO(mlarsen): Think about how this could be
		// enabled. Currently it's not enabled because nodes
		// will be NotReady when flannel is not running,
		// meaning we can get stuck on initial provisioning
		// when the daemonset hasn't been submitted yet.
		// if n.Ready {
		// 	n.Ready = v1.IsNodeReady(&node)
		// }
	}

	nodePool.Current = len(merged)
	nodePool.Nodes = merged

	return nodePool, nil
}
// LabelNode labels a Kubernetes node object in case the label is not already
// defined with the desired value.
//
// The patch document is built with json.Marshal instead of string formatting
// so that label keys or values containing characters special to JSON (quotes,
// backslashes) are escaped correctly rather than producing a malformed patch.
func (m *KubernetesNodePoolManager) LabelNode(node *Node, labelKey, labelValue string) error {
	// fast path: nothing to do when the label is already set correctly.
	if value, ok := node.Labels[labelKey]; ok && value == labelValue {
		return nil
	}

	patch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]string{labelKey: labelValue},
		},
	})
	if err != nil {
		return err
	}

	_, err = m.kube.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patch)
	return err
}
// updateTaint ensures the node carries a taint with the provided key, value
// and effect, either by updating an existing taint with the same key or by
// appending a new one. It reports whether the node object was modified.
func updateTaint(node *v1.Node, taintKey, taintValue string, effect v1.TaintEffect) bool {
	taints := node.Spec.Taints
	for i := range taints {
		if taints[i].Key != taintKey {
			continue
		}
		if taints[i].Value == taintValue && taints[i].Effect == effect {
			// taint already up to date, nothing changed
			return false
		}
		taints[i].Value = taintValue
		taints[i].Effect = effect
		return true
	}

	// no taint with this key yet: add one
	node.Spec.Taints = append(taints, v1.Taint{
		Key:    taintKey,
		Value:  taintValue,
		Effect: effect,
	})
	return true
}
// TaintNode sets a taint on a Kubernetes node object with a specified value and effect.
// It retries on conflicting concurrent updates for up to maxConflictRetries attempts.
func (m *KubernetesNodePoolManager) TaintNode(node *Node, taintKey, taintValue string, effect v1.TaintEffect) error {
	// fast check: verify if the taint is already set
	for _, taint := range node.Taints {
		if taint.Key == taintKey && taint.Value == taintValue && taint.Effect == effect {
			return nil
		}
	}

	taintNode := func() error {
		// re-fetch the node since we're going to do an update
		updatedNode, err := m.kube.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
		if err != nil {
			// a failed GET is not retried: mark the error permanent so the
			// backoff loop aborts immediately.
			return backoff.Permanent(err)
		}

		if updateTaint(updatedNode, taintKey, taintValue, effect) {
			_, err := m.kube.CoreV1().Nodes().Update(updatedNode)
			if err != nil {
				// automatically retry if there was a conflicting update.
				serr, ok := err.(*apiErrors.StatusError)
				if ok && serr.Status().Reason == metav1.StatusReasonConflict {
					return err
				}
				// any other update error is permanent.
				return backoff.Permanent(err)
			}
		}
		return nil
	}

	// retry with a constant 1s delay up to maxConflictRetries times.
	backoffCfg := backoff.WithMaxTries(backoff.NewConstantBackOff(1*time.Second), maxConflictRetries)
	return backoff.Retry(taintNode, backoffCfg)
}
// TerminateNode terminates a node and optionally decrement the desired size of
// the node pool. Before a node is terminated it's drained to ensure that pods
// running on the nodes are gracefully terminated.
func (m *KubernetesNodePoolManager) TerminateNode(ctx context.Context, node *Node, decrementDesired bool) error {
	// gracefully move workload off the node first.
	if err := m.drain(ctx, node); err != nil {
		return err
	}

	// stop early if the surrounding operation was cancelled during the drain.
	if err := ctx.Err(); err != nil {
		return err
	}

	// terminate the instance at the provider backend.
	if err := m.backend.Terminate(node, decrementDesired); err != nil {
		return err
	}

	// remove the node resource from Kubernetes. This ensures that the same
	// empty node is not drained multiple times.
	return m.kube.CoreV1().Nodes().Delete(node.Name, nil)
}
// ScalePool scales a nodePool to the specified number of replicas.
// On scale down it will attempt to do it gracefully by draining the nodes
// before terminating them.
func (m *KubernetesNodePoolManager) ScalePool(ctx context.Context, nodePool *api.NodePool, replicas int) error {
	var pool *NodePool
	var err error

	// in case we are scaling down to 0 replicas, disable the autoscaler to
	// not fight with it.
	if replicas == 0 {
		err := m.backend.SuspendAutoscaling(nodePool)
		if err != nil {
			return err
		}
	}

	for {
		// wait until current matches desired before deciding, so that
		// in-flight node additions/removals are accounted for.
		pool, err = WaitForDesiredNodes(ctx, m.logger, m, nodePool)
		if err != nil {
			return err
		}

		// scale up: a single backend scale call is sufficient.
		if pool.Current < replicas {
			return m.backend.Scale(nodePool, replicas)
		}

		// scale down: cordon and terminate one node per iteration, then
		// loop to re-evaluate the pool size.
		if pool.Current > replicas {
			// pick a random node to terminate
			if len(pool.Nodes) < 1 {
				return errors.New("expected at least 1 node in the node pool, found 0")
			}
			node := pool.Nodes[0]

			// if there are already cordoned nodes prefer one of those
			cordonedNodes := filterNodesToTerminate(pool.Nodes)
			if len(cordonedNodes) > 0 {
				node = cordonedNodes[0]
			}

			err := m.CordonNode(node)
			if err != nil {
				return err
			}

			// decrement desired size along with the termination.
			err = m.TerminateNode(ctx, node, true)
			if err != nil {
				return err
			}
			continue
		}

		// current == replicas: target reached.
		return nil
	}
}
// drain tries to evict all of the pods on a node.
// It first attempts graceful eviction (respecting pod disruption budgets) for
// up to maxEvictTimeout, then force-deletes any remaining evictable pods.
// TODO: optimization: concurrent pod eviction.
func (m *KubernetesNodePoolManager) drain(ctx context.Context, node *Node) error {
	m.logger.WithField("node", node.Name).Info("Draining node")

	// mark node as draining
	if err := m.LabelNode(node, lifecycleStatusLabel, lifecycleStatusDraining); err != nil {
		return err
	}

	// evictAll is a function that tries to evict all evictable pods from a particular node exactly
	// once in order of appearance. If it encounters errors due to pod disruption budget violation it
	// ignores this pod and continues with the next. The function returns any error encountered,
	// including the most recent error produced by a pod disruption budget violation.
	evictAll := func() error {
		pods, err := m.getPodsByNode(node.Name)
		if err != nil {
			return err
		}

		var lastPDBViolationErr error
		for _, pod := range pods.Items {
			// we check at the start because there's a continue in the loop body
			if err = ctx.Err(); err != nil {
				// cancellation is final: don't let backoff retry it.
				return backoff.Permanent(err)
			}

			// Don't bother with this pod if it's not evictable.
			if !m.isEvictablePod(pod) {
				continue
			}

			err = evictPod(m.kube, m.logger, &pod)
			if err != nil {
				// a PDB-related error is not fatal here: remember it, skip
				// this pod and keep evicting the rest.
				if apiErrors.IsTooManyRequests(err) || isMultiplePDBsErr(err) {
					m.logger.WithFields(log.Fields{
						"ns":   pod.Namespace,
						"pod":  pod.Name,
						"node": pod.Spec.NodeName,
					}).Info("Pod Disruption Budget violated")
					lastPDBViolationErr = err
					continue
				}
				return err
			}
		}

		return lastPDBViolationErr
	}

	// We try to evict all pods of a node by calling evict on all of them once. If we encounter an
	// error we will backoff and try again for as long as `maxEvictTimeout`. If after `maxEvictTimeout`
	// we still receive an error related to pod disruption budget violations we will continue and
	// forcefully shutdown the pod in the next step.
	backoffCfg := backoff.NewExponentialBackOff()
	backoffCfg.MaxElapsedTime = m.maxEvictTimeout
	err := backoff.Retry(evictAll, backoffCfg)
	if err != nil {
		// only PDB-related errors may be ignored; anything else aborts the drain.
		if !apiErrors.IsTooManyRequests(err) && !isMultiplePDBsErr(err) {
			return err
		}
	}

	pods, err := m.getPodsByNode(node.Name)
	if err != nil {
		return err
	}

	// Delete all remaining evictable pods disregarding their pod disruption budgets. It's necessary
	// in case a pod disruption budget must be violated in order to proceed with a cluster update.
	for _, pod := range pods.Items {
		// Don't bother with this pod if it's not evictable.
		if !m.isEvictablePod(pod) {
			continue
		}

		err := m.kube.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{
			GracePeriodSeconds: pod.Spec.TerminationGracePeriodSeconds,
		})
		if err != nil {
			return err
		}

		if err = ctx.Err(); err != nil {
			return err
		}

		logger := m.logger.WithFields(log.Fields{
			"ns":   pod.Namespace,
			"pod":  pod.Name,
			"node": pod.Spec.NodeName,
		})

		// wait for pod to be terminated and gone from the node.
		err = waitForPodTermination(m.kube, pod)
		if err != nil {
			// best-effort: log and continue with the next pod.
			logger.Warnf("Pod not terminated within grace period: %s", err)
		}

		logger.Info("Pod deleted")
	}

	return nil
}
// isMultiplePDBsErr returns true if the error is caused by multiple PDBs
// defined for a single pod. A nil error is never a multiple-PDB error; the
// guard avoids a nil-pointer panic when called with no error.
func isMultiplePDBsErr(err error) bool {
	return err != nil && strings.Contains(err.Error(), multiplePDBsErrMsg)
}
// evictPod tries to evict a pod from a node via the eviction API and then
// waits for the pod to actually terminate and disappear from the node.
// Errors from the eviction call (including PDB-related ones) are returned to
// the caller, which decides whether to retry; a pod that outlives its grace
// period is only logged, not treated as an error.
// Note: this is defined as a variable so it can be easily mocked in tests.
var evictPod = func(client kubernetes.Interface, logger *log.Entry, pod *v1.Pod) error {
	localLogger := logger.WithFields(log.Fields{
		"ns":   pod.Namespace,
		"pod":  pod.Name,
		"node": pod.Spec.NodeName,
	})

	eviction := &policy.Eviction{
		ObjectMeta: metav1.ObjectMeta{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		},
		// give the pod its full configured grace period before it is killed.
		DeleteOptions: &metav1.DeleteOptions{
			GracePeriodSeconds: pod.Spec.TerminationGracePeriodSeconds,
		},
	}

	err := client.CoreV1().Pods(pod.Namespace).Evict(eviction)
	if err != nil {
		return err
	}

	localLogger.Info("Evicting pod")

	// wait for the pod to be actually evicted and gone from the node.
	// It has TerminationGracePeriodSeconds time to clean up.
	start := time.Now().UTC()
	err = waitForPodTermination(client, *pod)
	if err != nil {
		localLogger.Warnf("Pod not terminated within grace period: %s", err)
	}

	// time.Since is the idiomatic (and equivalent) form of
	// time.Now().UTC().Sub(start).
	localLogger.Infof("Pod evicted. (Observed termination period: %s)", time.Since(start))
	return nil
}
// waitForPodTermination waits for a pod to be terminated by looking up the pod
// in the API server.
// It waits for up to TerminationGracePeriodSeconds as specified on the pod +
// an additional eviction head room.
// This is to fully respect the termination expectations as described in:
// https://kubernetes.io/docs/concepts/workloads/pods/pod/#termination-of-pods
func waitForPodTermination(client kubernetes.Interface, pod v1.Pod) error {
	// if no grace period is defined, we don't wait.
	if pod.Spec.TerminationGracePeriodSeconds == nil {
		return nil
	}

	check := func() error {
		current, err := client.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
		switch {
		case apiErrors.IsNotFound(err):
			// pod is gone: terminated.
			return nil
		case err != nil:
			return err
		case current.GetObjectMeta().GetUID() != pod.GetObjectMeta().GetUID():
			// statefulset pods have the same name after restart; a different
			// UID means the original pod has been replaced, i.e. terminated.
			return nil
		default:
			return fmt.Errorf("pod not terminated")
		}
	}

	deadline := time.Duration(*pod.Spec.TerminationGracePeriodSeconds)*time.Second + podEvictionHeadroom
	cfg := backoff.NewExponentialBackOff()
	cfg.MaxElapsedTime = deadline
	return backoff.Retry(check, cfg)
}
// CordonNode marks a node unschedulable by patching its spec.
func (m *KubernetesNodePoolManager) CordonNode(node *Node) error {
	patch := []byte(`{"spec": {"unschedulable": true}}`)
	_, err := m.kube.CoreV1().Nodes().Patch(node.Name, types.StrategicMergePatchType, patch)
	return err
}
// getPodsByNode returns all pods currently scheduled to a node, regardless of their status.
func (m *KubernetesNodePoolManager) getPodsByNode(nodeName string) (*v1.PodList, error) {
	// select pods across all namespaces via a field selector on the node name.
	selector := fmt.Sprintf("spec.nodeName=%s", nodeName)
	return m.kube.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{FieldSelector: selector})
}
// isEvictablePod detects whether it makes sense to evict a pod.
// Non-evictable pods are mirror pods, pods that already terminated
// (Succeeded/Failed), and pods managed by DaemonSets.
func (m *KubernetesNodePoolManager) isEvictablePod(pod v1.Pod) bool {
	logger := m.logger.WithFields(log.Fields{
		"ns":   pod.Namespace,
		"pod":  pod.Name,
		"node": pod.Spec.NodeName,
	})

	// mirror pods are identified by their annotation.
	if _, ok := pod.Annotations[mirrorPodAnnotation]; ok {
		logger.Debug("Mirror Pod not evictable")
		return false
	}

	if pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed {
		// fix: Debug does not interpret format verbs; Debugf is required
		// for the %s placeholder to render the phase.
		logger.Debugf("Terminated pod (%s) not evictable", pod.Status.Phase)
		return false
	}

	// pods owned by a DaemonSet are skipped.
	for _, owner := range pod.GetOwnerReferences() {
		if owner.Kind == "DaemonSet" {
			logger.Debug("DaemonSet Pod not evictable")
			return false
		}
	}

	return true
}
// WaitForDesiredNodes waits for the current number of nodes to match the
// desired number. The final node pool will be returned.
// The total wait is bounded by operationMaxTimeout; the pool state is polled
// every operationCheckInterval.
func WaitForDesiredNodes(ctx context.Context, logger *log.Entry, n NodePoolManager, nodePoolDesc *api.NodePool) (*NodePool, error) {
	ctx, cancel := context.WithTimeout(ctx, operationMaxTimeout)
	defer cancel()

	var err error
	var nodePool *NodePool

	for {
		nodePool, err = n.GetPool(nodePoolDesc)
		if err != nil {
			return nil, err
		}

		// done once the number of ready nodes equals the desired count.
		readyNodes := len(nodePool.ReadyNodes())
		if readyNodes == nodePool.Desired {
			break
		}

		logger.WithFields(log.Fields{"node-pool": nodePoolDesc.Name}).
			Infof("Waiting for ready and desired number of nodes to match: %d/%d", readyNodes, nodePool.Desired)

		// sleep until the next poll, aborting early on context
		// cancellation or timeout.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(operationCheckInterval):
		}
	}

	return nodePool, nil
}