Skip to content

Commit

Permalink
optimized(scheduler): reduce scheduling time
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi committed Jan 26, 2025
1 parent a437752 commit f07680b
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ type cloudregionSchedtagW struct {
func (p *CloudregionSchedtagPredicate) GetInputs(u *core.Unit) []ISchedtagCustomer {
data := u.SchedData()
tags := data.Schedtags
schedtags := GetInputSchedtagByType(tags, computemodels.CloudregionManager.KeywordPlural())
if len(schedtags) == 0 {
return nil
}
return []ISchedtagCustomer{
&cloudregionSchedtagW{
schedData: data,
cloudregion: data.PreferRegion,
schedtags: GetInputSchedtagByType(tags, computemodels.CloudregionManager.KeywordPlural()),
schedtags: schedtags,
}}
}

Expand Down
63 changes: 44 additions & 19 deletions pkg/scheduler/algorithm/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"strings"
"sync"

"golang.org/x/sync/errgroup"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/gotypes"
Expand Down Expand Up @@ -355,16 +357,12 @@ func (p *BaseSchedtagPredicate) GetHypervisorDriver() models.IGuestDriver {
return driver
}

func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedtagCandidateResource, u *core.Unit, c core.Candidater) (*PredicatedSchedtagResource, error) {
func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedtagCandidateResource, u *core.Unit, c core.Candidater, allTags []schedtag.ISchedtag) (*PredicatedSchedtagResource, error) {
// allTags, err := GetAllSchedtags(getSchedtagResourceType(candidate))
// sMan, err := schedtag.GetSessionManager(u.SessionID())
// if err != nil {
// return nil, err
// }
allTags, err := schedtag.GetAllSchedtags(getSchedtagResourceType(candidate))
if err != nil {
return nil, err
}
tagPredicate := NewSchedtagPredicate(input.GetSchedtags(), allTags)
res := &PredicatedSchedtagResource{
ISchedtagCandidateResource: candidate,
Expand All @@ -384,22 +382,39 @@ func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedt
return res, nil
}

func (p *BaseSchedtagPredicate) checkResources(input ISchedtagCustomer, ress []ISchedtagCandidateResource, u *core.Unit, c core.Candidater) ([]*PredicatedSchedtagResource, error) {
errs := make([]error, 0)
ret := make([]*PredicatedSchedtagResource, 0)
for _, res := range ress {
ps, err := p.check(input, res, u, c)
if err != nil {
// append err, resource not suit input customer
errs = append(errs, err)
continue
func (p *BaseSchedtagPredicate) checkResources(input ISchedtagCustomer, ress []ISchedtagCandidateResource, u *core.Unit, c core.Candidater, allTags []schedtag.ISchedtag) ([]*PredicatedSchedtagResource, error) {
errs := make([]error, len(ress))
ret := make([]*PredicatedSchedtagResource, len(ress))
errGrp := errgroup.Group{}
for i := range ress {
res := ress[i]
errGrp.Go(func() error {
ps, err := p.check(input, res, u, c, allTags)
if err != nil {
// append err, resource not suit input customer
errs[i] = err
} else {
ret[i] = ps
}
return nil
})
}
if err := errGrp.Wait(); err != nil {
return nil, fmt.Errorf("errGrp.Wait: %v", err)
}
newRet := make([]*PredicatedSchedtagResource, 0)
newErrs := make([]error, 0)
for i := range ress {
if ps := ret[i]; ps != nil {
newRet = append(newRet, ps)
} else {
newErrs = append(newErrs, errs[i])
}
ret = append(ret, ps)
}
if len(ret) == 0 {
return nil, errors.NewAggregate(errs)
if len(newRet) == 0 {
return nil, errors.NewAggregate(newErrs)
}
return ret, nil
return newRet, nil
}

func (p *BaseSchedtagPredicate) GetInputResourcesMap(candidateId string) SchedtagInputResourcesMap {
Expand Down Expand Up @@ -435,8 +450,10 @@ func (p *BaseSchedtagPredicate) Execute(
u *core.Unit,
c core.Candidater,
) (bool, []core.PredicateFailureReason, error) {
//inputTime := time.Now()
inputs := sp.GetInputs(u)
resources := sp.GetResources(c)
//log.Infof("=======%s get input time: %s, inputs: %s", sp.Name(), time.Since(inputTime), jsonutils.Marshal(inputs))

h := NewPredicateHelper(sp, u, c)

Expand Down Expand Up @@ -472,7 +489,14 @@ func (p *BaseSchedtagPredicate) Execute(
filterErrs = append(filterErrs, errs...)
}

matchedResources, err := p.checkResources(input, fitResources, u, c)
allTags, err := schedtag.GetAllSchedtags(getSchedtagResourceType(fitResources[0]))
if err != nil {
h.Exclude(fmt.Sprintf("get all schedtags"))
break
}
//checkTime := time.Now()
matchedResources, err := p.checkResources(input, fitResources, u, c, allTags)
//log.Infof("---%s checkResources time: %s", sp.Name(), time.Since(checkTime))
if err != nil {
if len(filterErrs) > 0 {
h.ExcludeByErrors(filterErrs)
Expand All @@ -483,6 +507,7 @@ func (p *BaseSchedtagPredicate) Execute(
inputRes[idx] = matchedResources
}

//log.Infof("=======%s get execute time: %s", sp.Name(), time.Since(inputTime))
return h.GetResult()
}

Expand Down
59 changes: 37 additions & 22 deletions pkg/scheduler/algorithm/predicates/schedtag_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/scheduler/data_manager/schedtag"
"yunion.io/x/onecloud/pkg/scheduler/options"
"yunion.io/x/onecloud/pkg/util/conditionparser"
)

Expand Down Expand Up @@ -167,39 +168,46 @@ func GetRequestSchedtags(reqTags []*computeapi.SchedtagConfig, allTags []schedta
type SchedtagChecker struct {
}

type apiTags []computeapi.SchedtagConfig
type apiTags map[string]computeapi.SchedtagConfig

func newApiTags(tags []computeapi.SchedtagConfig) apiTags {
ret := make(map[string]computeapi.SchedtagConfig)
for _, tag := range tags {
ret[tag.Id] = tag
}
return ret
}

func (t apiTags) contains(objTag schedtag.ISchedtag) bool {
for _, tag := range t {
if tag.Id == objTag.GetId() || tag.Id == objTag.GetName() {
return true
}
if _, ok := t[objTag.GetId()]; ok {
return true
}
if _, ok := t[objTag.GetName()]; ok {
return true
}
return false
}

type objTags []schedtag.ISchedtag
type objTags map[string]schedtag.ISchedtag

func (t objTags) contains(atag computeapi.SchedtagConfig) bool {
for _, tag := range t {
if tag.GetId() == atag.Id || tag.GetName() == atag.Id {
return true
}
func newObjTags(tags []schedtag.ISchedtag) objTags {
ret := make(map[string]schedtag.ISchedtag)
for _, tag := range tags {
ret[tag.GetId()] = tag
ret[tag.GetName()] = tag
}
return false
return ret
}

func (c *SchedtagChecker) contains(tags []computeapi.SchedtagConfig, objTag models.SSchedtag) bool {
for _, tag := range tags {
if tag.Id == objTag.Id || tag.Id == objTag.Name {
return true
}
func (t objTags) contains(atag computeapi.SchedtagConfig) bool {
if _, ok := t[atag.Id]; ok {
return true
}
return false
}

func (c *SchedtagChecker) HasIntersection(tags []computeapi.SchedtagConfig, objTags []schedtag.ISchedtag) (bool, schedtag.ISchedtag) {
var atags apiTags = tags
atags := newApiTags(tags)
for _, objTag := range objTags {
if atags.contains(objTag) {
return true, objTag
Expand All @@ -209,7 +217,7 @@ func (c *SchedtagChecker) HasIntersection(tags []computeapi.SchedtagConfig, objT
}

func (c *SchedtagChecker) Contains(objectTags []schedtag.ISchedtag, tags []computeapi.SchedtagConfig) (bool, *computeapi.SchedtagConfig) {
var otags objTags = objectTags
otags := newObjTags(objectTags)
for _, tag := range tags {
if !otags.contains(tag) {
return false, &tag
Expand Down Expand Up @@ -268,18 +276,24 @@ func (c *SchedtagChecker) mergeSchedtags(candiate ISchedtagCandidate, staticTags
func (c *SchedtagChecker) GetCandidateSchedtags(candidate ISchedtagCandidate) ([]schedtag.ISchedtag, error) {
// staticTags := candidate.GetSchedtags()
staticTags := schedtag.GetCandidateSchedtags(candidate.ResourceType(), candidate.GetId())
dynamicTags, err := c.getDynamicSchedtags(candidate.ResourceType(), candidate.GetDynamicSchedDesc())
if err != nil {
return nil, err
dynamicTags := []schedtag.ISchedtag{}
var err error
if options.Options.EnableDynamicSchedtag {
dynamicTags, err = c.getDynamicSchedtags(candidate.ResourceType(), candidate.GetDynamicSchedDesc())
if err != nil {
return nil, err
}
}
return c.mergeSchedtags(candidate, staticTags, dynamicTags), nil
}

func (c *SchedtagChecker) Check(p ISchedtagPredicate, candidate ISchedtagCandidate) error {
//getT := time.Now()
candidateTags, err := c.GetCandidateSchedtags(candidate)
if err != nil {
return err
}
//log.Infof("=====%s getCandidateSchedtags %s =====", candidate.IndexKey(), time.Since(getT))

execludeTags := p.GetExcludeTags()
requireTags := p.GetRequireTags()
Expand All @@ -298,6 +312,7 @@ func (c *SchedtagChecker) Check(p ISchedtagPredicate, candidate ISchedtagCandida
return fmt.Errorf("%s need schedtag: %q", candiInfo, tag.Id)
}
}
//log.Infof("-------%s check time: %s", candidate.IndexKey(), time.Since(getT))

return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ type zoneSchedtagInputW struct {
func (p *ZoneSchedtagPredicate) GetInputs(u *core.Unit) []ISchedtagCustomer {
data := u.SchedData()
tags := data.Schedtags
schedtags := GetInputSchedtagByType(tags, computemodels.ZoneManager.KeywordPlural())
if len(schedtags) == 0 {
return nil
}
return []ISchedtagCustomer{
&zoneSchedtagInputW{
schedData: data,
zone: data.PreferZone,
schedtags: GetInputSchedtagByType(tags, computemodels.ZoneManager.KeywordPlural()),
schedtags: schedtags,
},
}
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/core/analysor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,38 @@ import (
"time"

"yunion.io/x/log"

"yunion.io/x/onecloud/pkg/scheduler/options"
)

type predicateAnalysor struct {
enable bool
hint string
starts map[string]time.Time
elpased map[string]time.Duration
}

func newPredicateAnalysor(hint string) *predicateAnalysor {
return &predicateAnalysor{
enable: options.Options.EnableAnalysis,
hint: hint,
starts: make(map[string]time.Time),
elpased: make(map[string]time.Duration),
}
}

func (p *predicateAnalysor) Start(pName string) *predicateAnalysor {
if !p.enable {
return p
}
p.starts[pName] = time.Now()
return p
}

func (p *predicateAnalysor) End(pName string, end time.Time) *predicateAnalysor {
if !p.enable {
return p
}
start, ok := p.starts[pName]
if !ok {
panic(fmt.Sprintf("Not found start time of %q", pName))
Expand Down Expand Up @@ -70,6 +80,9 @@ func (p predicateDurations) Less(i, j int) bool {
}

func (p *predicateAnalysor) ShowResult() {
if !p.enable {
return
}
lists := make([]*predicateDuration, 0)
for name, d := range p.elpased {
lists = append(lists, &predicateDuration{
Expand Down
Loading

0 comments on commit f07680b

Please sign in to comment.