Skip to content

Commit

Permalink
BookCapacity for ProvisioningRequest pods
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Jun 21, 2024
1 parent 83db225 commit 69b8c9f
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 59 deletions.
1 change: 0 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)

preScaleUp := func() time.Time {
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client)
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
if err != nil {
return nil, err
}
Expand All @@ -518,6 +518,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@ limitations under the License.
package provreq

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
Expand All @@ -36,15 +44,20 @@ const (
defaultMaxUpdated = 20
)

type injector interface {
TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error)
}

type provReqProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
}

// NewProvReqProcessor return ProvisioningRequestProcessor.
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, predicateChecker predicatechecker.PredicateChecker) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(predicateChecker)}
}

// Refresh implements loop.Observer interface and will be run at the start
Expand All @@ -56,15 +69,14 @@ func (p *provReqProcessor) Refresh() {
klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err)
return
}

p.Process(provReqs)
p.refresh(provReqs)
}

// Process iterates over ProvisioningRequests and apply:
// refresh iterates over ProvisioningRequests and apply:
// -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired.
// -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime.
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) {
func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningRequest) {
expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
failedProvReq := []*provreqwrapper.ProvisioningRequest{}
for _, provReq := range provReqs {
Expand Down Expand Up @@ -108,5 +120,50 @@ func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningReques
}
}

// Cleanup cleans up internal state.
// CleanUp cleans up internal state
func (p *provReqProcessor) CleanUp() {}

// Process implements PodListProcessor.Process() and inject fake pods to the cluster snapshoot for Provisioned ProvReqs in order to
// reserve capacity from ScaleDown.
func (p *provReqProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
err := p.bookCapacity(context)
if err != nil {
klog.Warning("Failed to book capacity for ProvisioningRequests: %s", err)
}
return unschedulablePods, nil
}

// bookCapacity schedule fake pods for ProvisioningRequest that should have reserved capacity
// in the cluster.
func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if !conditions.ShouldCapacityBeBooked(provReq) {
continue
}
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
}
if len(podsToCreate) == 0 {
return nil
}
// Scheduling the pods to reserve capacity for provisioning request.
if _, _, err = p.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
return err
}
return nil
}
88 changes: 85 additions & 3 deletions cluster-autoscaler/processors/provreq/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,26 @@ limitations under the License.
package provreq

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"

"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
)

func TestProcess(t *testing.T) {
func TestRefresh(t *testing.T) {
now := time.Now()
dayAgo := now.Add(-1 * 24 * time.Hour)
weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute)
Expand Down Expand Up @@ -146,8 +153,8 @@ func TestProcess(t *testing.T) {
additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil}
processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
if len(test.conditions) == len(test.wantConditions) {
assert.ElementsMatch(t, []metav1.Condition{
Expand All @@ -164,3 +171,78 @@ func TestProcess(t *testing.T) {
}
}
}

type fakeInjector struct {
pods []*apiv1.Pod
}

func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) {
f.pods = pods
return nil, 0, nil
}

func TestBookCapacity(t *testing.T) {
testCases := []struct {
name string
conditions []string
provReq *provreqwrapper.ProvisioningRequest
capacityIsBooked bool
}{
{
name: "ProvReq is new, check-capacity class",
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
capacityIsBooked: false,
},
{
name: "ProvReq is Failed, best-effort-atomic class",
conditions: []string{v1beta1.Failed},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: false,
},
{
name: "ProvReq is Provisioned, unknown class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), "unknown"),
capacityIsBooked: false,
},
{
name: "ProvReq is Provisioned, capacity should be booked, check-capacity class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
capacityIsBooked: true,
},
{
name: "ProvReq is Provisioned, capacity should be booked, best-effort-atomic class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: true,
},
{
name: "ProvReq has BookingExpired, capacity should not be booked, best-effort-atomic class",
conditions: []string{v1beta1.Provisioned, v1beta1.BookingExpired},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
test := test
injector := &fakeInjector{pods: []*apiv1.Pod{}}
for _, condition := range test.conditions {
conditions.AddOrUpdateCondition(test.provReq, condition, metav1.ConditionTrue, "", "", metav1.Now())
}

processor := &provReqProcessor{
now: func() time.Time { return time.Now() },
client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, test.provReq),
maxUpdated: 20,
injector: injector,
}
ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil)
processor.bookCapacity(&ctx)
if (test.capacityIsBooked && len(injector.pods) == 0) || (!test.capacityIsBooked && len(injector.pods) > 0) {
t.Fail()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,14 @@ import (

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"

ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
Expand Down Expand Up @@ -96,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp(

o.context.ClusterSnapshot.Fork()
defer o.context.ClusterSnapshot.Revert()
o.bookCapacity()

// unschedulablePods pods should belong to one ProvisioningClass, so only one provClass should try to ScaleUp.
for _, provClass := range o.provisioningClasses {
Expand All @@ -115,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
return nil, nil
}

func (o *provReqOrchestrator) bookCapacity() error {
provReqs, err := o.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if conditions.ShouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
}
}
if len(podsToCreate) == 0 {
return nil
}
// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
klog.Warningf("Error during capacity booking: %v", err)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ func TestScaleUp(t *testing.T) {
scaleUpResult: status.ScaleUpNotNeeded,
},
{
name: "capacity in the cluster is booked",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq},
name: "capacity is there, check-capacity class",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
scaleUpResult: status.ScaleUpSuccessful,
},
{
name: "unsupported ProvisioningRequest is ignored",
Expand All @@ -211,12 +211,6 @@ func TestScaleUp(t *testing.T) {
provReqToScaleUp: atomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNotNeeded,
},
{
name: "some capacity is pre-booked, large atomic scale-up request doesn't fit",
provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq},
provReqToScaleUp: largeAtomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
},
{
name: "capacity is there, large atomic scale-up request doesn't require scale-up",
provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},
Expand Down

0 comments on commit 69b8c9f

Please sign in to comment.