Scheduler: LastOrdinal based on replicas instead of FreeCap
When scaling down and compacting, basing the last ordinal on the
free capacity structure leads to a lastOrdinal that is off by one,
since `FreeCap` might contain the free capacity for unschedulable pods.

We have to keep including unschedulable pods in `FreeCap` because a
pod with a lower ordinal can become unschedulable for external reasons
(for example, its node is shut down) and its vpods need to be
rescheduled; during that time period we want to consider those pods
when compacting. Once all vpods that were on the pod that is gone have
been rescheduled, `FreeCap` will only include schedulable pods.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
pierDipi committed Dec 16, 2024
1 parent 4087c3a commit 3f807e9
Showing 3 changed files with 33 additions and 42 deletions.
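
To see the off-by-one described above, consider a minimal sketch (illustrative numbers only, not the scheduler's actual code): when `FreeCap` still tracks a pod that is no longer schedulable, deriving the last ordinal from `FreeCap` points one past the replica count, while deriving it from the StatefulSet replicas does not.

package main

import "fmt"

func main() {
	// Illustrative state: the StatefulSet has been scaled to 3 replicas
	// (pods 0..2), but FreeCap still tracks pod 3, whose node went away
	// and whose vpods are waiting to be rescheduled.
	replicas := int32(3)
	freeCap := []int32{8, 5, 5, 9}

	lastFromFreeCap := int32(len(freeCap)) - 1 // 3: off by one, counts the unschedulable pod
	lastFromReplicas := replicas - 1           // 2: the last ordinal compaction should target

	fmt.Println(lastFromFreeCap, lastFromReplicas) // 3 2
}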
49 changes: 18 additions & 31 deletions pkg/scheduler/state/state.go
@@ -49,10 +49,6 @@ type State struct {
// schedulable pods tracks the pods that aren't being evicted.
SchedulablePods []int32

// LastOrdinal is the ordinal index corresponding to the last statefulset replica
// with placed vpods.
LastOrdinal int32

// Pod capacity.
Capacity int32

@@ -143,14 +139,10 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
return nil, err
}

free := make([]int32, 0)
freeCap := make([]int32, 0)
pending := make(map[types.NamespacedName]int32, 4)
expectedVReplicasByVPod := make(map[types.NamespacedName]int32, len(vpods))
schedulablePods := sets.NewInt32()
last := int32(-1)

// keep track of (vpod key, podname) pairs with existing placements
withPlacement := make(map[types.NamespacedName]map[string]bool)

podSpread := make(map[types.NamespacedName]map[string]int32)

@@ -172,7 +164,7 @@ }
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
freeCap = s.updateFreeCapacity(logger, freeCap, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
@@ -182,16 +174,13 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
pending[vpod.GetKey()] = pendingFromVPod(vpod)
expectedVReplicasByVPod[vpod.GetKey()] = vpod.GetVReplicas()

withPlacement[vpod.GetKey()] = make(map[string]bool)
podSpread[vpod.GetKey()] = make(map[string]int32)

for i := 0; i < len(ps); i++ {
podName := ps[i].PodName
vreplicas := ps[i].VReplicas

free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true
freeCap = s.updateFreeCapacity(logger, freeCap, podName, vreplicas)

pod, err := s.podLister.Get(podName)
if err != nil {
@@ -204,8 +193,17 @@ func (s *stateBuilder) State(ctx context.Context) (*State, error) {
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}
state := &State{
FreeCap: freeCap,
SchedulablePods: schedulablePods.List(),
Capacity: s.capacity,
Replicas: scale.Spec.Replicas,
StatefulSetName: s.statefulSetName,
PodLister: s.podLister,
PodSpread: podSpread,
Pending: pending,
ExpectedVReplicaByVPod: expectedVReplicasByVPod,
}

logger.Infow("cluster state info", zap.Any("state", state))

@@ -219,23 +217,19 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, podName string, vreplicas int32) []int32 {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)

free[ordinal] -= vreplicas

// Assert the pod is not overcommitted
if free[ordinal] < 0 {
if overcommit := free[ordinal]; overcommit < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
last = ordinal
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", overcommit))
}

return free, last
return free
}

func (s *State) TotalPending() int32 {
@@ -283,23 +277,16 @@ func (s *State) MarshalJSON() ([]byte, error) {
type S struct {
FreeCap []int32 `json:"freeCap"`
SchedulablePods []int32 `json:"schedulablePods"`
LastOrdinal int32 `json:"lastOrdinal"`
Capacity int32 `json:"capacity"`
Replicas int32 `json:"replicas"`
NumZones int32 `json:"numZones"`
NumNodes int32 `json:"numNodes"`
NodeToZoneMap map[string]string `json:"nodeToZoneMap"`
StatefulSetName string `json:"statefulSetName"`
PodSpread map[string]map[string]int32 `json:"podSpread"`
NodeSpread map[string]map[string]int32 `json:"nodeSpread"`
ZoneSpread map[string]map[string]int32 `json:"zoneSpread"`
Pending map[string]int32 `json:"pending"`
}

sj := S{
FreeCap: s.FreeCap,
SchedulablePods: s.SchedulablePods,
LastOrdinal: s.LastOrdinal,
Capacity: s.Capacity,
Replicas: s.Replicas,
StatefulSetName: s.StatefulSetName,
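
As a reading aid for the diff above, here is a standalone approximation of what the simplified updateFreeCapacity now does; it assumes grow pads the slice with the per-pod capacity up to the given ordinal, which is how the call reads in context.

package sketch

import "fmt"

// updateFreeCapacity approximates the simplified helper: make sure the
// ordinal is addressable, subtract the placed vreplicas, and warn
// (without aborting the current scheduling) on overcommit.
func updateFreeCapacity(free []int32, ordinal, capacity, vreplicas int32) []int32 {
	for int32(len(free)) <= ordinal { // what grow is assumed to do
		free = append(free, capacity)
	}
	free[ordinal] -= vreplicas
	if overcommit := free[ordinal]; overcommit < 0 {
		fmt.Printf("pod ordinal %d is overcommitted by %d vreplicas\n", ordinal, -overcommit)
	}
	return free
}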
14 changes: 7 additions & 7 deletions pkg/scheduler/state/state_test.go
@@ -58,14 +58,14 @@ func TestStateBuilder(t *testing.T) {
name: "no vpods",
replicas: int32(0),
vpods: [][]duckv1alpha1.Placement{},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, LastOrdinal: -1, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
expected: State{Capacity: 10, FreeCap: []int32{}, SchedulablePods: []int32{}, StatefulSetName: sfsName, Pending: map[types.NamespacedName]int32{}, ExpectedVReplicaByVPod: map[types.NamespacedName]int32{}},
freec: int32(0),
},
{
name: "one vpods",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
@@ -88,7 +88,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(0), int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
@@ -124,7 +124,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 2}},
{{PodName: "statefulset-name-1", VReplicas: 3}, {PodName: "statefulset-name-0", VReplicas: 1}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, LastOrdinal: 2, Replicas: 3, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(8), int32(5), int32(5)}, SchedulablePods: []int32{int32(1), int32(2)}, Replicas: 3, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-2": 5,
@@ -157,7 +157,7 @@ func TestStateBuilder(t *testing.T) {
{{PodName: "statefulset-name-1", VReplicas: 0}},
{{PodName: "statefulset-name-1", VReplicas: 0}, {PodName: "statefulset-name-3", VReplicas: 0}},
},
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, LastOrdinal: 3, Replicas: 4, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9), int32(10), int32(5), int32(10)}, SchedulablePods: []int32{int32(0), int32(1), int32(2), int32(3)}, Replicas: 4, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
@@ -188,7 +188,7 @@ func TestStateBuilder(t *testing.T) {
name: "three vpods but one tainted and one with no zone label",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
@@ -207,7 +207,7 @@ func TestStateBuilder(t *testing.T) {
name: "one vpod (HA)",
replicas: int32(1),
vpods: [][]duckv1alpha1.Placement{{{PodName: "statefulset-name-0", VReplicas: 1}}},
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, LastOrdinal: 0, Replicas: 1, StatefulSetName: sfsName,
expected: State{Capacity: 10, FreeCap: []int32{int32(9)}, SchedulablePods: []int32{int32(0)}, Replicas: 1, StatefulSetName: sfsName,
PodSpread: map[types.NamespacedName]map[string]int32{
{Name: vpodName + "-0", Namespace: vpodNs + "-0"}: {
"statefulset-name-0": 1,
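
The expected FreeCap values in these test cases follow from simple per-pod accounting. A sketch of that arithmetic, with a hypothetical helper name that is not part of the test suite:

package sketch

// expectedFreeCap is a hypothetical helper mirroring how the FreeCap
// expectations are derived: each pod starts with the full per-pod
// capacity and loses one slot per placed vreplica.
func expectedFreeCap(capacity, replicas int32, placements map[int32]int32) []int32 {
	free := make([]int32, replicas)
	for i := range free {
		free[i] = capacity
	}
	for ordinal, vreplicas := range placements {
		free[ordinal] -= vreplicas
	}
	return free
}

For example, expectedFreeCap(10, 3, map[int32]int32{0: 2, 1: 5}) yields [8 5 10].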
12 changes: 8 additions & 4 deletions pkg/scheduler/statefulset/autoscaler.go
@@ -250,15 +250,17 @@ func (a *autoscaler) mayCompact(logger *zap.SugaredLogger, s *st.State) error {
zap.Any("state", s),
)

lastOrdinal := s.Replicas - 1

// when there is only one pod there is nothing to move or number of pods is just enough!
if s.LastOrdinal < 1 || len(s.SchedulablePods) <= 1 {
if lastOrdinal < 1 || len(s.SchedulablePods) <= 1 {
return nil
}

// Determine if there is enough free capacity to
// move all vreplicas placed in the last pod to pods with a lower ordinal
freeCapacity := s.FreeCapacity() - s.Free(s.LastOrdinal)
usedInLastPod := s.Capacity - s.Free(s.LastOrdinal)
freeCapacity := s.FreeCapacity() - s.Free(lastOrdinal)
usedInLastPod := s.Capacity - s.Free(lastOrdinal)

if freeCapacity >= usedInLastPod {
a.lastCompactAttempt = time.Now()
@@ -280,12 +282,14 @@ func (a *autoscaler) compact(s *st.State) error {
return err
}

lastOrdinal := s.Replicas - 1

for _, vpod := range vpods {
placements := vpod.GetPlacements()
for i := len(placements) - 1; i >= 0; i-- { //start from the last placement
ordinal := st.OrdinalFromPodName(placements[i].PodName)

if ordinal == s.LastOrdinal {
if ordinal == lastOrdinal {
pod, err = s.PodLister.Get(placements[i].PodName)
if err != nil {
return fmt.Errorf("failed to get pod %s: %w", placements[i].PodName, err)

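To make the new compaction check concrete, here is a worked example with illustrative numbers (not taken from the commit): with lastOrdinal derived from the replica count, the autoscaler compares the free capacity on the lower ordinals against what is placed on the last pod.

package main

import "fmt"

func main() {
	// Illustrative state: 3 replicas, per-pod capacity 10, and
	// free slots [8, 5, 5] across ordinals 0..2.
	replicas := int32(3)
	capacity := int32(10)
	freeCap := []int32{8, 5, 5}

	lastOrdinal := replicas - 1 // 2, derived from replicas rather than FreeCap

	var totalFree int32
	for _, f := range freeCap {
		totalFree += f // 18
	}

	freeCapacity := totalFree - freeCap[lastOrdinal] // 13: free slots on the lower ordinals
	usedInLastPod := capacity - freeCap[lastOrdinal] // 5: vreplicas placed on the last pod

	// 13 >= 5, so everything on the last pod fits elsewhere and the
	// autoscaler may compact and scale the StatefulSet down.
	fmt.Println(freeCapacity >= usedInLastPod) // true
}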