From ed718647f330f22a234f73f22165a5bfb2b8e054 Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Fri, 8 Sep 2023 11:08:31 +0200 Subject: [PATCH] Scheduler handle overcommitted pods Signed-off-by: Pierangelo Di Pilato --- .../patches/handle_overcommitted_pods.patch | 62 +++++++++++++++++++ .../eventing/pkg/scheduler/state/state.go | 2 +- .../pkg/scheduler/statefulset/scheduler.go | 34 +++++++++- 3 files changed, 94 insertions(+), 4 deletions(-) create mode 100644 openshift/patches/handle_overcommitted_pods.patch diff --git a/openshift/patches/handle_overcommitted_pods.patch b/openshift/patches/handle_overcommitted_pods.patch new file mode 100644 index 0000000000..51dd29bb50 --- /dev/null +++ b/openshift/patches/handle_overcommitted_pods.patch @@ -0,0 +1,62 @@ +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +index 2d5460cf8..65018e7c5 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +@@ -361,7 +361,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri + // Assert the pod is not overcommitted + if free[ordinal] < 0 { + // This should not happen anymore. Log as an error but do not interrupt the current scheduling. 
+- s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) ++ s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + } + + if ordinal > last { +diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +index ed1defaa6..d9bcef1f8 100644 +--- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go ++++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +@@ -272,13 +272,41 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 + existingPlacements := vpod.GetPlacements() + var left int32 + +- // Remove unschedulable pods from placements ++ // Remove unschedulable or adjust overcommitted pods from placements + var placements []duckv1alpha1.Placement + if len(existingPlacements) > 0 { + placements = make([]duckv1alpha1.Placement, 0, len(existingPlacements)) + for _, p := range existingPlacements { +- if state.IsSchedulablePod(st.OrdinalFromPodName(p.PodName)) { +- placements = append(placements, *p.DeepCopy()) ++ p := p.DeepCopy() ++ ordinal := st.OrdinalFromPodName(p.PodName) ++ ++ if !state.IsSchedulablePod(ordinal) { ++ continue ++ } ++ ++ // Handle overcommitted pods. 
++ if state.FreeCap[ordinal] < 0 { ++ // vr > overcommit => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 ++ // vr = overcommit => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 ++ // vr < overcommit => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 ++ ++ overcommit := -state.FreeCap[ordinal] ++ ++ if p.VReplicas >= overcommit { ++ state.SetFree(ordinal, 0) ++ state.Pending[vpod.GetKey()] += overcommit ++ ++ p.VReplicas = p.VReplicas - overcommit ++ } else { ++ state.SetFree(ordinal, p.VReplicas-overcommit) ++ state.Pending[vpod.GetKey()] += p.VReplicas ++ ++ p.VReplicas = 0 ++ } ++ } ++ ++ if p.VReplicas > 0 { ++ placements = append(placements, *p) + } + } + } diff --git a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go index 2d5460cf80..65018e7c5d 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/state/state.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/state/state.go @@ -361,7 +361,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri // Assert the pod is not overcommitted if free[ordinal] < 0 { // This should not happen anymore. Log as an error but do not interrupt the current scheduling. 
- s.logger.Errorw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) + s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal])) } if ordinal > last { diff --git a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go index ed1defaa6c..d9bcef1f80 100644 --- a/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go +++ b/vendor/knative.dev/eventing/pkg/scheduler/statefulset/scheduler.go @@ -272,13 +272,41 @@ func (s *StatefulSetScheduler) scheduleVPod(vpod scheduler.VPod) ([]duckv1alpha1 existingPlacements := vpod.GetPlacements() var left int32 - // Remove unschedulable pods from placements + // Remove unschedulable or adjust overcommitted pods from placements var placements []duckv1alpha1.Placement if len(existingPlacements) > 0 { placements = make([]duckv1alpha1.Placement, 0, len(existingPlacements)) for _, p := range existingPlacements { - if state.IsSchedulablePod(st.OrdinalFromPodName(p.PodName)) { - placements = append(placements, *p.DeepCopy()) + p := p.DeepCopy() + ordinal := st.OrdinalFromPodName(p.PodName) + + if !state.IsSchedulablePod(ordinal) { + continue + } + + // Handle overcommitted pods. + if state.FreeCap[ordinal] < 0 { + // vr > overcommit => vr: 9, overcommit 4 -> free: 0, vr: 5, pending: +4 + // vr = overcommit => vr: 4, overcommit 4 -> free: 0, vr: 0, pending: +4 + // vr < overcommit => vr: 3, overcommit 4 -> free: -1, vr: 0, pending: +3 + + overcommit := -state.FreeCap[ordinal] + + if p.VReplicas >= overcommit { + state.SetFree(ordinal, 0) + state.Pending[vpod.GetKey()] += overcommit + + p.VReplicas = p.VReplicas - overcommit + } else { + state.SetFree(ordinal, p.VReplicas-overcommit) + state.Pending[vpod.GetKey()] += p.VReplicas + + p.VReplicas = 0 + } + } + + if p.VReplicas > 0 { + placements = append(placements, *p) } } }