Update to use RepairStatements
engedaam committed Nov 18, 2024
1 parent d6b3853 commit 66b9eda
Showing 4 changed files with 78 additions and 38 deletions.
16 changes: 9 additions & 7 deletions pkg/cloudprovider/fake/cloudprovider.go
@@ -60,6 +60,7 @@ type CloudProvider struct {
CreatedNodeClaims map[string]*v1.NodeClaim
Drifted cloudprovider.DriftReason
NodeClassGroupVersionKind []schema.GroupVersionKind
RepairPolicy []cloudprovider.RepairPolicy
}

func NewCloudProvider() *CloudProvider {
@@ -94,6 +95,13 @@ func (c *CloudProvider) Reset() {
Kind: "",
},
}
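// Seed a default repair policy so health-controller tests have a known unhealthy condition ("BadNode") to match against.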
c.RepairPolicy = []cloudprovider.RepairPolicy{
{
ConditionType: "BadNode",
ConditionStatus: corev1.ConditionFalse,
TolerationDuration: 30 * time.Minute,
},
}
}

func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
@@ -264,13 +272,7 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider
}

func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{
{
ConditionType: "BadNode",
ConditionStatus: corev1.ConditionFalse,
TolerationDuration: 30 * time.Minute,
},
}
return c.RepairPolicy
}

// Name returns the CloudProvider implementation name.
50 changes: 29 additions & 21 deletions pkg/controllers/node/health/controller.go
@@ -18,7 +18,6 @@ package health

import (
"context"
"fmt"
"time"

"github.com/samber/lo"
@@ -65,32 +64,21 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
ctx = injection.WithControllerName(ctx, "node.health")
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(node.Namespace, node.Name)))

// Validate that the node is owned by us and is not being deleted
// Validate that the node is owned by us
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err)
}

log.FromContext(ctx).V(1).Info("found nodeclaim")
// Find if a node is unhealthy
healthCondition, foundHealthCondition := lo.Find(c.cloudProvider.RepairPolicies(), func(policy cloudprovider.RepairPolicy) bool {
nodeCondition := nodeutils.GetCondition(node, policy.ConditionType)
for _, condition := range node.Status.Conditions {
if condition.Type == policy.ConditionType && nodeCondition.Status == policy.ConditionStatus {
terminationTime := condition.LastTransitionTime.Add(policy.TolerationDuration)
return !c.clock.Now().Before(terminationTime)
}
}
return false
})
unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node)
if unhealthyNodeCondition == nil {
return reconcile.Result{}, nil
}

log.FromContext(ctx).V(1).Info(fmt.Sprintf("found status: %v", foundHealthCondition))
// If the Node is unhealthy, but has not reached its full toleration duration,
// requeue at the termination time of the unhealthy node
terminationTime := nodeutils.GetCondition(node, healthCondition.ConditionType).LastTransitionTime.Add(healthCondition.TolerationDuration)
log.FromContext(ctx).V(1).Info(fmt.Sprintf("time now: %s, terminate: %s", c.clock.Now().String(), terminationTime.String()))
if !foundHealthCondition {
log.FromContext(ctx).V(1).Info("termination time")
terminationTime := unhealthyNodeCondition.LastTransitionTime.Add(policyTerminationDuration)
if c.clock.Now().Before(terminationTime) {
return reconcile.Result{RequeueAfter: terminationTime.Sub(c.clock.Now())}, nil
}

@@ -99,19 +87,39 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, err
return reconcile.Result{}, client.IgnoreNotFound(err)
}

// The deletion timestamp has successfully been set for the Node, update relevant metrics.
log.FromContext(ctx).V(1).Info("deleting unhealthy node")
metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: string(healthCondition.ConditionType),
metrics.ReasonLabel: string(unhealthyNodeCondition.Type),
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: node.Labels[v1.CapacityTypeLabelKey],
})
return reconcile.Result{}, nil
}

// Find a node with a condition that matches one of the unhealthy conditions defined by the cloud provider
// If there are multiple unhealthy status conditions, we will requeue based on the condition closest to its terminationDuration
func (c *Controller) findUnhealthyConditions(node *corev1.Node) (nc *corev1.NodeCondition, cpTerminationDuration time.Duration) {
requeueTime := time.Time{}
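// Track the earliest termination time across all matching policies; the condition closest to its terminationDuration wins.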
for _, policy := range c.cloudProvider.RepairPolicies() {
// check the status and the type on the condition
nodeCondition := nodeutils.GetCondition(node, policy.ConditionType)
if nodeCondition.Status == policy.ConditionStatus {
terminationTime := nodeCondition.LastTransitionTime.Add(policy.TolerationDuration)
// Determine requeue time
if requeueTime.IsZero() || requeueTime.After(terminationTime) {
nc = lo.ToPtr(nodeCondition)
cpTerminationDuration = policy.TolerationDuration
requeueTime = terminationTime
}
}
}
return nc, cpTerminationDuration
}

func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error {
stored := nodeClaim.DeepCopy()
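// Record the current time as the NodeClaim's termination timestamp; the caller deletes the unhealthy NodeClaim immediately after annotating it.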
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: c.clock.Now().Format(time.RFC3339)})
42 changes: 33 additions & 9 deletions pkg/controllers/node/health/suite_test.go
@@ -26,20 +26,17 @@ import (
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/cache"

"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -49,8 +46,6 @@ import (

var ctx context.Context
var healthController *health.Controller
var terminationController *termination.Controller
var nodeClaimController *nodeclaimlifecycle.Controller
var env *test.Environment
var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
@@ -75,8 +70,6 @@ var _ = BeforeSuite(func() {
recorder = test.NewEventRecorder()
queue = terminator.NewTestingQueue(env.Client, recorder)
healthController = health.NewController(env.Client, cloudProvider, fakeClock)
terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
nodeClaimController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{}))
})

var _ = AfterSuite(func() {
@@ -195,8 +188,25 @@ var _ = Describe("Node Health", func() {
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, fakeClock.Now().Format(time.RFC3339)))
})
It("should return the requeue interval for the time between now and when the nodeClaim termination time", func() {
It("should return the requeue interval for the condition closest to its terminationDuration", func() {
cloudProvider.RepairPolicy = []cloudprovider.RepairPolicy{
{
ConditionType: "BadNode",
ConditionStatus: corev1.ConditionFalse,
TolerationDuration: 60 * time.Minute,
},
{
ConditionType: "ValidUnhealthyCondition",
ConditionStatus: corev1.ConditionFalse,
TolerationDuration: 30 * time.Minute,
},
}
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "ValidUnhealthyCondition",
Status: corev1.ConditionFalse,
// We expect the ValidUnhealthyCondition condition to be tolerated for 30 minutes after this transition
LastTransitionTime: metav1.Time{Time: time.Now()},
}, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
// We expect the BadNode condition to be tolerated for 60 minutes after this transition
@@ -210,6 +220,20 @@
fmt.Println(result.RequeueAfter.String())
Expect(result.RequeueAfter).To(BeNumerically("~", time.Minute*3, time.Second))
})
It("should return the requeue interval for the time between now and when the nodeClaim termination time", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "BadNode",
Status: corev1.ConditionFalse,
// We expect the BadNode condition to be tolerated for 30 minutes after this transition
LastTransitionTime: metav1.Time{Time: time.Now()},
})
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)

fakeClock.Step(27 * time.Minute)

result := ExpectObjectReconciled(ctx, env.Client, healthController, node)
Expect(result.RequeueAfter).To(BeNumerically("~", time.Minute*3, time.Second))
})
})

Context("Forceful termination", func() {
8 changes: 7 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -170,6 +170,9 @@ func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (rec
return reconcile.Result{}, nil
}
if err := c.ensureTerminationGracePeriodTerminationTimeAnnotation(ctx, nodeClaim); err != nil {
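// A conflict is expected when the optimistic-lock patch below races with the health controller's termination-timestamp annotation; requeue instead of treating it as an error.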
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("adding nodeclaim terminationGracePeriod annotation, %w", err)
}

@@ -265,7 +268,10 @@ func (c *Controller) annotateTerminationGracePeriodTerminationTime(ctx context.C
stored := nodeClaim.DeepCopy()
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})

if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a terminationGracePeriod annotation
// can cause races with the health controller, as that controller sets the current time as the terminationGracePeriod annotation
// Here, we want to resolve any conflict and not overwrite the terminationGracePeriod annotation
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return client.IgnoreNotFound(err)
}
log.FromContext(ctx).WithValues(v1.NodeClaimTerminationTimestampAnnotationKey, terminationTime).Info("annotated nodeclaim")
