diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 62dcf163d29..0dd4f2b6c8c 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -42,6 +42,8 @@ type VPodLister func() ([]VPod, error) // Evictor allows for vreplicas to be evicted. // For instance, the evictor is used by the statefulset scheduler to // move vreplicas to pod with a lower ordinal. +// +// pod might be `nil`. type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error // Scheduler is responsible for placing VPods into real Kubernetes pods diff --git a/pkg/scheduler/state/state.go b/pkg/scheduler/state/state.go index 9d4503b9158..4f3ed659793 100644 --- a/pkg/scheduler/state/state.go +++ b/pkg/scheduler/state/state.go @@ -44,15 +44,14 @@ type StateAccessor interface { // It is used by for the scheduler and the autoscaler type State struct { // free tracks the free capacity of each pod. + // + // Including pods that might not exist anymore, it reflects the free capacity determined by + // placements in the vpod status. FreeCap []int32 // schedulable pods tracks the pods that aren't being evicted. SchedulablePods []int32 - // LastOrdinal is the ordinal index corresponding to the last statefulset replica - // with placed vpods. - LastOrdinal int32 - // Pod capacity. Capacity int32 @@ -143,14 +142,10 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { return nil, err } - free := make([]int32, 0) + freeCap := make([]int32, 0) pending := make(map[types.NamespacedName]int32, 4) expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods)) schedulablePods := sets.NewInt32() - last := int32(-1) - - // keep track of (vpod key, podname) pairs with existing placements - withPlacement := make(map[types.NamespacedName]map[string]bool) podSpread := make(map[types.NamespacedName]map[string]int32) @@ -172,7 +167,7 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { } for _, p := range schedulablePods.List() { - free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0) + freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0) } // Getting current state from existing placements for all vpods @@ -182,16 +177,13 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { pending[vpod.GetKey()] = pendingFromVPod(vpod) expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas() - withPlacement[vpod.GetKey()] = make(map[string]bool) podSpread[vpod.GetKey()] = make(map[string]int32) for i := 0; i < len(ps); i++ { podName := ps[i].PodName vreplicas := ps[i].VReplicas - free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas) - - withPlacement[vpod.GetKey()][podName] = true + freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas) pod, err := s.podLister.Get(podName) if err != nil { @@ -204,8 +196,17 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) { } } - state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister, - PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod} + state := &State{ + FreeCap: freeCap, + SchedulablePods: schedulablePods.List(), + Capacity: s.capacity, + Replicas: scale.Spec.Replicas, + StatefulSetName: s.statefulSetName, + PodLister: s.podLister, + PodSpread: podSpread, + Pending: pending, + ExpectedVReplicaByVPod: expectedVReplicasByVPod, + } logger.Infow("cluster state info", zap.Any("state", state)) @@ -219,23 +220,19 @@ func pendingFromVPod(vpod scheduler.VPod) int32 { return int32(math.Max(float64(0), float64(expected-scheduled))) } -func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) { +func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, freeCap []int32, podName string, vreplicas int32) []int32 { ordinal := OrdinalFromPodName(podName) - free = grow(free, ordinal, s.capacity) + freeCap = grow(freeCap, ordinal, s.capacity) - free[ordinal] -= vreplicas + freeCap[ordinal] -= vreplicas // Assert the pod is not overcommitted - if free[ordinal] < 0 { + if overcommit := freeCap[ordinal]; overcommit < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. - logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) - } - - if ordinal > last { - last = ordinal + logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("overcommit", overcommit)) } - return free, last + return freeCap } func (s *State) TotalPending() int32 { @@ -283,23 +280,16 @@ func (s *State) MarshalJSON() ([]byte, error) { type S struct { FreeCap []int32 `json:"freeCap"` SchedulablePods []int32 `json:"schedulablePods"` - LastOrdinal int32 `json:"lastOrdinal"` Capacity int32 `json:"capacity"` Replicas int32 `json:"replicas"` - NumZones int32 `json:"numZones"` - NumNodes int32 `json:"numNodes"` - NodeToZoneMap map[string]string `json:"nodeToZoneMap"` StatefulSetName string `json:"statefulSetName"` PodSpread map[string]map[string]int32 `json:"podSpread"` - NodeSpread map[string]map[string]int32 `json:"nodeSpread"` - ZoneSpread map[string]map[string]int32 `json:"zoneSpread"` Pending map[string]int32 `json:"pending"` } sj := S{ FreeCap: s.FreeCap, SchedulablePods: s.SchedulablePods, - LastOrdinal: s.LastOrdinal, Capacity: s.Capacity, Replicas: s.Replicas, StatefulSetName: s.StatefulSetName, diff --git a/pkg/scheduler/state/state_test.go b/pkg/scheduler/state/state_test.go index b1f518d95df..b2dfcb651e5 100644 --- a/pkg/scheduler/state/state_test.go +++ b/pkg/scheduler/state/state_test.go @@ -58,14 +58,14 @@ func TestStateBuilder(t *testing.T) { name: "no vpods", replicas: int32(0), vpods: [][]duckv1alpha1.Placement{}, - expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}}, + expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}}, freec: int32(0), }, { name: "one vpods", replicas: int32(1), vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, - expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { "statefulset-name-0": 1, @@ -88,7 +88,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 2}}, {{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { "statefulset-name-0": 1, @@ -124,7 +124,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 2}}, {{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { "statefulset-name-2": 5, @@ -157,7 +157,7 @@ func TestStateBuilder(t *testing.T) { {{PodName: "statefulset-name-1", VReplicas: 0}}, {{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}}, }, - expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, Replicas: 4, StatefulSetName: sfsName, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { "statefulset-name-0": 1, @@ -188,7 +188,7 @@ func TestStateBuilder(t *testing.T) { name: "three vpods but one tainted and one with no zone label", replicas: int32(1), vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, - expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { "statefulset-name-0": 1, @@ -207,7 +207,7 @@ func TestStateBuilder(t *testing.T) { name: "one vpod (HA)", replicas: int32(1), vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}}, - expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName, + expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName, PodSpread: map[types.NamespacedName]map[string]int32{ {Name: vpodName + "-0", Namespace: vpodNs + "-0"}: { "statefulset-name-0": 1, diff --git a/pkg/scheduler/statefulset/autoscaler.go b/pkg/scheduler/statefulset/autoscaler.go index 8b61ca4a83c..653ec12f156 100644 --- a/pkg/scheduler/statefulset/autoscaler.go +++ b/pkg/scheduler/statefulset/autoscaler.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/integer" @@ -250,17 +251,8 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error { zap.Any("state", s), ) - // when there is only one pod there is nothing to move or number of pods is just enough! - if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 { - return nil - } - - // Determine if there is enough free capacity to - // move all vreplicas placed in the last pod to pods with a lower ordinal - freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal) - usedInLastPod := s.Capacity - s.Free(s.LastOrdinal) - - if freeCapacity >= usedInLastPod { + // Determine if there are vpods that need compaction + if s.Replicas != int32(len(s.FreeCap)) { a.lastCompactAttempt = time.Now() err := a.compact(s) if err != nil { @@ -285,9 +277,9 @@ func (a *autoscaler) compact(s *st.State) error { for i := len(placements) - 1; i >= 0; i-- { //start from the last placement ordinal := st.OrdinalFromPodName(placements[i].PodName) - if ordinal == s.LastOrdinal { + if ordinal >= s.Replicas { pod, err = s.PodLister.Get(placements[i].PodName) - if err != nil { + if err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err) } diff --git a/pkg/scheduler/statefulset/autoscaler_test.go b/pkg/scheduler/statefulset/autoscaler_test.go index 441c6cab7a8..ea8d3fb77de 100644 --- a/pkg/scheduler/statefulset/autoscaler_test.go +++ b/pkg/scheduler/statefulset/autoscaler_test.go @@ -470,9 +470,7 @@ func TestCompactor(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(8)}, {PodName: "statefulset-name-1", VReplicas: int32(2)}}), }, - wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{ - {Name: "vpod-1", Namespace: testNs}: {{PodName: "statefulset-name-1", VReplicas: int32(2)}}, - }, + wantEvictions: nil, }, { name: "multiple vpods, with placements in multiple pods, compacted", @@ -500,8 +498,49 @@ func TestCompactor(t *testing.T) { {PodName: "statefulset-name-0", VReplicas: int32(2)}, {PodName: "statefulset-name-2", VReplicas: int32(7)}}), }, + wantEvictions: nil, + }, + { + name: "multiple vpods, scale down, with placements in multiple pods, compacted", + replicas: int32(2), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 10, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(3)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}}), + tscheduler.NewVPod(testNs, "vpod-2", 10, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(7)}, + {PodName: "statefulset-name-2", VReplicas: int32(3)}}), + }, wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{ - {Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(7)}}, + {Name: "vpod-2", Namespace: testNs}: {{PodName: "statefulset-name-2", VReplicas: int32(3)}}, + }, + }, + { + name: "multiple vpods, scale down multiple, with placements in multiple pods, compacted", + replicas: int32(1), + vpods: []scheduler.VPod{ + tscheduler.NewVPod(testNs, "vpod-1", 6, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(3)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}, + {PodName: "statefulset-name-2", VReplicas: int32(6)}, + }), + tscheduler.NewVPod(testNs, "vpod-2", 3, []duckv1alpha1.Placement{ + {PodName: "statefulset-name-0", VReplicas: int32(7)}, + {PodName: "statefulset-name-2", VReplicas: int32(3)}, + {PodName: "statefulset-name-3", VReplicas: int32(2)}, + {PodName: "statefulset-name-10", VReplicas: int32(1)}, + }), + }, + wantEvictions: map[types.NamespacedName][]duckv1alpha1.Placement{ + {Name: "vpod-1", Namespace: testNs}: { + {PodName: "statefulset-name-2", VReplicas: int32(6)}, + {PodName: "statefulset-name-1", VReplicas: int32(7)}, + }, + {Name: "vpod-2", Namespace: testNs}: { + {PodName: "statefulset-name-10", VReplicas: int32(1)}, + {PodName: "statefulset-name-3", VReplicas: int32(2)}, + {PodName: "statefulset-name-2", VReplicas: int32(3)}, + }, }, }, }