Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #22004: Scheduler performance optimized #22030

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/hostman/hostdeployer/apis/deploy_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ type cloudregionSchedtagW struct {
func (p *CloudregionSchedtagPredicate) GetInputs(u *core.Unit) []ISchedtagCustomer {
data := u.SchedData()
tags := data.Schedtags
schedtags := GetInputSchedtagByType(tags, computemodels.CloudregionManager.KeywordPlural())
if len(schedtags) == 0 {
return nil
}
return []ISchedtagCustomer{
&cloudregionSchedtagW{
schedData: data,
cloudregion: data.PreferRegion,
schedtags: GetInputSchedtagByType(tags, computemodels.CloudregionManager.KeywordPlural()),
schedtags: schedtags,
}}
}

Expand Down
63 changes: 44 additions & 19 deletions pkg/scheduler/algorithm/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"strings"
"sync"

"golang.org/x/sync/errgroup"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/gotypes"
Expand Down Expand Up @@ -355,16 +357,12 @@ func (p *BaseSchedtagPredicate) GetHypervisorDriver() models.IGuestDriver {
return driver
}

func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedtagCandidateResource, u *core.Unit, c core.Candidater) (*PredicatedSchedtagResource, error) {
func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedtagCandidateResource, u *core.Unit, c core.Candidater, allTags []schedtag.ISchedtag) (*PredicatedSchedtagResource, error) {
// allTags, err := GetAllSchedtags(getSchedtagResourceType(candidate))
// sMan, err := schedtag.GetSessionManager(u.SessionID())
// if err != nil {
// return nil, err
// }
allTags, err := schedtag.GetAllSchedtags(getSchedtagResourceType(candidate))
if err != nil {
return nil, err
}
tagPredicate := NewSchedtagPredicate(input.GetSchedtags(), allTags)
res := &PredicatedSchedtagResource{
ISchedtagCandidateResource: candidate,
Expand All @@ -384,22 +382,39 @@ func (p *BaseSchedtagPredicate) check(input ISchedtagCustomer, candidate ISchedt
return res, nil
}

func (p *BaseSchedtagPredicate) checkResources(input ISchedtagCustomer, ress []ISchedtagCandidateResource, u *core.Unit, c core.Candidater) ([]*PredicatedSchedtagResource, error) {
errs := make([]error, 0)
ret := make([]*PredicatedSchedtagResource, 0)
for _, res := range ress {
ps, err := p.check(input, res, u, c)
if err != nil {
// append err, resource not suit input customer
errs = append(errs, err)
continue
func (p *BaseSchedtagPredicate) checkResources(input ISchedtagCustomer, ress []ISchedtagCandidateResource, u *core.Unit, c core.Candidater, allTags []schedtag.ISchedtag) ([]*PredicatedSchedtagResource, error) {
errs := make([]error, len(ress))
ret := make([]*PredicatedSchedtagResource, len(ress))
errGrp := errgroup.Group{}
for i := range ress {
res := ress[i]
errGrp.Go(func() error {
ps, err := p.check(input, res, u, c, allTags)
if err != nil {
// append err, resource not suit input customer
errs[i] = err
} else {
ret[i] = ps
}
return nil
})
}
if err := errGrp.Wait(); err != nil {
return nil, fmt.Errorf("errGrp.Wait: %v", err)
}
newRet := make([]*PredicatedSchedtagResource, 0)
newErrs := make([]error, 0)
for i := range ress {
if ps := ret[i]; ps != nil {
newRet = append(newRet, ps)
} else {
newErrs = append(newErrs, errs[i])
}
ret = append(ret, ps)
}
if len(ret) == 0 {
return nil, errors.NewAggregate(errs)
if len(newRet) == 0 {
return nil, errors.NewAggregate(newErrs)
}
return ret, nil
return newRet, nil
}

func (p *BaseSchedtagPredicate) GetInputResourcesMap(candidateId string) SchedtagInputResourcesMap {
Expand Down Expand Up @@ -435,8 +450,10 @@ func (p *BaseSchedtagPredicate) Execute(
u *core.Unit,
c core.Candidater,
) (bool, []core.PredicateFailureReason, error) {
//inputTime := time.Now()
inputs := sp.GetInputs(u)
resources := sp.GetResources(c)
//log.Infof("=======%s get input time: %s, inputs: %s", sp.Name(), time.Since(inputTime), jsonutils.Marshal(inputs))

h := NewPredicateHelper(sp, u, c)

Expand Down Expand Up @@ -472,7 +489,14 @@ func (p *BaseSchedtagPredicate) Execute(
filterErrs = append(filterErrs, errs...)
}

matchedResources, err := p.checkResources(input, fitResources, u, c)
allTags, err := schedtag.GetAllSchedtags(getSchedtagResourceType(fitResources[0]))
if err != nil {
h.Exclude(fmt.Sprintf("get all schedtags"))
break
}
//checkTime := time.Now()
matchedResources, err := p.checkResources(input, fitResources, u, c, allTags)
//log.Infof("---%s checkResources time: %s", sp.Name(), time.Since(checkTime))
if err != nil {
if len(filterErrs) > 0 {
h.ExcludeByErrors(filterErrs)
Expand All @@ -483,6 +507,7 @@ func (p *BaseSchedtagPredicate) Execute(
inputRes[idx] = matchedResources
}

//log.Infof("=======%s get execute time: %s", sp.Name(), time.Since(inputTime))
return h.GetResult()
}

Expand Down
59 changes: 37 additions & 22 deletions pkg/scheduler/algorithm/predicates/schedtag_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
computeapi "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/scheduler/data_manager/schedtag"
"yunion.io/x/onecloud/pkg/scheduler/options"
"yunion.io/x/onecloud/pkg/util/conditionparser"
)

Expand Down Expand Up @@ -167,39 +168,46 @@ func GetRequestSchedtags(reqTags []*computeapi.SchedtagConfig, allTags []schedta
type SchedtagChecker struct {
}

type apiTags []computeapi.SchedtagConfig
type apiTags map[string]computeapi.SchedtagConfig

func newApiTags(tags []computeapi.SchedtagConfig) apiTags {
ret := make(map[string]computeapi.SchedtagConfig)
for _, tag := range tags {
ret[tag.Id] = tag
}
return ret
}

func (t apiTags) contains(objTag schedtag.ISchedtag) bool {
for _, tag := range t {
if tag.Id == objTag.GetId() || tag.Id == objTag.GetName() {
return true
}
if _, ok := t[objTag.GetId()]; ok {
return true
}
if _, ok := t[objTag.GetName()]; ok {
return true
}
return false
}

type objTags []schedtag.ISchedtag
type objTags map[string]schedtag.ISchedtag

func (t objTags) contains(atag computeapi.SchedtagConfig) bool {
for _, tag := range t {
if tag.GetId() == atag.Id || tag.GetName() == atag.Id {
return true
}
func newObjTags(tags []schedtag.ISchedtag) objTags {
ret := make(map[string]schedtag.ISchedtag)
for _, tag := range tags {
ret[tag.GetId()] = tag
ret[tag.GetName()] = tag
}
return false
return ret
}

func (c *SchedtagChecker) contains(tags []computeapi.SchedtagConfig, objTag models.SSchedtag) bool {
for _, tag := range tags {
if tag.Id == objTag.Id || tag.Id == objTag.Name {
return true
}
func (t objTags) contains(atag computeapi.SchedtagConfig) bool {
if _, ok := t[atag.Id]; ok {
return true
}
return false
}

func (c *SchedtagChecker) HasIntersection(tags []computeapi.SchedtagConfig, objTags []schedtag.ISchedtag) (bool, schedtag.ISchedtag) {
var atags apiTags = tags
atags := newApiTags(tags)
for _, objTag := range objTags {
if atags.contains(objTag) {
return true, objTag
Expand All @@ -209,7 +217,7 @@ func (c *SchedtagChecker) HasIntersection(tags []computeapi.SchedtagConfig, objT
}

func (c *SchedtagChecker) Contains(objectTags []schedtag.ISchedtag, tags []computeapi.SchedtagConfig) (bool, *computeapi.SchedtagConfig) {
var otags objTags = objectTags
otags := newObjTags(objectTags)
for _, tag := range tags {
if !otags.contains(tag) {
return false, &tag
Expand Down Expand Up @@ -268,18 +276,24 @@ func (c *SchedtagChecker) mergeSchedtags(candiate ISchedtagCandidate, staticTags
func (c *SchedtagChecker) GetCandidateSchedtags(candidate ISchedtagCandidate) ([]schedtag.ISchedtag, error) {
// staticTags := candidate.GetSchedtags()
staticTags := schedtag.GetCandidateSchedtags(candidate.ResourceType(), candidate.GetId())
dynamicTags, err := c.getDynamicSchedtags(candidate.ResourceType(), candidate.GetDynamicSchedDesc())
if err != nil {
return nil, err
dynamicTags := []schedtag.ISchedtag{}
var err error
if options.Options.EnableDynamicSchedtag {
dynamicTags, err = c.getDynamicSchedtags(candidate.ResourceType(), candidate.GetDynamicSchedDesc())
if err != nil {
return nil, err
}
}
return c.mergeSchedtags(candidate, staticTags, dynamicTags), nil
}

func (c *SchedtagChecker) Check(p ISchedtagPredicate, candidate ISchedtagCandidate) error {
//getT := time.Now()
candidateTags, err := c.GetCandidateSchedtags(candidate)
if err != nil {
return err
}
//log.Infof("=====%s getCandidateSchedtags %s =====", candidate.IndexKey(), time.Since(getT))

execludeTags := p.GetExcludeTags()
requireTags := p.GetRequireTags()
Expand All @@ -298,6 +312,7 @@ func (c *SchedtagChecker) Check(p ISchedtagPredicate, candidate ISchedtagCandida
return fmt.Errorf("%s need schedtag: %q", candiInfo, tag.Id)
}
}
//log.Infof("-------%s check time: %s", candidate.IndexKey(), time.Since(getT))

return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ type zoneSchedtagInputW struct {
func (p *ZoneSchedtagPredicate) GetInputs(u *core.Unit) []ISchedtagCustomer {
data := u.SchedData()
tags := data.Schedtags
schedtags := GetInputSchedtagByType(tags, computemodels.ZoneManager.KeywordPlural())
if len(schedtags) == 0 {
return nil
}
return []ISchedtagCustomer{
&zoneSchedtagInputW{
schedData: data,
zone: data.PreferZone,
schedtags: GetInputSchedtagByType(tags, computemodels.ZoneManager.KeywordPlural()),
schedtags: schedtags,
},
}
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/scheduler/core/analysor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,38 @@ import (
"time"

"yunion.io/x/log"

"yunion.io/x/onecloud/pkg/scheduler/options"
)

type predicateAnalysor struct {
enable bool
hint string
starts map[string]time.Time
elpased map[string]time.Duration
}

func newPredicateAnalysor(hint string) *predicateAnalysor {
return &predicateAnalysor{
enable: options.Options.EnableAnalysis,
hint: hint,
starts: make(map[string]time.Time),
elpased: make(map[string]time.Duration),
}
}

func (p *predicateAnalysor) Start(pName string) *predicateAnalysor {
if !p.enable {
return p
}
p.starts[pName] = time.Now()
return p
}

func (p *predicateAnalysor) End(pName string, end time.Time) *predicateAnalysor {
if !p.enable {
return p
}
start, ok := p.starts[pName]
if !ok {
panic(fmt.Sprintf("Not found start time of %q", pName))
Expand Down Expand Up @@ -70,6 +80,9 @@ func (p predicateDurations) Less(i, j int) bool {
}

func (p *predicateAnalysor) ShowResult() {
if !p.enable {
return
}
lists := make([]*predicateDuration, 0)
for name, d := range p.elpased {
lists = append(lists, &predicateDuration{
Expand Down
Loading
Loading