From 4dbc2ba5c5d99ea1c3f59b68c2cb1e869ed6aff4 Mon Sep 17 00:00:00 2001
From: Pierangelo Di Pilato
Date: Thu, 19 Dec 2024 08:12:11 +0100
Subject: [PATCH] Scheduler: LastOrdinal based on replicas instead of FreeCap (#8388)

When scaling down and compacting, basing the last ordinal on the free
capacity structure leads to a lastOrdinal that is off by one, since
`FreeCap` might contain the free capacity of unschedulable pods.

We have to keep including unschedulable pods in FreeCap because a pod
with a lower ordinal can become unschedulable for external reasons
(for example, its node is shut down) and its vpods need to be
rescheduled; during that time we still want to consider those pods
when compacting. Once all vpods that were on the pod that is gone
have been rescheduled, FreeCap will only include schedulable pods.

Signed-off-by: Pierangelo Di Pilato
---
 pkg/scheduler/scheduler.go                   |  2 +
 pkg/scheduler/state/state.go                 | 56 ++++++++------------
 pkg/scheduler/state/state_test.go            | 14 ++---
 pkg/scheduler/statefulset/autoscaler.go      | 18 ++-----
 pkg/scheduler/statefulset/autoscaler_test.go | 47 ++++++++++++++--
 5 files changed, 80 insertions(+), 57 deletions(-)

diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 62dcf163d29..0dd4f2b6c8c 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -42,6 +42,8 @@ type VPodLister func() ([]VPod, error)
 // Evictor allows for vreplicas to be evicted.
 // For instance, the evictor is used by the statefulset scheduler to
 // move vreplicas to pod with a lower ordinal.
+//
+// pod might be `nil`.
 type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error
 
 // Scheduler is responsible for placing VPods into real Kubernetes pods
diff --git a/pkg/scheduler/state/state.go b/pkg/scheduler/state/state.go
index 9d4503b9158..4f3ed659793 100644
--- a/pkg/scheduler/state/state.go
+++ b/pkg/scheduler/state/state.go
@@ -44,15 +44,14 @@ type StateAccessor interface {
 // It is used by for the scheduler and the autoscaler
 type State struct {
 	// free tracks the free capacity of each pod.
+	//
+	// Including pods that might not exist anymore, it reflects the free capacity determined by
+	// placements in the vpod status.
 	FreeCap []int32
 
 	// schedulable pods tracks the pods that aren't being evicted.
 	SchedulablePods []int32
 
-	// LastOrdinal is the ordinal index corresponding to the last statefulset replica
-	// with placed vpods.
-	LastOrdinal int32
-
 	// Pod capacity.
 	Capacity int32
 
@@ -143,14 +142,10 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
 		return nil, err
 	}
 
-	free := make([]int32, 0)
+	freeCap := make([]int32, 0)
 	pending := make(map[types.NamespacedName]int32, 4)
 	expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
 	schedulablePods := sets.NewInt32()
-	last := int32(-1)
-
-	// keep track of (vpod key, podname) pairs with existing placements
-	withPlacement := make(map[types.NamespacedName]map[string]bool)
 
 	podSpread := make(map[types.NamespacedName]map[string]int32)
 
@@ -172,7 +167,7 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
 	}
 
 	for _, p := range schedulablePods.List() {
-		free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
+		freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0)
 	}
 
 	// Getting current state from existing placements for all vpods
@@ -182,16 +177,13 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
 		pending[vpod.GetKey()] = pendingFromVPod(vpod)
 		expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()
 
-		withPlacement[vpod.GetKey()] = make(map[string]bool)
 		podSpread[vpod.GetKey()] = make(map[string]int32)
 
 		for i := 0; i < len(ps); i++ {
 			podName := ps[i].PodName
 			vreplicas := ps[i].VReplicas
 
-			free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)
-
-			withPlacement[vpod.GetKey()][podName] = true
+			freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas)
 
 			pod, err := s.podLister.Get(podName)
 			if err != nil {
@@ -204,8 +196,17 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
 		}
 	}
 
-	state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
-		PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}
+	state := &State{
+		FreeCap:                freeCap,
+		SchedulablePods:        schedulablePods.List(),
+		Capacity:               s.capacity,
+		Replicas:               scale.Spec.Replicas,
+		StatefulSetName:        s.statefulSetName,
+		PodLister:              s.podLister,
+		PodSpread:              podSpread,
+		Pending:                pending,
+		ExpectedVReplicaByVPod: expectedVReplicasByVPod,
+	}
 
 	logger.Infow("cluster state info", zap.Any("state", state))
 
@@ -219,23 +220,19 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
 	return int32(math.Max(float64(0), float64(expected-scheduled)))
 }
 
-func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
+func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, freeCap []int32, podName string, vreplicas int32) []int32 {
 	ordinal := OrdinalFromPodName(podName)
-	free = grow(free, ordinal, s.capacity)
+	freeCap = grow(freeCap, ordinal, s.capacity)
 
-	free[ordinal] -= vreplicas
+	freeCap[ordinal] -= vreplicas
 
 	// Assert the pod is not overcommitted
-	if free[ordinal] < 0 {
+	if overcommit := freeCap[ordinal]; overcommit < 0 {
 		// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
-		logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
-	}
-
-	if ordinal > last {
-		last = ordinal
+		logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("overcommit", overcommit))
 	}
 
-	return free, last
+	return freeCap
 }
 
 func (s *State) TotalPending() int32 {
@@ -283,23 +280,16 @@ func (s *State) MarshalJSON() ([]byte, error) {
 	type S struct {
 		FreeCap         []int32                     `json:"freeCap"`
 		SchedulablePods []int32                     `json:"schedulablePods"`
-		LastOrdinal     int32                       `json:"lastOrdinal"`
 		Capacity        int32                       `json:"capacity"`
 		Replicas        int32                       `json:"replicas"`
-		NumZones        int32                       `json:"numZones"`
-		NumNodes        int32                       `json:"numNodes"`
-		NodeToZoneMap   map[string]string           `json:"nodeToZoneMap"`
 		StatefulSetName string                      `json:"statefulSetName"`
 		PodSpread       map[string]map[string]int32 `json:"podSpread"`
-		NodeSpread      map[string]map[string]int32 `json:"nodeSpread"`
-		ZoneSpread      map[string]map[string]int32 `json:"zoneSpread"`
 		Pending         map[string]int32            `json:"pending"`
 	}
 
 	sj := S{
 		FreeCap:         s.FreeCap,
 		SchedulablePods: s.SchedulablePods,
-		LastOrdinal:     s.LastOrdinal,
 		Capacity:        s.Capacity,
 		Replicas:        s.Replicas,
 		StatefulSetName: s.StatefulSetName,
diff --git a/pkg/scheduler/state/state_test.go b/pkg/scheduler/state/state_test.go
index b1f518d95df..b2dfcb651e5 100644
--- a/pkg/scheduler/state/state_test.go
+++ b/pkg/scheduler/state/state_test.go
@@ -58,14 +58,14 @@ func TestStateBuilder(t *testing.T) {
 			name:     "no vpods",
 			replicas: int32(0),
 			vpods:    [][]duckv1alpha1.Placement{},
-			expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
+			expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
 			freec:    int32(0),
 		},
 		{
 			name:     "one vpods",
 			replicas: int32(1),
 			vpods:    [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
-			expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
+			expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
 				PodSpread: map[types.NamespacedName]map[string]int32{
 					{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
 						"statefulset-name-0": 1,
@@ -88,7 +88,7 @@ func TestStateBuilder(t *testing.T) {
 				{{PodName: "statefulset-name-1", VReplicas: 2}},
 				{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
 			},
-			expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
+			expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
 				PodSpread: map[types.NamespacedName]map[string]int32{
 					{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
 						"statefulset-name-0": 1,
@@ -124,7 +124,7 @@ func TestStateBuilder(t *testing.T) {
 				{{PodName: "statefulset-name-1", VReplicas: 2}},
 				{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
 			},
-			expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
+			expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
 				PodSpread: map[types.NamespacedName]map[string]int32{
 					{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
 						"statefulset-name-2": 5,
@@ -157,7 +157,7 @@ func TestStateBuilder(t *testing.T) {
 				{{PodName: "statefulset-name-1", VReplicas: 0}},
 				{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
 			},
-			expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, StatefulSetName: sfsName,
+			expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, Replicas: 4, StatefulSetName: sfsName,
 				PodSpread: map[types.NamespacedName]map[string]int32{
 					{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
 						"statefulset-name-0": 1,
@@ -188,7 +188,7 @@ func TestStateBuilder(t *testing.T) {
 			name:     "three vpods but one tainted and one with no zone label",
 			replicas: int32(1),
 			vpods:    [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
-			expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
+			expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
 				PodSpread: map[types.NamespacedName]map[string]int32{
 					{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
 						"statefulset-name-0": 1,
@@ -207,7 +207,7 @@ func TestStateBuilder(t *testing.T) {
 			name:     "one vpod (HA)",
 			replicas: int32(1),
 			vpods:    [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
-			expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
+			expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
 				PodSpread: map[types.NamespacedName]map[string]int32{
 					{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
 						"statefulset-name-0": 1,
diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go
index 8b61ca4a83c..653ec12f156 100644
--- a/pkg/scheduler/statefulset/autoscaler.go
+++ b/pkg/scheduler/statefulset/autoscaler.go
@@ -26,6 +26,7 @@ import (
 
 	"go.uber.org/zap"
 	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/utils/integer"
@@ -250,17 +251,8 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error {
 		zap.Any("state", s),
 	)
 
-	// when there is only one pod there is nothing to move or number of pods is just enough!
-	if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 {
-		return nil
-	}
-
-	// Determine if there is enough free capacity to
-	// move all vreplicas placed in the last pod to pods with a lower ordinal
-	freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal)
-	usedInLastPod := s.Capacity - s.Free(s.LastOrdinal)
-
-	if freeCapacity >= usedInLastPod {
+	// Determine if there are vpods that need compaction
+	if s.Replicas != int32(len(s.FreeCap)) {
 		a.lastCompactAttempt = time.Now()
 		err := a.compact(s)
 		if err != nil {
@@ -285,9 +277,9 @@ func (a *autoscaler) compact(s *st.State) error {
 			for i := len(placements) - 1; i >= 0; i-- { //start from the last placement
 				ordinal := st.OrdinalFromPodName(placements[i].PodName)
 
-				if ordinal == s.LastOrdinal {
+				if ordinal >= s.Replicas {
 					pod, err = s.PodLister.Get(placements[i].PodName)
-					if err != nil {
+					if err != nil && !apierrors.IsNotFound(err) {
 						return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err)
 					}
diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go
index 441c6cab7a8..ea8d3fb77de 100644
--- a/pkg/scheduler/statefulset/autoscaler_test.go
+++ b/pkg/scheduler/statefulset/autoscaler_test.go
@@ -470,9 +470,7 @@ func TestCompactor(t *testing.T) {
 					{PodName: "statefulset-name-0", VReplicas: int32(8)},
 					{PodName: "statefulset-name-1", VReplicas: int32(2)}}),
 			},
-			wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
-				{Name: "vpod-1", Namespace: testNs}: {{PodName: "statefulset-name-1", VReplicas: int32(2)}},
-			},
+			wantEvictions: nil,
 		},
 		{
 			name: "multiple vpods, with placements in multiple pods, compacted",
@@ -500,8 +498,49 @@ func TestCompactor(t *testing.T) {
 					{PodName: "statefulset-name-0", VReplicas: int32(2)},
 					{PodName: "statefulset-name-2", VReplicas: int32(7)}}),
 			},
+			wantEvictions: nil,
+		},
+		{
+			name: "multiple vpods, scale down, with placements in multiple pods, compacted",
+			replicas: int32(2),
+			vpods: []scheduler.VPod{
+				tscheduler.NewVPod(testNs, "vpod-1", 10, []duckv1alpha1.Placement{
+					{PodName: "statefulset-name-0", VReplicas: int32(3)},
+					{PodName: "statefulset-name-1", VReplicas: int32(7)}}),
+				tscheduler.NewVPod(testNs, "vpod-2", 10, []duckv1alpha1.Placement{
+					{PodName: "statefulset-name-0", VReplicas: int32(7)},
+					{PodName: "statefulset-name-2", VReplicas: int32(3)}}),
+			},
 			wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
-				{Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(7)}},
+				{Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(3)}},
+			},
+		},
+		{
+			name: "multiple vpods, scale down multiple, with placements in multiple pods, compacted",
+			replicas: int32(1),
+			vpods: []scheduler.VPod{
+				tscheduler.NewVPod(testNs, "vpod-1", 6, []duckv1alpha1.Placement{
+					{PodName: "statefulset-name-0", VReplicas: int32(3)},
+					{PodName: "statefulset-name-1", VReplicas: int32(7)},
+					{PodName: "statefulset-name-2", VReplicas: int32(6)},
+				}),
+				tscheduler.NewVPod(testNs, "vpod-2", 3, []duckv1alpha1.Placement{
+					{PodName: "statefulset-name-0", VReplicas: int32(7)},
+					{PodName: "statefulset-name-2", VReplicas: int32(3)},
+					{PodName: "statefulset-name-3", VReplicas: int32(2)},
+					{PodName: "statefulset-name-10", VReplicas: int32(1)},
+				}),
+			},
+			wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{
+				{Name: "vpod-1", Namespace: testNs}: {
+					{PodName: "statefulset-name-2", VReplicas: int32(6)},
+					{PodName: "statefulset-name-1", VReplicas: int32(7)},
+				},
+				{Name: "vpod-2", Namespace: testNs}: {
+					{PodName: "statefulset-name-10", VReplicas: int32(1)},
+					{PodName: "statefulset-name-3", VReplicas: int32(2)},
+					{PodName: "statefulset-name-2", VReplicas: int32(3)},
+				},
+			},
 		},