This repository has been archived by the owner on May 25, 2023. It is now read-only.

Added Statement for eviction in batch. #499

Merged (1 commit, Dec 17, 2018)
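
In short, this PR makes preemption transactional: each preemptor job opens a Statement, evictions are recorded against the session while the job tries its tasks, and the statement is committed as soon as the job becomes ready, or discarded wholesale if it never does, so partial preemption is rolled back instead of leaking into the cluster. Below is a condensed, runnable sketch of that driver loop with toy stand-in types; only the Statement, Evict, Commit, and Discard names come from the diff, everything else is illustrative.

package main

import "fmt"

// Toy stand-ins: only the Statement/Evict/Commit/Discard names mirror
// the diff below; the job type and readiness rule are illustrative.
type Statement struct{ evictions []string }

func (s *Statement) Evict(victim string) { s.evictions = append(s.evictions, victim) }
func (s *Statement) Commit()             { fmt.Println("commit evictions:", s.evictions) }
func (s *Statement) Discard()            { fmt.Println("discard evictions:", s.evictions) }

type job struct {
	tasks  []string // preemptor tasks still to place
	needed int      // evictions required before the job is ready
}

func (j *job) ready(stmt *Statement) bool { return len(stmt.evictions) >= j.needed }

func main() {
	j := &job{tasks: []string{"t1", "t2"}, needed: 2}
	victims := []string{"v1"} // only one victim is available

	stmt := &Statement{}
	for len(j.tasks) > 0 {
		j.tasks = j.tasks[1:] // pop the next preemptor task
		if len(victims) > 0 { // record an eviction if one is possible
			stmt.Evict(victims[0])
			victims = victims[1:]
		}
		if j.ready(stmt) { // ready mid-loop: apply everything for real
			stmt.Commit()
			return
		}
	}
	// Tried every task and the job is still not ready: undo everything.
	stmt.Discard()
}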
2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
 BIN_DIR=_output/bin
-RELEASE_VER=v0.2
+RELEASE_VER=v0.3
 REPO_PATH=github.com/kubernetes-sigs/kube-batch
 GitSHA=`git rev-parse HEAD`
 Date=`date "+%Y-%m-%d %H:%M:%S"`
68 changes: 47 additions & 21 deletions pkg/scheduler/actions/preempt/preempt.go
@@ -84,28 +84,46 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {

 		preemptorJob := preemptors.Pop().(*api.JobInfo)

-		// If not preemptor tasks, next job.
-		if preemptorTasks[preemptorJob.UID].Empty() {
-			glog.V(3).Infof("No preemptor task in job <%s/%s>.",
-				preemptorJob.Namespace, preemptorJob.Name)
-			continue
-		}
+		stmt := ssn.Statement()
+		assigned := false
+		for {
+			// If not preemptor tasks, next job.
+			if preemptorTasks[preemptorJob.UID].Empty() {
+				glog.V(3).Infof("No preemptor task in job <%s/%s>.",
+					preemptorJob.Namespace, preemptorJob.Name)
+				break
+			}

-		preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
+			preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)

-		assigned, _ := preempt(ssn, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
-			// Ignore non running task.
-			if task.Status != api.Running {
-				return false
+			if preempted, _ := preempt(ssn, stmt, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
+				// Ignore non running task.
+				if task.Status != api.Running {
+					return false
+				}
+
+				job, found := ssn.JobIndex[task.Job]
+				if !found {
+					return false
+				}
+				// Preempt other jobs within queue
+				return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
+			}); preempted {
+				assigned = true
 			}

-			job, found := ssn.JobIndex[task.Job]
-			if !found {
-				return false
+			// If job not ready, keep preempting
+			if ssn.JobReady(preemptorJob) {
+				stmt.Commit()
+				break
 			}
-			// Preempt other jobs within queue
-			return job.Queue == preemptorJob.Queue && preemptor.Job != task.Job
-		})
+		}
+
+		// If job not ready after try all tasks, next job.
+		if !ssn.JobReady(preemptorJob) {
+			stmt.Discard()
+			continue
+		}

 		if assigned {
 			preemptors.Push(preemptorJob)
@@ -125,7 +143,8 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {

 		preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

-		assigned, _ := preempt(ssn, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
+		stmt := ssn.Statement()
+		assigned, _ := preempt(ssn, stmt, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
 			// Ignore non running task.
 			if task.Status != api.Running {
 				return false
@@ -134,6 +153,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
 			// Preempt tasks within job.
 			return preemptor.Job == task.Job
 		})
+		stmt.Commit()

 		// If no preemption, next job.
 		if !assigned {
@@ -146,7 +166,13 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {

 func (alloc *preemptAction) UnInitialize() {}

-func preempt(ssn *framework.Session, preemptor *api.TaskInfo, nodes []*api.NodeInfo, filter func(*api.TaskInfo) bool) (bool, error) {
+func preempt(
+	ssn *framework.Session,
+	stmt *framework.Statement,
+	preemptor *api.TaskInfo,
+	nodes []*api.NodeInfo,
+	filter func(*api.TaskInfo) bool,
+) (bool, error) {
 	resreq := preemptor.Resreq.Clone()
 	preempted := api.EmptyResource()

@@ -179,7 +205,7 @@ func preempt(ssn *framework.Session, preemptor *api.TaskInfo, nodes []*api.NodeI
 		for _, preemptee := range victims {
 			glog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
 				preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
-			if err := ssn.Evict(preemptee, "preempt"); err != nil {
+			if err := stmt.Evict(preemptee, "preempt"); err != nil {
 				glog.Errorf("Failed to preempt Task <%s/%s> for Tasks <%s/%s>: %v",
 					preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name, err)
 				continue
@@ -195,7 +221,7 @@ func preempt(ssn *framework.Session, preemptor *api.TaskInfo, nodes []*api.NodeI
 		glog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
 			preempted, preemptor.Namespace, preemptor.Name, preemptor.Resreq)

-		if err := ssn.Pipeline(preemptor, node.Name); err != nil {
+		if err := stmt.Pipeline(preemptor, node.Name); err != nil {
 			glog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
 				preemptor.Namespace, preemptor.Name, node.Name)
 		}
1 change: 0 additions & 1 deletion pkg/scheduler/actions/reclaim/reclaim.go
@@ -84,7 +84,6 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
 		var job *api.JobInfo
 		var task *api.TaskInfo

-		// TODO (k82cn): we should check whether queue deserved more resources.
 		queue := queues.Pop().(*api.QueueInfo)
 		if ssn.Overused(queue) {
 			glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
9 changes: 7 additions & 2 deletions pkg/scheduler/api/node_info.go
@@ -138,11 +138,16 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
 	}

 	if ni.Node != nil {
-		if task.Status == Releasing {
+		switch task.Status {
+		case Releasing:
 			ni.Releasing.Sub(task.Resreq)
+			ni.Idle.Add(task.Resreq)
+		case Pipelined:
+			ni.Releasing.Add(task.Resreq)
+		default:
+			ni.Idle.Add(task.Resreq)
 		}

-		ni.Idle.Add(task.Resreq)
 		ni.Used.Sub(task.Resreq)
 	}

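The switch above encodes a subtle piece of accounting: removing a Releasing task frees idle capacity, but removing a Pipelined task must not, because a pipelined task never physically occupied the node; its room was only anticipated from a pending eviction, so it goes back to the Releasing pool. A minimal sketch of the three cases, with a toy Resource type standing in for api.Resource (the case structure mirrors RemoveTask above; the types are simplified):

package main

import "fmt"

// Toy stand-in for api.Resource; only the Add/Sub arithmetic matters here.
type Resource struct{ MilliCPU float64 }

func (r *Resource) Add(o Resource) { r.MilliCPU += o.MilliCPU }
func (r *Resource) Sub(o Resource) { r.MilliCPU -= o.MilliCPU }

type TaskStatus int

const (
	Running TaskStatus = iota
	Releasing
	Pipelined
)

// removeTask mirrors the switch in NodeInfo.RemoveTask above.
func removeTask(status TaskStatus, req Resource, idle, releasing, used *Resource) {
	switch status {
	case Releasing: // eviction finished: reservation gone, capacity back
		releasing.Sub(req)
		idle.Add(req)
	case Pipelined: // placement undone: the anticipated room is released
		releasing.Add(req)
	default: // an ordinary task frees idle capacity
		idle.Add(req)
	}
	used.Sub(req)
}

func main() {
	idle, releasing, used := &Resource{1000}, &Resource{500}, &Resource{3000}
	removeTask(Pipelined, Resource{MilliCPU: 500}, idle, releasing, used)
	// Idle stays 1000: a pipelined task never actually held node capacity.
	fmt.Printf("idle=%+v releasing=%+v used=%+v\n", *idle, *releasing, *used)
}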
6 changes: 4 additions & 2 deletions pkg/scheduler/cache/cache.go
@@ -24,7 +24,7 @@ import (

 	"github.com/golang/glog"

-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/informers"
 	infov1 "k8s.io/client-go/informers/core/v1"
@@ -102,6 +102,8 @@ func (de *defaultEvictor) Evict(p *v1.Pod) error {
 	// TODO (k82cn): makes grace period configurable.
 	threeSecs := int64(3)

+	glog.V(3).Infof("Evicting pod %v/%v", p.Namespace, p.Name)
+
 	if err := de.kubeclient.CoreV1().Pods(p.Namespace).Delete(p.Name, &metav1.DeleteOptions{
 		GracePeriodSeconds: &threeSecs,
 	}); err != nil {
@@ -188,7 +190,7 @@ func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool
 			if strings.Compare(pod.Spec.SchedulerName, schedulerName) == 0 && pod.Status.Phase == v1.PodPending {
 				return true
 			}
-			return pod.Status.Phase == v1.PodRunning
+			return pod.Status.Phase != v1.PodPending
 		default:
 			return false
 		}
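The filter change above widens what the cache keeps for pods owned by other schedulers: previously only Running pods, now anything past Pending (so Succeeded, Failed, and Unknown pods stay visible for accounting too). A standalone restatement of the predicate, assuming it is used as an informer filter (the surrounding FilterFunc plumbing is not shown in the hunk):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// keepPod mirrors the predicate in newSchedulerCache above: pending pods
// are kept only when they are ours to schedule; everything else is kept
// once it is past Pending, whichever scheduler owns it.
func keepPod(pod *v1.Pod, schedulerName string) bool {
	if pod.Spec.SchedulerName == schedulerName && pod.Status.Phase == v1.PodPending {
		return true
	}
	return pod.Status.Phase != v1.PodPending
}

func main() {
	pod := &v1.Pod{}
	pod.Spec.SchedulerName = "default-scheduler"
	pod.Status.Phase = v1.PodSucceeded
	// Previously dropped (not Running); now kept for accounting.
	fmt.Println(keepPod(pod, "kube-batch")) // true
}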
6 changes: 6 additions & 0 deletions pkg/scheduler/framework/session.go
@@ -103,6 +103,12 @@ func closeSession(ssn *Session) {
 	glog.V(3).Infof("Close Session %v", ssn.UID)
 }

+func (ssn *Session) Statement() *Statement {
+	return &Statement{
+		ssn: ssn,
+	}
+}
+
 func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {
 	// Only update status in session
 	job, found := ssn.JobIndex[task.Job]
6 changes: 2 additions & 4 deletions pkg/scheduler/framework/statement.go
@@ -16,13 +16,11 @@ limitations under the License.

 package framework

-/*
 import (
 	"github.com/golang/glog"

 	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
 )
-*/

 type Statement struct {
 	operations []operation
@@ -34,7 +32,6 @@ type operation struct {
 	args []interface{}
 }

-/*
 func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error {
 	// Update status in session
 	job, found := s.ssn.JobIndex[reclaimee.Job]
@@ -195,6 +192,7 @@ func (s *Statement) unpipeline(task *api.TaskInfo) error {
 }

 func (s *Statement) Discard() {
+	glog.V(3).Info("Discarding operations ...")
 	for i := len(s.operations) - 1; i >= 0; i-- {
 		op := s.operations[i]
 		switch op.name {
@@ -207,6 +205,7 @@ func (s *Statement) Discard() {
 }

 func (s *Statement) Commit() {
+	glog.V(3).Info("Committing operations ...")
 	for _, op := range s.operations {
 		switch op.name {
 		case "evict":
@@ -216,4 +215,3 @@ func (s *Statement) Commit() {
 		}
 	}
 }
-*/
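
Most of statement.go simply loses its /* ... */ guard here: the operation log already existed in the file but was commented out, and this PR activates it (plus the two log lines above). As a rough, runnable sketch of the mechanism: only the operation struct, the reverse iteration in Discard, and the "evict" case in Commit are confirmed by the visible hunks; the Evict body and the undo step are assumptions about the elided code.

package main

import "fmt"

// operation matches the struct shown above; Statement's ssn field is
// elided since the session bookkeeping is not visible in this diff.
type operation struct {
	name string
	args []interface{}
}

type Statement struct {
	operations []operation
}

// Evict (assumed body): record the eviction so Commit can perform it;
// the real method presumably also updates the session's view right away.
func (s *Statement) Evict(taskName, reason string) {
	s.operations = append(s.operations, operation{name: "evict", args: []interface{}{taskName, reason}})
}

// Commit replays the recorded operations, for real, in order.
func (s *Statement) Commit() {
	fmt.Println("Committing operations ...")
	for _, op := range s.operations {
		switch op.name {
		case "evict":
			fmt.Printf("evict %v (reason: %v)\n", op.args[0], op.args[1])
		}
	}
}

// Discard walks the log backwards and undoes each session-level effect,
// so nothing recorded in this statement ever reaches the cluster.
func (s *Statement) Discard() {
	fmt.Println("Discarding operations ...")
	for i := len(s.operations) - 1; i >= 0; i-- {
		op := s.operations[i]
		switch op.name {
		case "evict":
			fmt.Printf("unevict %v\n", op.args[0])
		}
	}
}

func main() {
	s := &Statement{}
	s.Evict("web-1", "preempt")
	s.Evict("web-2", "preempt")
	s.Discard() // unevict web-2, then unevict web-1
}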
5 changes: 4 additions & 1 deletion pkg/scheduler/plugins/gang/gang.go
@@ -18,6 +18,7 @@ package gang

 import (
 	"fmt"
+
 	"github.com/golang/glog"

 	arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
@@ -39,7 +40,9 @@ func New(args *framework.PluginArgs) framework.Plugin {
 func readyTaskNum(job *api.JobInfo) int32 {
 	occupid := 0
 	for status, tasks := range job.TaskStatusIndex {
-		if api.AllocatedStatus(status) || status == api.Succeeded {
+		if api.AllocatedStatus(status) ||
+			status == api.Succeeded ||
+			status == api.Pipelined {
 			occupid = occupid + len(tasks)
 		}
 	}
2 changes: 1 addition & 1 deletion test/e2e/util.go
@@ -297,7 +297,7 @@ func createJobEx(context *context, job *jobSpec) ([]*batchv1.Job, *arbv1.PodGrou
 			},
 			Spec: v1.PodSpec{
 				SchedulerName: "kube-batch",
-				RestartPolicy: v1.RestartPolicyNever,
+				RestartPolicy: v1.RestartPolicyOnFailure,
 				Containers:    createContainers(task.img, task.req, task.hostport),
 				Affinity:      task.affinity,
 			},