This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Merge pull request #602 from k82cn/automated-cherry-pick-of-#596-upstream-release-0.4

Automated cherry pick of #596: Set default PodGroup for Pods.
k8s-ci-robot authored Feb 25, 2019
2 parents 68a0b9c + 2608273 commit 68b458c
Showing 8 changed files with 148 additions and 56 deletions.
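
For context while reading the diff: the cherry-picked change makes the scheduler synthesize an in-memory "shadow" PodGroup (MinMember: 1, tagged with the ShadowPodGroupKey annotation) for any pod that carries no v1alpha1.GroupNameAnnotationKey annotation, instead of leaving it without a group. The sketch below is illustrative only; it assumes the pod has no controller owner, so utils.GetController returns an empty ID and the job ID falls back to the pod UID, and the pod name, namespace, and UID are made up.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

func main() {
	// A bare pod: no group-name annotation and no controller owner reference,
	// so before this change it had no PodGroup at all.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "standalone",
			UID:       "pod-uid-1",
		},
	}

	task := api.NewTaskInfo(pod)

	// After this change the task carries a scheduler-generated shadow PodGroup.
	fmt.Println(task.Job)                          // "pod-uid-1": the job ID falls back to the pod UID
	fmt.Println(task.PodGroup.Spec.MinMember)      // 1
	fmt.Println(api.ShadowPodGroup(task.PodGroup)) // true
}
```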
62 changes: 49 additions & 13 deletions pkg/scheduler/api/job_info.go
@@ -27,7 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"

"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/utils"
)

@@ -40,6 +40,8 @@ type TaskInfo struct {
Name string
Namespace string

PodGroup *v1alpha1.PodGroup

Resreq *Resource

NodeName string
@@ -50,15 +52,34 @@ type TaskInfo struct {
Pod *v1.Pod
}

func getJobID(pod *v1.Pod) JobID {
func getOwners(pod *v1.Pod) (JobID, *v1alpha1.PodGroup) {
if len(pod.Annotations) != 0 {
if gn, found := pod.Annotations[arbcorev1.GroupNameAnnotationKey]; found && len(gn) != 0 {
if gn, found := pod.Annotations[v1alpha1.GroupNameAnnotationKey]; found && len(gn) != 0 {
// Make sure Pod and PodGroup belong to the same namespace.
jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn)
return JobID(jobID)
return JobID(jobID), nil
}
}
return JobID(utils.GetController(pod))

jobID := JobID(utils.GetController(pod))
if len(jobID) == 0 {
jobID = JobID(pod.UID)
}

pg := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: string(jobID),
Annotations: map[string]string{
ShadowPodGroupKey: string(jobID),
},
},
Spec: v1alpha1.PodGroupSpec{
MinMember: 1,
},
}

return jobID, pg
}

func NewTaskInfo(pod *v1.Pod) *TaskInfo {
@@ -69,9 +90,12 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
req.Add(NewResource(c.Resources.Requests))
}

jobID, pg := getOwners(pod)

ti := &TaskInfo{
UID: TaskID(pod.UID),
Job: getJobID(pod),
Job: jobID,
PodGroup: pg,
Name: pod.Name,
Namespace: pod.Namespace,
NodeName: pod.Spec.NodeName,
@@ -94,6 +118,7 @@ func (ti *TaskInfo) Clone() *TaskInfo {
Job: ti.Job,
Name: ti.Name,
Namespace: ti.Namespace,
PodGroup: ti.PodGroup,
NodeName: ti.NodeName,
Status: ti.Status,
Priority: ti.Priority,
@@ -104,8 +129,8 @@ func (ti *TaskInfo) Clone() *TaskInfo {
}

func (ti TaskInfo) String() string {
return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, resreq %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.Priority, ti.Resreq)
return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, type %v, pri %v, resreq %v",
ti.UID, ti.Namespace, ti.Name, ti.Job, ti.Status, ti.PodGroup, ti.Priority, ti.Resreq)
}

// JobID is the type of JobInfo's ID.
@@ -138,14 +163,14 @@ type JobInfo struct {
TotalRequest *Resource

CreationTimestamp metav1.Time
PodGroup *arbcorev1.PodGroup
PodGroup *v1alpha1.PodGroup

// TODO(k82cn): keep backward compatbility, removed it when v1alpha1 finalized.
PDB *policyv1.PodDisruptionBudget
}

func NewJobInfo(uid JobID) *JobInfo {
return &JobInfo{
func NewJobInfo(uid JobID, tasks ...*TaskInfo) *JobInfo {
job := &JobInfo{
UID: uid,

MinAvailable: 0,
@@ -157,13 +182,19 @@ func NewJobInfo(uid JobID) *JobInfo {
TaskStatusIndex: map[TaskStatus]tasksMap{},
Tasks: tasksMap{},
}

for _, task := range tasks {
job.AddTaskInfo(task)
}

return job
}

func (ji *JobInfo) UnsetPodGroup() {
ji.PodGroup = nil
}

func (ji *JobInfo) SetPodGroup(pg *arbcorev1.PodGroup) {
func (ji *JobInfo) SetPodGroup(pg *v1alpha1.PodGroup) {
ji.Name = pg.Name
ji.Namespace = pg.Namespace
ji.MinAvailable = pg.Spec.MinMember
@@ -234,6 +265,10 @@ func (ji *JobInfo) AddTaskInfo(ti *TaskInfo) {
if AllocatedStatus(ti.Status) {
ji.Allocated.Add(ti.Resreq)
}

if ji.PodGroup == nil && ti.PodGroup != nil {
ji.SetPodGroup(ti.PodGroup)
}
}

func (ji *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error {
@@ -321,7 +356,8 @@ func (ji JobInfo) String() string {
i++
}

return fmt.Sprintf("Job (%v): name %v, minAvailable %d", ji.UID, ji.Name, ji.MinAvailable) + res
return fmt.Sprintf("Job (%v): namespace %v (%v), name %v, minAvailable %d, podGroup %+v",
ji.UID, ji.Namespace, ji.Queue, ji.Name, ji.MinAvailable, ji.PodGroup) + res
}

// Error returns detailed information on why a job's task failed to fit on
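Reviewer note: because NewJobInfo is now variadic and AddTaskInfo adopts the first task's shadow PodGroup when the job has none, a job derived purely from pods ends up with MinAvailable: 1 instead of 0, which is what the updated test expectations below encode. A minimal, hypothetical sketch of that adoption, assuming SetPodGroup also stores the group on the JobInfo as the tests imply:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "p1", UID: "job-uid"},
	}
	task := api.NewTaskInfo(pod) // carries a shadow PodGroup, per job_info.go above

	// NewJobInfo now accepts the initial tasks directly; AddTaskInfo calls
	// SetPodGroup with the task's shadow group because the job has none yet.
	job := api.NewJobInfo(task.Job, task)

	fmt.Println(job.MinAvailable)                 // 1, taken from the shadow PodGroup's MinMember
	fmt.Println(api.ShadowPodGroup(job.PodGroup)) // true
}
```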
45 changes: 32 additions & 13 deletions pkg/scheduler/api/job_info_test.go
@@ -35,17 +35,20 @@ func jobInfoEqual(l, r *JobInfo) bool {
func TestAddTaskInfo(t *testing.T) {
// case1
case01_uid := JobID("uid")
case01_ns := "c1"
case01_owner := buildOwnerReference("uid")

case01_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod1 := buildPod(case01_ns, "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_task1 := NewTaskInfo(case01_pod1)
case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod2 := buildPod(case01_ns, "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_task2 := NewTaskInfo(case01_pod2)
case01_pod3 := buildPod("c1", "p3", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod3 := buildPod(case01_ns, "p3", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_task3 := NewTaskInfo(case01_pod3)
case01_pod4 := buildPod("c1", "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod4 := buildPod(case01_ns, "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_task4 := NewTaskInfo(case01_pod4)

_, pg := getOwners(case01_pod1)

tests := []struct {
name string
uid JobID
@@ -58,7 +61,11 @@ func TestAddTaskInfo(t *testing.T) {
pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3, case01_pod4},
expected: &JobInfo{
UID: case01_uid,
MinAvailable: 0,
Namespace: case01_ns,
Queue: QueueID(case01_ns),
Name: string(case01_uid),
MinAvailable: 1,
PodGroup: pg,
Allocated: buildResource("4000m", "4G"),
TotalRequest: buildResource("5000m", "5G"),
Tasks: tasksMap{
@@ -103,21 +110,25 @@ func TestAddTaskInfo(t *testing.T) {
func TestDeleteTaskInfo(t *testing.T) {
// case1
case01_uid := JobID("owner1")
case01_ns := "c1"
case01_owner := buildOwnerReference(string(case01_uid))
case01_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod1 := buildPod(case01_ns, "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_task1 := NewTaskInfo(case01_pod1)
case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod2 := buildPod(case01_ns, "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_pod3 := buildPod(case01_ns, "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case01_owner}, make(map[string]string))
case01_task3 := NewTaskInfo(case01_pod3)
_, case01_job := getOwners(case01_pod1)

// case2
case02_uid := JobID("owner2")
case02_ns := "c2"
case02_owner := buildOwnerReference(string(case02_uid))
case02_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
case02_pod1 := buildPod(case02_ns, "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
case02_task1 := NewTaskInfo(case02_pod1)
case02_pod2 := buildPod("c1", "p2", "n1", v1.PodPending, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
case02_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
case02_pod2 := buildPod(case02_ns, "p2", "n1", v1.PodPending, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
case02_pod3 := buildPod(case02_ns, "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case02_owner}, make(map[string]string))
case02_task3 := NewTaskInfo(case02_pod3)
_, case02_job := getOwners(case02_pod1)

tests := []struct {
name string
@@ -133,7 +144,11 @@ func TestDeleteTaskInfo(t *testing.T) {
rmPods: []*v1.Pod{case01_pod2},
expected: &JobInfo{
UID: case01_uid,
MinAvailable: 0,
Namespace: case01_ns,
Name: string(case01_uid),
Queue: QueueID(case01_ns),
MinAvailable: 1,
PodGroup: case01_job,
Allocated: buildResource("3000m", "3G"),
TotalRequest: buildResource("4000m", "4G"),
Tasks: tasksMap{
@@ -155,7 +170,11 @@ func TestDeleteTaskInfo(t *testing.T) {
rmPods: []*v1.Pod{case02_pod2},
expected: &JobInfo{
UID: case02_uid,
MinAvailable: 0,
Namespace: case02_ns,
Name: string(case02_uid),
Queue: QueueID(case02_ns),
MinAvailable: 1,
PodGroup: case02_job,
Allocated: buildResource("3000m", "3G"),
TotalRequest: buildResource("4000m", "4G"),
Tasks: tasksMap{
4 changes: 4 additions & 0 deletions pkg/scheduler/api/types.go
@@ -53,6 +53,10 @@ const (
Unknown
)

const (
ShadowPodGroupKey = "kube-batch/shadow-pod-group"
)

func (ts TaskStatus) String() string {
switch ts {
case Pending:
29 changes: 29 additions & 0 deletions pkg/scheduler/api/util.go
@@ -0,0 +1,29 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"

func ShadowPodGroup(pg *v1alpha1.PodGroup) bool {
if pg == nil {
return true
}

_, found := pg.Annotations[ShadowPodGroupKey]

return found
}
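
A short, illustrative sketch of how this predicate reads (the gating added to cache.go below follows the same pattern): a nil group and a scheduler-defaulted group both count as "shadow", so the scheduler skips status updates and unschedulable events for them; only a real PodGroup API object returns false. Names and namespaces here are made up.

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
	"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

func main() {
	// A PodGroup created by a user or controller: no shadow annotation.
	userPG := &v1alpha1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "pg1"},
	}

	// The kind of defaulted group getOwners builds for annotation-less pods.
	shadowPG := &v1alpha1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   "default",
			Name:        "job-uid",
			Annotations: map[string]string{api.ShadowPodGroupKey: "job-uid"},
		},
		Spec: v1alpha1.PodGroupSpec{MinMember: 1},
	}

	fmt.Println(api.ShadowPodGroup(nil))      // true: no PodGroup at all
	fmt.Println(api.ShadowPodGroup(shadowPG)) // true: scheduler-defaulted group, status updates are skipped
	fmt.Println(api.ShadowPodGroup(userPG))   // false: real API object whose status the scheduler updates
}
```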
43 changes: 20 additions & 23 deletions pkg/scheduler/cache/cache.go
@@ -575,14 +575,7 @@ func (sc *SchedulerCache) String() string {
if len(sc.Jobs) != 0 {
str = str + "Jobs:\n"
for _, job := range sc.Jobs {
str = str + fmt.Sprintf("\t Job(%s) name(%s) minAvailable(%v)\n",
job.UID, job.Name, job.MinAvailable)

i := 0
for _, task := range job.Tasks {
str = str + fmt.Sprintf("\t\t %d: %v\n", i, task)
i++
}
str = str + fmt.Sprintf("\t %s\n", job)
}
}

@@ -593,17 +586,19 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {
jobErrMsg := job.FitError()

pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == v1alpha1.PodGroupUnknown ||
job.PodGroup.Status.Phase == v1alpha1.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
if !kbapi.ShadowPodGroup(job.PodGroup) {
pgUnschedulable := job.PodGroup != nil &&
(job.PodGroup.Status.Phase == v1alpha1.PodGroupUnknown ||
job.PodGroup.Status.Phase == v1alpha1.PodGroupPending)
pdbUnschedulabe := job.PDB != nil && len(job.TaskStatusIndex[api.Pending]) != 0

// If pending or unschedulable, record unschedulable event.
if pgUnschedulable || pdbUnschedulabe {
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v",
len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
sc.Recorder.Eventf(job.PodGroup, v1.EventTypeWarning,
string(v1alpha1.PodGroupUnschedulableType), msg)
}
}

// Update podCondition for tasks Allocated and Pending before job discarded
@@ -619,11 +614,13 @@ func (sc *SchedulerCache) RecordJobStatusEvent(job *kbapi.JobInfo) {

// UpdateJobStatus update the status of job and its tasks.
func (sc *SchedulerCache) UpdateJobStatus(job *kbapi.JobInfo) (*kbapi.JobInfo, error) {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
if !kbapi.ShadowPodGroup(job.PodGroup) {
pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
if err != nil {
return nil, err
}
job.PodGroup = pg
}
job.PodGroup = pg

sc.RecordJobStatusEvent(job)

14 changes: 10 additions & 4 deletions pkg/scheduler/cache/cache_test.go
@@ -137,9 +137,7 @@ func TestAddPod(t *testing.T) {
[]metav1.OwnerReference{owner}, make(map[string]string))
pi2 := api.NewTaskInfo(pod2)

j1 := api.NewJobInfo(api.JobID("j1"))
j1.AddTaskInfo(pi1)
j1.AddTaskInfo(pi2)
j1 := api.NewJobInfo(api.JobID("j1"), pi1, pi2)

node1 := buildNode("n1", buildResourceList("2000m", "10G"))
ni1 := api.NewNodeInfo(node1)
@@ -186,7 +184,6 @@ func TestAddPod(t *testing.T) {
}

func TestAddNode(t *testing.T) {

// case 1
node1 := buildNode("n1", buildResourceList("2000m", "10G"))
pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"),
@@ -198,6 +195,11 @@ func TestAddNode(t *testing.T) {
ni1 := api.NewNodeInfo(node1)
ni1.AddTask(pi2)

j1 := api.NewJobInfo("c1-p1")
j1.AddTaskInfo(api.NewTaskInfo(pod1))
j2 := api.NewJobInfo("c1-p2")
j2.AddTaskInfo(api.NewTaskInfo(pod2))

tests := []struct {
pods []*v1.Pod
nodes []*v1.Node
@@ -210,6 +212,10 @@ func TestAddNode(t *testing.T) {
Nodes: map[string]*api.NodeInfo{
"n1": ni1,
},
Jobs: map[api.JobID]*api.JobInfo{
"c1-p1": j1,
"c1-p2": j2,
},
},
},
}