Skip to content

Commit

Permalink
Merge pull request kubernetes-retired#584 from k82cn/fifo_2_wq
Browse files Browse the repository at this point in the history
Replaced FIFO by workqueue.
  • Loading branch information
k8s-ci-robot authored Feb 11, 2019
2 parents 2a4b52c + 8188e6c commit 9b9fc70
Showing 1 changed file with 47 additions and 94 deletions.
141 changes: 47 additions & 94 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
policyv1 "k8s.io/client-go/informers/policy/v1beta1"
Expand All @@ -35,6 +36,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/scheduler/volumebinder"

Expand Down Expand Up @@ -79,8 +81,8 @@ type SchedulerCache struct {
Nodes map[string]*kbapi.NodeInfo
Queues map[kbapi.QueueID]*kbapi.QueueInfo

errTasks *cache.FIFO
deletedJobs *cache.FIFO
errTasks workqueue.RateLimitingInterface
deletedJobs workqueue.RateLimitingInterface

namespaceAsQueue bool
}
Expand Down Expand Up @@ -164,41 +166,13 @@ func (dvb *defaultVolumeBinder) BindVolumes(task *api.TaskInfo) error {
return dvb.volumeBinder.Binder.BindPodVolumes(task.Pod)
}

func taskKey(obj interface{}) (string, error) {
if obj == nil {
return "", fmt.Errorf("the object is nil")
}

task, ok := obj.(*kbapi.TaskInfo)

if !ok {
return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
}

return string(task.UID), nil
}

func jobKey(obj interface{}) (string, error) {
if obj == nil {
return "", fmt.Errorf("the object is nil")
}

job, ok := obj.(*kbapi.JobInfo)

if !ok {
return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
}

return string(job.UID), nil
}

func newSchedulerCache(config *rest.Config, schedulerName string, nsAsQueue bool) *SchedulerCache {
sc := &SchedulerCache{
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Nodes: make(map[string]*kbapi.NodeInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
errTasks: cache.NewFIFO(taskKey),
deletedJobs: cache.NewFIFO(jobKey),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
deletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeclient: kubernetes.NewForConfigOrDie(config),
kbclient: kbver.NewForConfigOrDie(config),
namespaceAsQueue: nsAsQueue,
Expand Down Expand Up @@ -324,10 +298,10 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
}

// Re-sync error tasks.
go sc.resync()
go wait.Until(sc.processResyncTask, 0, stopCh)

// Cleanup jobs.
go sc.cleanupJobs()
go wait.Until(sc.processCleanupJob, 0, stopCh)
}

func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
Expand Down Expand Up @@ -388,7 +362,9 @@ func (sc *SchedulerCache) Evict(taskInfo *kbapi.TaskInfo, reason string) error {
}

// Add new task to node.
node.UpdateTask(task)
if err := node.UpdateTask(task); err != nil {
return err
}

p := task.Pod

Expand Down Expand Up @@ -430,7 +406,9 @@ func (sc *SchedulerCache) Bind(taskInfo *kbapi.TaskInfo, hostname string) error
task.NodeName = hostname

// Add task to the node.
node.AddTask(task)
if err := node.AddTask(task); err != nil {
return err
}

p := task.Pod

Expand Down Expand Up @@ -476,77 +454,52 @@ func (sc *SchedulerCache) taskUnschedulable(task *api.TaskInfo, message string)
func (sc *SchedulerCache) deleteJob(job *kbapi.JobInfo) {
glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name)

time.AfterFunc(5*time.Second, func() {
sc.deletedJobs.AddIfNotPresent(job)
})
sc.deletedJobs.AddRateLimited(job)
}

func (sc *SchedulerCache) processCleanupJob() error {
_, err := sc.deletedJobs.Pop(func(obj interface{}) error {
job, ok := obj.(*kbapi.JobInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}

func() {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

if kbapi.JobTerminated(job) {
delete(sc.Jobs, job.UID)
glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
} else {
// Retry
sc.deleteJob(job)
}
}()
func (sc *SchedulerCache) processCleanupJob() {
obj, shutdown := sc.deletedJobs.Get()
if shutdown {
return
}

return nil
})
job, found := obj.(*kbapi.JobInfo)
if !found {
glog.Errorf("Failed to convert <%v> to *JobInfo", obj)
return
}

return err
}
sc.Mutex.Lock()
defer sc.Mutex.Unlock()

func (sc *SchedulerCache) cleanupJobs() {
for {
err := sc.processCleanupJob()
if err != nil {
glog.Errorf("Failed to process job clean up: %v", err)
}
if kbapi.JobTerminated(job) {
delete(sc.Jobs, job.UID)
glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
} else {
// Retry
sc.deleteJob(job)
}
}

func (sc *SchedulerCache) resyncTask(task *kbapi.TaskInfo) {
if err := sc.errTasks.AddIfNotPresent(task); err != nil {
glog.Errorf("Failed to re-sync tasks <%v/%v>: %v",
task.Namespace, task.Name, err)
}
sc.errTasks.AddRateLimited(task)
}

func (sc *SchedulerCache) resync() {
for {
err := sc.processResyncTask()
if err != nil {
glog.Errorf("Failed to process resync: %v", err)
}
func (sc *SchedulerCache) processResyncTask() {
obj, shutdown := sc.errTasks.Get()
if shutdown {
return
}
task, ok := obj.(*kbapi.TaskInfo)
if !ok {
glog.Errorf("failed to convert %v to *v1.Pod", obj)
return
}
}

func (sc *SchedulerCache) processResyncTask() error {
_, err := sc.errTasks.Pop(func(obj interface{}) error {
task, ok := obj.(*kbapi.TaskInfo)
if !ok {
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
}

if err := sc.syncTask(task); err != nil {
glog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name)
return err
}
return nil
})

return err
if err := sc.syncTask(task); err != nil {
glog.Errorf("Failed to sync pod <%v/%v>, retry it.", task.Namespace, task.Name)
sc.resyncTask(task)
}
}

func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
Expand Down

0 comments on commit 9b9fc70

Please sign in to comment.