Skip to content

Commit

Permalink
Merge pull request #1 from TommyLike/feature/support-enqueue
Browse files Browse the repository at this point in the history
Add enqueue action support
  • Loading branch information
k82cn authored Apr 29, 2019
2 parents 73981cf + 246518a commit a340a79
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 2 deletions.
9 changes: 9 additions & 0 deletions pkg/apis/scheduling/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (
// PodGroupUnknown means part of `spec.minMember` pods are running but the other part can not
// be scheduled, e.g. not enough resource; scheduler will wait for related controller to recover it.
PodGroupUnknown PodGroupPhase = "Unknown"

// PodGroupInqueue means controllers can start to create pods; it is a
// transitional state between PodGroupPending and PodGroupRunning
PodGroupInqueue PodGroupPhase = "Inqueue"
)

type PodGroupConditionType string
Expand Down Expand Up @@ -123,6 +127,11 @@ type PodGroupSpec struct {
// default.
// +optional
PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,3,opt,name=priorityClassName"`

// MinResources defines the minimal resources of members/tasks to run the pod group;
// if there are not enough resources to start all tasks, the scheduler
// will not start any of them.
MinResources *v1.ResourceList `json:"minResources,omitempty" protobuf:"bytes,4,opt,name=minResources"`
}

// PodGroupStatus represents the current state of a pod group.
Expand Down
15 changes: 14 additions & 1 deletion pkg/apis/scheduling/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
Expand Down Expand Up @@ -48,6 +49,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
jobsMap := map[api.QueueID]*util.PriorityQueue{}

for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
} else {
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package backfill
import (
"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
)
Expand All @@ -43,6 +44,10 @@ func (alloc *backfillAction) Execute(ssn *framework.Session) {

// TODO (k82cn): When backfilling, it also needs to balance between Queues.
for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

for _, task := range job.TaskStatusIndex[api.Pending] {
if task.InitResreq.IsEmpty() {
// As task did not request resources, so it only need to meet predicates.
Expand Down
128 changes: 128 additions & 0 deletions pkg/scheduler/actions/enqueue/enqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package enqueue

import (
"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
)

// enqueueAction admits pending PodGroups into the scheduling cycle by
// moving them to the Inqueue phase, so that downstream actions
// (allocate, backfill, ...) will consider their tasks.
type enqueueAction struct {
	ssn *framework.Session
}

// New returns an enqueue action instance.
func New() *enqueueAction {
	return new(enqueueAction)
}

// Name returns the unique identifier of this action.
func (enqueue *enqueueAction) Name() string {
	return "enqueue"
}

// Initialize is a no-op; the action needs no per-session setup.
func (enqueue *enqueueAction) Initialize() {}

// Execute admits pending PodGroups into the scheduling cycle.
// A job transitions from PodGroupPending to PodGroupInqueue when it
// already has pending tasks, declares no MinResources, or its
// MinResources fit into the cluster's remaining (overcommitted) idle
// resources. Jobs in any other phase are left untouched.
func (enqueue *enqueueAction) Execute(ssn *framework.Session) {
	glog.V(3).Infof("Enter Enqueue ...")
	defer glog.V(3).Infof("Leaving Enqueue ...")

	// Overcommit factor applied to node allocatable when estimating
	// headroom; lets slightly more work be admitted than strictly fits.
	const overCommitFactor = 1.2

	queues := util.NewPriorityQueue(ssn.QueueOrderFn)
	queueMap := map[api.QueueID]*api.QueueInfo{}

	// Per-queue priority queues of pending jobs awaiting admission.
	jobsMap := map[api.QueueID]*util.PriorityQueue{}

	for _, job := range ssn.Jobs {
		queue, found := ssn.Queues[job.Queue]
		if !found {
			glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
				job.Queue, job.Namespace, job.Name)
			continue
		}

		if _, existed := queueMap[queue.UID]; !existed {
			glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
				queue.Name, job.Namespace, job.Name)

			queueMap[queue.UID] = queue
			queues.Push(queue)
		}

		// Only Pending jobs are candidates for admission.
		if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
			if _, found := jobsMap[job.Queue]; !found {
				jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
			}
			glog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
			jobsMap[job.Queue].Push(job)
		}
	}

	glog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))

	// Cluster headroom: allocatable scaled by the overcommit factor
	// minus what is already in use, summed over all nodes.
	emptyRes := api.EmptyResource()
	nodesIdleRes := api.EmptyResource()
	for _, node := range ssn.Nodes {
		nodesIdleRes.Add(node.Allocatable.Clone().Multi(overCommitFactor).Sub(node.Used))
	}

	// Round-robin over queues in priority order; a queue is re-pushed
	// after each admission attempt until it has no pending jobs left.
	for !queues.Empty() {
		if nodesIdleRes.Less(emptyRes) {
			// BUGFIX: the original format string had a %s verb with no
			// matching argument (go vet error; logged "%!s(MISSING)").
			glog.V(3).Infof("Node idle resource <%v> is overused, ignore it.", nodesIdleRes)
			break
		}

		queue := queues.Pop().(*api.QueueInfo)

		// Pick the highest-priority pending job of this queue.
		jobs, found := jobsMap[queue.UID]
		if !found || jobs.Empty() {
			// No pending jobs left; drop the queue for this session.
			continue
		}
		job := jobs.Pop().(*api.JobInfo)

		inqueue := false
		if len(job.TaskStatusIndex[api.Pending]) != 0 {
			// Pods already exist; the controller has started creating
			// tasks, so admit unconditionally.
			inqueue = true
		} else if job.PodGroup.Spec.MinResources == nil {
			// No resource gate declared; admit unconditionally.
			inqueue = true
		} else {
			pgResource := api.NewResource(*job.PodGroup.Spec.MinResources)

			// Admit only if the whole MinResources fits, and reserve it
			// from the remaining headroom.
			if pgResource.LessEqual(nodesIdleRes) {
				nodesIdleRes.Sub(pgResource)
				inqueue = true
			}
		}

		if inqueue {
			job.PodGroup.Status.Phase = v1alpha1.PodGroupInqueue
			ssn.Jobs[job.UID] = job
		}

		// Re-push the queue so its next pending job gets a turn.
		queues.Push(queue)
	}
}

func (enqueue *enqueueAction) UnInitialize() {}
2 changes: 2 additions & 0 deletions pkg/scheduler/actions/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/allocate"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/backfill"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/enqueue"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/preempt"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/actions/reclaim"
)
Expand All @@ -30,4 +31,5 @@ func init() {
framework.RegisterAction(allocate.New())
framework.RegisterAction(backfill.New())
framework.RegisterAction(preempt.New())
framework.RegisterAction(enqueue.New())
}
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
Expand Down Expand Up @@ -52,6 +53,10 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
queues := map[api.QueueID]*api.QueueInfo{}

for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
continue
} else if _, existed := queues[queue.UID]; !existed {
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package reclaim
import (
"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/util"
Expand Down Expand Up @@ -53,6 +54,10 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {

var underRequest []*api.JobInfo
for _, job := range ssn.Jobs {
if job.PodGroup.Status.Phase == v1alpha1.PodGroupPending {
continue
}

if queue, found := ssn.Queues[job.Queue]; !found {
glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"math"

"k8s.io/apimachinery/pkg/api/resource"

v1 "k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
Expand Down Expand Up @@ -324,3 +326,14 @@ func (r *Resource) SetScalar(name v1.ResourceName, quantity float64) {
}
r.ScalarResources[name] = quantity
}

// Convert2K8sResource translates the internal Resource representation
// back into a Kubernetes v1.ResourceList: MilliCPU as a milli-quantity,
// Memory as a binary-SI byte quantity, and each scalar resource as a
// plain binary-SI quantity.
// NOTE(review): scalar resources are emitted with NewQuantity here;
// confirm the stored float matches whole units (not milli-units) at
// parse time, otherwise the round trip is off by a factor of 1000.
func (r *Resource) Convert2K8sResource() *v1.ResourceList {
	list := make(v1.ResourceList)
	list[v1.ResourceCPU] = *resource.NewMilliQuantity(int64(r.MilliCPU), resource.DecimalSI)
	list[v1.ResourceMemory] = *resource.NewQuantity(int64(r.Memory), resource.BinarySI)
	for name, value := range r.ScalarResources {
		list[name] = *resource.NewQuantity(int64(value), resource.BinarySI)
	}
	return &list
}
2 changes: 1 addition & 1 deletion pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func jobStatus(ssn *Session, jobInfo *api.JobInfo) v1alpha1.PodGroupStatus {
// If there are enough allocated resources, the pod group is running
if int32(allocated) > jobInfo.PodGroup.Spec.MinMember {
status.Phase = v1alpha1.PodGroupRunning
} else {
} else if jobInfo.PodGroup.Status.Phase != v1alpha1.PodGroupInqueue {
status.Phase = v1alpha1.PodGroupPending
}
}
Expand Down

0 comments on commit a340a79

Please sign in to comment.