Skip to content

Commit

Permalink
[YUNIKORN-1929] fix update group configs
Browse files Browse the repository at this point in the history
Signed-off-by: Frank Yang <yangpoan@gmail.com>
  • Loading branch information
FrankYang0529 committed Aug 25, 2023
1 parent 96553dd commit 0e20e7a
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 213 deletions.
69 changes: 40 additions & 29 deletions pkg/scheduler/ugm/group_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
Expand Down Expand Up @@ -98,22 +99,48 @@ func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDA
return groupResourceUsage
}

func (gt *GroupTracker) IsQueuePathTrackedCompletely(queuePath string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.IsQueuePathTrackedCompletely(queuePath)
// ClearEarlierSetLimits runs internalClearEarlierSetLimits on this group's
// queue tracker while holding the group tracker's write lock.
//
// prefixPath is the queue path prefix of the tracker being visited (empty for
// the top-most tracker), wildcardLimits holds the wildcard limit configs keyed
// by queue path, and pathLimits marks the queue paths that have a limit set
// for this group. Applications of trackers that are cleared during the walk
// are added to removedApp.
func (gt *GroupTracker) ClearEarlierSetLimits(prefixPath string, wildcardLimits map[string]*LimitConfig, pathLimits map[string]bool, removedApp map[string]bool) {
	gt.Lock()
	defer gt.Unlock()

	top := gt.queueTracker
	// The returned flag says whether the visited tracker itself was cleared;
	// it is only needed by the recursion, so it is dropped here.
	_ = gt.internalClearEarlierSetLimits(prefixPath, top, wildcardLimits, pathLimits, removedApp)
}

// IsUnlinkRequired asks the group's queue tracker whether the tracker for
// queuePath needs to be unlinked and returns that answer unchanged. The group
// tracker's read lock is held for the duration of the call.
func (gt *GroupTracker) IsUnlinkRequired(queuePath string) bool {
	gt.RLock()
	defer gt.RUnlock()

	unlink := gt.queueTracker.IsUnlinkRequired(queuePath)
	return unlink
}
func (gt *GroupTracker) internalClearEarlierSetLimits(prefixPath string, queueTracker *QueueTracker, wildcardLimits map[string]*LimitConfig, pathLimits map[string]bool, removedApp map[string]bool) bool {
queuePath := prefixPath + configs.DOT + queueTracker.queueName
if prefixPath == common.Empty {
queuePath = queueTracker.queueName
}

func (gt *GroupTracker) UnlinkQT(queuePath string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.UnlinkQT(queuePath)
allLeafQueueRemoved := true
for _, childQt := range queueTracker.childQueueTrackers {
if gt.internalClearEarlierSetLimits(queuePath, childQt, wildcardLimits, pathLimits, removedApp) {
delete(queueTracker.childQueueTrackers, childQt.queueName)
} else {
allLeafQueueRemoved = false
}
}

currentPathHasLimits := wildcardLimits[queuePath] != nil || pathLimits[queuePath]
if allLeafQueueRemoved && !currentPathHasLimits {
for app := range queueTracker.runningApplications {
removedApp[app] = true
}
queueTracker.runningApplications = make(map[string]bool)
queueTracker.resourceUsage = resources.NewResource()
return true
} else {
// if we don't remove current tracker, we have to keep applications in current tracker
for app := range queueTracker.runningApplications {
delete(removedApp, app)
}
}

// if only have wildcard limits, then clear the limits
if wildcardLimits[queuePath] != nil && !pathLimits[queuePath] {
queueTracker.maxRunningApps = 0
queueTracker.maxResources = resources.NewResource()
}
return false
}

// canBeRemoved Does the "root" queue have any child queue trackers? Are there any running applications in the "root" queue tracker?
Expand All @@ -130,22 +157,6 @@ func (gt *GroupTracker) getName() string {
return gt.groupName
}

// decreaseAllTrackedResourceUsage calls decreaseTrackedResourceUsageDownwards
// on the group's queue tracker for queuePath and returns the affected
// applications that this group tracker knows about.
//
// The result maps each such application ID to the value stored for it in
// gt.applications (the user that owns the application). Applications reported
// by the queue tracker but not present in gt.applications are left out.
// A nil receiver yields a nil map. The write lock is held for the whole call.
func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string) map[string]string {
	if gt == nil {
		return nil
	}
	gt.Lock()
	defer gt.Unlock()

	affected := gt.queueTracker.decreaseTrackedResourceUsageDownwards(queuePath)
	appToUser := make(map[string]string)
	for appID := range affected {
		user, known := gt.applications[appID]
		if !known {
			// not tracked by this group, nothing to report
			continue
		}
		appToUser[appID] = user
	}
	return appToUser
}

func (gt *GroupTracker) canRunApp(queuePath, applicationID string) bool {
gt.Lock()
defer gt.Unlock()
Expand Down
146 changes: 62 additions & 84 deletions pkg/scheduler/ugm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,18 @@ func (m *Manager) UpdateConfig(config configs.QueueConfig, queuePath string) err
m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
m.configuredGroups = make(map[string][]string)
return m.internalProcessConfig(config, queuePath)

userNamePathLimits := make(map[string]map[string]bool)
groupNamePathLimits := make(map[string]map[string]bool)
if err := m.internalProcessConfig(config, queuePath, userNamePathLimits, groupNamePathLimits); err != nil {
return err
}

Check warning on line 326 in pkg/scheduler/ugm/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/ugm/manager.go#L325-L326

Added lines #L325 - L326 were not covered by tests
m.clearEarlierSetLimits(userNamePathLimits, groupNamePathLimits)
m.setWildcardGroupLimitsToConfiguredGroups()
return nil
}

func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string) error {
// Holds user and group for which limits have been configured with specific queue path
userLimits := make(map[string]bool)
groupLimits := make(map[string]bool)
func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string, userNamePathLimits, groupNamePathLimits map[string]map[string]bool) error {
// Traverse limits of specific queue path
for _, limit := range cur.Limits {
var maxResource *resources.Resource
Expand All @@ -351,7 +356,7 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
m.userWildCardLimitsConfig[queuePath] = limitConfig
continue
}
if err := m.processUserConfig(user, limitConfig, queuePath, userLimits); err != nil {
if err := m.processUserConfig(user, limitConfig, queuePath, userNamePathLimits); err != nil {
return err
}
}
Expand All @@ -365,126 +370,99 @@ func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath strin
zap.String("queue path", queuePath),
zap.Uint64("max application", limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
if err := m.processGroupConfig(group, limitConfig, queuePath, groupLimits); err != nil {
return err
}
if group == common.Wildcard {
m.groupWildCardLimitsConfig[queuePath] = limitConfig
} else {
m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
continue
}
}
}
if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
return err
}

if len(cur.Queues) > 0 {
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
if err := m.internalProcessConfig(child, childQueuePath); err != nil {
if err := m.processGroupConfig(group, limitConfig, queuePath, groupNamePathLimits); err != nil {
return err
}
m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
}
}
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
if err := m.internalProcessConfig(child, childQueuePath, userNamePathLimits, groupNamePathLimits); err != nil {
return err

Check warning on line 386 in pkg/scheduler/ugm/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/ugm/manager.go#L386

Added line #L386 was not covered by tests
}
}
return nil
}

func (m *Manager) processUserConfig(user string, limitConfig *LimitConfig, queuePath string, userLimits map[string]bool) error {
func (m *Manager) processUserConfig(user string, limitConfig *LimitConfig, queuePath string, userNamePathLimits map[string]map[string]bool) error {
if err := m.setUserLimits(user, limitConfig, queuePath); err != nil {
return err
}
userLimits[user] = true
if len(userNamePathLimits[user]) == 0 {
userNamePathLimits[user] = make(map[string]bool)
}
userNamePathLimits[user][queuePath] = true
return nil
}

func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, queuePath string, groupLimits map[string]bool) error {
func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, queuePath string, groupNamePathLimits map[string]map[string]bool) error {
if err := m.setGroupLimits(group, limitConfig, queuePath); err != nil {
return err
}
groupLimits[group] = true
if len(groupNamePathLimits[group]) == 0 {
groupNamePathLimits[group] = make(map[string]bool)
}
groupNamePathLimits[group][queuePath] = true
return nil
}

// clearEarlierSetLimits Clear already configured limits of users and groups for which limits have been configured before but not now
func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool, groupLimits map[string]bool, queuePath string) error {
func (m *Manager) clearEarlierSetLimits(userNamePathLimits, groupNamePathLimits map[string]map[string]bool) {
// Clear already configured limits of group for which limits have been configured before but not now
for _, gt := range m.groupTrackers {
appUsersMap := m.clearEarlierSetGroupLimits(gt, queuePath, groupLimits)
if len(appUsersMap) > 0 {
for app, user := range appUsersMap {
ut := m.userTrackers[user]
ut.setGroupForApp(app, nil)
}
appUsersMap := m.clearEarlierSetGroupLimits(gt, groupNamePathLimits[gt.groupName])
for app, user := range appUsersMap {
ut := m.userTrackers[user]
ut.setGroupForApp(app, nil)
}
}

// Clear already configured limits of user for which limits have been configured before but not now
for _, ut := range m.userTrackers {
m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
m.clearEarlierSetUserLimits(ut, userNamePathLimits[ut.userName])
}
return nil
}

func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, queuePath string, userLimits map[string]bool) {
// Is this user already tracked for the queue path?
if ut.IsQueuePathTrackedCompletely(queuePath) {
u := ut.userName
// Is there any limit config set for user in the current configuration? If not, then clear those old limit settings
if _, ok := userLimits[u]; !ok {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user",
zap.String("user", u),
zap.String("queue path", queuePath))
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if ut.IsUnlinkRequired(queuePath) {
ut.UnlinkQT(queuePath)
} else {
ut.setLimits(queuePath, resources.NewResource(), 0)
log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for user",
zap.String("user", u),
zap.String("queue path", queuePath))
}
// Does "root" queue has any child queue trackers? At some point during this whole traversal, root might
// not have any child queue trackers. When the situation comes, remove the linkage between the user and
// its root queue tracker
if ut.canBeRemoved() {
delete(m.userTrackers, ut.userName)
}
}
// clearEarlierSetUserLimits lets the user tracker clear its earlier set limits,
// starting from an empty prefix path, using the manager's user wildcard limit
// configs and the queue paths that have a limit set for this user (pathLimits).
// If the tracker reports afterwards that it can be removed, it is dropped from
// the manager's user tracker map.
func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, pathLimits map[string]bool) {
	ut.ClearEarlierSetLimits("", m.userWildCardLimitsConfig, pathLimits)

	if !ut.canBeRemoved() {
		// tracker is still in use, keep it registered
		return
	}

	name := ut.userName
	log.Log(log.SchedUGM).Debug("Removing tracker from manager", zap.String("user", name))
	delete(m.userTrackers, name)
}

func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath string, groupLimits map[string]bool) map[string]string {
func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, pathLimits map[string]bool) map[string]string {
appUsersMap := make(map[string]string)
// Is this group already tracked for the queue path?
if gt.IsQueuePathTrackedCompletely(queuePath) {
g := gt.groupName
// Is there any limit config set for group in the current configuration? If not, then clear those old limit settings
if ok := groupLimits[g]; !ok {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for group",
zap.String("group", g),
zap.String("queue path", queuePath))
appUsersMap = gt.decreaseAllTrackedResourceUsage(queuePath)
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if gt.IsUnlinkRequired(queuePath) {
gt.UnlinkQT(queuePath)
} else {
gt.setLimits(queuePath, resources.NewResource(), 0)
log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for group",
zap.String("group", g),
zap.String("queue path", queuePath))
}
// Does "root" queue has any child queue trackers? At some point during this whole traversal, root might
// not have any child queue trackers. When the situation comes, remove the linkage between the group and
// its root queue tracker
if gt.canBeRemoved() {
delete(m.groupTrackers, gt.groupName)
}
removedApp := make(map[string]bool)
gt.ClearEarlierSetLimits("", m.groupWildCardLimitsConfig, pathLimits, removedApp)
for app := range removedApp {
if u, ok := gt.applications[app]; ok {
appUsersMap[app] = u
}
}
if gt.canBeRemoved() {
log.Log(log.SchedUGM).Debug("Removing tracker from manager", zap.String("group", gt.groupName))
delete(m.groupTrackers, gt.groupName)
}
return appUsersMap
}

// setWildcardGroupLimitsToConfiguredGroups adds group names to configuredGroups
// for the queue paths that carry a wildcard group limit.
//
// For every queue path in groupWildCardLimitsConfig, each known group tracker
// is inspected: when the group's queue tracker for that path has neither a
// max resources setting (equal to an empty resource) nor a max running apps
// setting (zero), the group name is appended to configuredGroups[queuePath].
func (m *Manager) setWildcardGroupLimitsToConfiguredGroups() {
	for queuePath := range m.groupWildCardLimitsConfig {
		for _, tracker := range m.groupTrackers {
			qt := tracker.queueTracker.getChildQueueTracker(queuePath)

			noResourceLimit := resources.Equals(resources.NewResource(), qt.maxResources)
			if !noResourceLimit || qt.maxRunningApps != 0 {
				// the group has its own limit on this path
				continue
			}

			groups := m.configuredGroups[queuePath]
			m.configuredGroups[queuePath] = append(groups, tracker.groupName)
		}
	}
}

func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, queuePath string) error {
log.Log(log.SchedUGM).Debug("Setting user limits",
zap.String("user", user),
Expand Down
Loading

0 comments on commit 0e20e7a

Please sign in to comment.