Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#535 from Jeffwan/pod_condition
Browse files Browse the repository at this point in the history
Add Unschedulable PodCondition for pods in pending
  • Loading branch information
k8s-ci-robot authored Jan 13, 2019
2 parents d70d63c + fb13a28 commit 38c8002
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 99 deletions.
286 changes: 201 additions & 85 deletions Gopkg.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@
name = "k8s.io/client-go"
version = "v10.0.0"

[[constraint]]
name = "github.com/stretchr/testify"
version = "1.1.4"

[[override]]
name = "gopkg.in/fsnotify.v1"
source = "https://github.com/fsnotify/fsnotify.git"

[[override]]
name = "github.com/imdario/mergo"
revision = "9316a62528ac99aaecb4e47eadd6dc8aa6533d58"
Expand Down
2 changes: 1 addition & 1 deletion contrib/DLaaS/pkg/scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/contrib/DLaaS/contrib/DLaaS/pkg/scheduler/framework"
"github.com/kubernetes-sigs/kube-batch/contrib/DLaaS/pkg/scheduler/framework"
)

var defaultSchedulerConf = map[string]string{
Expand Down
5 changes: 3 additions & 2 deletions pkg/apis/scheduling/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
type Event string

const (
EvictEvent Event = "Evict"
UnschedulableEvent Event = "Unschedulable"
EvictEvent Event = "Evict"
UnschedulableEvent Event = "Unschedulable"
FailedSchedulingEvent Event = "FailedScheduling"
)

// PodGroupPhase is the phase of a pod group at the current time.
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
//store information about missing resources
job.NodesFitDelta[node.Name] = node.Idle.Clone()
job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)
}

// Allocate releasing resource to the task if any.
Expand Down
19 changes: 14 additions & 5 deletions pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"testing"
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -119,6 +119,14 @@ func (fb *fakeBinder) Bind(p *v1.Pod, hostname string) error {
return nil
}

type fakeTaskStatusUpdater struct {
}

func (ftsu *fakeTaskStatusUpdater) Update(pod *v1.Pod, podCondition *v1.PodCondition) error {
// do nothing here
return nil
}

func TestAllocate(t *testing.T) {
framework.RegisterPluginBuilder("drf", drf.New)
framework.RegisterPluginBuilder("proportion", proportion.New)
Expand Down Expand Up @@ -227,10 +235,11 @@ func TestAllocate(t *testing.T) {
c: make(chan string),
}
schedulerCache := &cache.SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: binder,
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
Queues: make(map[api.QueueID]*api.QueueInfo),
Binder: binder,
TsUpdater: &fakeTaskStatusUpdater{},
}
for _, node := range test.nodes {
schedulerCache.AddNode(node)
Expand Down
42 changes: 40 additions & 2 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"

arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme"
arbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
arbcoreinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
arbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

Expand All @@ -61,8 +63,9 @@ type SchedulerCache struct {
podGroupInformer arbcoreinfo.PodGroupInformer
queueInformer arbcoreinfo.QueueInformer

Binder Binder
Evictor Evictor
Binder Binder
Evictor Evictor
TsUpdater TaskStatusUpdater

recorder record.EventRecorder

Expand Down Expand Up @@ -113,6 +116,21 @@ func (de *defaultEvictor) Evict(p *v1.Pod) error {
return nil
}

// defaultTaskStatusUpdater is the default implementation of the TaskStatusUpdater interface
type defaultTaskStatusUpdater struct {
kubeclient *kubernetes.Clientset
}

// Update pod with podCondition
func (tsUpdater *defaultTaskStatusUpdater) Update(pod *v1.Pod, condition *v1.PodCondition) error {
glog.V(3).Infof("Updating pod condition for %s/%s to (%s==%s)", pod.Namespace, pod.Name, condition.Type, condition.Status)
if podutil.UpdatePodCondition(&pod.Status, condition) {
_, err := tsUpdater.kubeclient.CoreV1().Pods(pod.Namespace).UpdateStatus(pod)
return err
}
return nil
}

func taskKey(obj interface{}) (string, error) {
if obj == nil {
return "", fmt.Errorf("the object is nil")
Expand Down Expand Up @@ -166,6 +184,8 @@ func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool
kubeclient: sc.kubeclient,
}

sc.TsUpdater = &defaultTaskStatusUpdater{kubeclient: sc.kubeclient}

informerFactory := informers.NewSharedInformerFactory(sc.kubeclient, 0)

// create informer for node information
Expand Down Expand Up @@ -368,6 +388,24 @@ func (sc *SchedulerCache) Bind(taskInfo *arbapi.TaskInfo, hostname string) error
return nil
}

// TaskUnschedulable updates pod status of pending task
func (sc *SchedulerCache) TaskUnschedulable(task *api.TaskInfo, event arbcorev1.Event, reason string) error {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

pod := task.Pod.DeepCopy()

sc.recorder.Eventf(pod, v1.EventTypeWarning, string(event), reason)
sc.TsUpdater.Update(pod, &v1.PodCondition{
Type: v1.PodScheduled,
Status: v1.ConditionFalse,
Reason: v1.PodReasonUnschedulable,
Message: reason,
})

return nil
}

func (sc *SchedulerCache) deleteJob(job *arbapi.JobInfo) {
glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name)

Expand Down
40 changes: 39 additions & 1 deletion pkg/scheduler/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"reflect"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"

arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func nodesEqual(l, r map[string]*api.NodeInfo) bool {
Expand Down Expand Up @@ -234,3 +238,37 @@ func TestAddNode(t *testing.T) {
}
}
}

func TestTaskUnschedulable(t *testing.T) {
pod := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"),
[]metav1.OwnerReference{}, make(map[string]string))
taskInfo := api.NewTaskInfo(pod)
jobInfo := api.NewJobInfo("job_id")
jobInfo.AddTaskInfo(taskInfo)

tsUpdater := new(TaskStatusUpdaterMock)

tsUpdater.On("Update", taskInfo.Pod, mock.Anything).Return(nil)

cache := &SchedulerCache{
Nodes: make(map[string]*api.NodeInfo),
Jobs: make(map[api.JobID]*api.JobInfo),
TsUpdater: tsUpdater,
recorder: record.NewFakeRecorder(100),
}

result := cache.TaskUnschedulable(taskInfo, arbcorev1.FailedSchedulingEvent, jobInfo.FitError())
assert.Nil(t, result)

tsUpdater.AssertExpectations(t)

}

type TaskStatusUpdaterMock struct {
mock.Mock
}

func (m *TaskStatusUpdaterMock) Update(pod *v1.Pod, condition *v1.PodCondition) error {
args := m.Called(pod, condition)
return args.Error(0)
}
2 changes: 1 addition & 1 deletion pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/golang/glog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
11 changes: 9 additions & 2 deletions pkg/scheduler/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ limitations under the License.
package cache

import (
"k8s.io/api/core/v1"

arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
v1 "k8s.io/api/core/v1"
)

// Cache collects pods/nodes/queues information
Expand All @@ -47,6 +46,9 @@ type Cache interface {

// Backoff puts job in backlog for a while.
Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error

// TaskUnschedulable updates pod status of pending task
TaskUnschedulable(task *api.TaskInfo, event arbcorev1.Event, reason string) error
}

type Binder interface {
Expand All @@ -56,3 +58,8 @@ type Binder interface {
type Evictor interface {
Evict(pod *v1.Pod) error
}

// TaskStatusUpdater updates pod with given PodCondition
type TaskStatusUpdater interface {
Update(pod *v1.Pod, podCondition *v1.PodCondition) error
}
22 changes: 22 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ func (ssn *Session) Preemptable(preemptor *api.TaskInfo, preemptees []*api.TaskI

// Backoff discards a job from session, so no plugin/action handles it.
func (ssn *Session) Backoff(job *api.JobInfo, event arbcorev1.Event, reason string) error {
jobErrMsg := job.FitError()

// Update podCondition for tasks Allocated and Pending before job discarded
for _, taskInfo := range job.TaskStatusIndex[api.Pending] {
ssn.TaskUnschedulable(taskInfo, arbcorev1.FailedSchedulingEvent, jobErrMsg)
}

for _, taskInfo := range job.TaskStatusIndex[api.Allocated] {
ssn.TaskUnschedulable(taskInfo, arbcorev1.FailedSchedulingEvent, jobErrMsg)
}

if err := ssn.cache.Backoff(job, event, reason); err != nil {
glog.Errorf("Failed to backoff job <%s/%s>: %v",
job.Namespace, job.Name, err)
Expand All @@ -340,6 +351,17 @@ func (ssn *Session) Backoff(job *api.JobInfo, event arbcorev1.Event, reason stri
return nil
}

// TaskUnschedulable updates task status
func (ssn *Session) TaskUnschedulable(task *api.TaskInfo, event arbcorev1.Event, reason string) error {
if err := ssn.cache.TaskUnschedulable(task, event, reason); err != nil {
glog.Errorf("Failed to update unschedulable task status <%s/%s>: %v",
task.Namespace, task.Name, err)
return err
}

return nil
}

func (ssn *Session) AddEventHandler(eh *EventHandler) {
ssn.eventHandlers = append(ssn.eventHandlers, eh)
}
Expand Down

0 comments on commit 38c8002

Please sign in to comment.