Skip to content

Commit 1230e3a

Browse files
ecosysbinecosysbin
ecosysbin
authored andcommitted
Add network-topology-aware plugin and hyperNode score callback
Signed-off-by: ecosysbin <14729934+ecosysbin@user.noreply.gitee.com>
1 parent 67957a1 commit 1230e3a

File tree

11 files changed

+2931
-26
lines changed

11 files changed

+2931
-26
lines changed

pkg/scheduler/actions/allocate/allocate.go

+17-20
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package allocate
1818

1919
import (
20-
"sort"
2120
"time"
2221

2322
"k8s.io/klog/v2"
@@ -34,7 +33,6 @@ type Action struct {
3433
session *framework.Session
3534
// configured flag for error cache
3635
enablePredicateErrorCache bool
37-
hyperNodesTiers []int
3836

3937
// hyperNodeScoresByJob stores job total score for all available hyperNodes, this is used for accumulate
4038
// all nodes' scores in each available hyperNode only when job has hard network topology constrains
@@ -45,7 +43,6 @@ type Action struct {
4543
func New() *Action {
4644
return &Action{
4745
enablePredicateErrorCache: true, // default to enable it
48-
hyperNodesTiers: []int{},
4946
hyperNodeScoresByJob: make(map[string]map[string]float64),
5047
}
5148
}
@@ -61,26 +58,11 @@ func (alloc *Action) parseArguments(ssn *framework.Session) {
6158
arguments.GetBool(&alloc.enablePredicateErrorCache, conf.EnablePredicateErrCacheKey)
6259
}
6360

64-
func (alloc *Action) parseHyperNodesTiers(ssn *framework.Session) {
65-
if ssn.HyperNodesSetByTier == nil || len(ssn.HyperNodesSetByTier) == 0 {
66-
return
67-
}
68-
69-
// sort to guarantee the traverse order is from down to top.
70-
var tiers []int
71-
for tier := range ssn.HyperNodesSetByTier {
72-
tiers = append(tiers, tier)
73-
}
74-
sort.Ints(tiers)
75-
alloc.hyperNodesTiers = tiers
76-
}
77-
7861
func (alloc *Action) Execute(ssn *framework.Session) {
7962
klog.V(5).Infof("Enter Allocate ...")
8063
defer klog.V(5).Infof("Leaving Allocate ...")
8164

8265
alloc.parseArguments(ssn)
83-
alloc.parseHyperNodesTiers(ssn)
8466

8567
// the allocation for pod may have many stages
8668
// 1. pick a queue named Q (using ssn.QueueOrderFn)
@@ -241,7 +223,7 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
241223
jobAllocatedHyperNode := job.PodGroup.Annotations[api.JobAllocatedHyperNode]
242224

243225
// Find a suitable hyperNode in one tier from down to top everytime to ensure that the selected hyperNode spans the least tier.
244-
for _, tier := range alloc.hyperNodesTiers {
226+
for _, tier := range ssn.HyperNodesTiers {
245227
if tier > highestAllowedTier {
246228
klog.V(4).ErrorS(nil, "Skip search for higher tier cause highest allowed tier reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
247229
break
@@ -375,6 +357,8 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
375357
ssn := alloc.session
376358
stmt := framework.NewStatement(ssn)
377359
ph := util.NewPredicateHelper()
360+
jobAllocatedHyperNode := job.PodGroup.Annotations[api.JobAllocatedHyperNode]
361+
jobAllocatedNewHyperNode := jobAllocatedHyperNode
378362

379363
for !tasks.Empty() {
380364
task := tasks.Pop().(*api.TaskInfo)
@@ -414,11 +398,21 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
414398
}
415399
}
416400

401+
task.JobAllocatedHyperNode = jobAllocatedNewHyperNode
417402
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
418403
if bestNode == nil {
419404
continue
420405
}
421-
406+
if hyperNode == "" {
407+
hyperNode = util.FindHyperNodeForNode(bestNode.Name, ssn.RealNodesList, ssn.HyperNodesTiers, ssn.HyperNodesSetByTier)
408+
if hyperNode != "" {
409+
if jobAllocatedNewHyperNode == "" {
410+
jobAllocatedNewHyperNode = hyperNode
411+
} else {
412+
jobAllocatedNewHyperNode = ssn.HyperNodes.GetLCAHyperNode(hyperNode, jobAllocatedNewHyperNode)
413+
}
414+
}
415+
}
422416
alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore)
423417
alloc.allocateResourcesForTask(stmt, task, bestNode, job)
424418

@@ -429,6 +423,9 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
429423

430424
if ssn.JobReady(job) {
431425
klog.V(3).InfoS("Job ready, return statement", "jobName", job.UID)
426+
if jobAllocatedNewHyperNode != "" && jobAllocatedNewHyperNode != jobAllocatedHyperNode {
427+
job.PodGroup.GetAnnotations()[api.JobAllocatedHyperNode] = jobAllocatedNewHyperNode
428+
}
432429
return stmt
433430
} else {
434431
if !ssn.JobPipelined(job) {

0 commit comments

Comments
 (0)