Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-1929] fix update group configs #625

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions pkg/scheduler/ugm/group_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
)
Expand Down Expand Up @@ -98,22 +99,48 @@ func (gt *GroupTracker) GetGroupResourceUsageDAOInfo() *dao.GroupResourceUsageDA
return groupResourceUsage
}

func (gt *GroupTracker) IsQueuePathTrackedCompletely(queuePath string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.IsQueuePathTrackedCompletely(queuePath)
func (gt *GroupTracker) ClearEarlierSetLimits(prefixPath string, wildcardLimits map[string]*LimitConfig, pathLimits map[string]bool, removedApp map[string]bool) {
gt.Lock()
defer gt.Unlock()
gt.internalClearEarlierSetLimits(prefixPath, gt.queueTracker, wildcardLimits, pathLimits, removedApp)
}

func (gt *GroupTracker) IsUnlinkRequired(queuePath string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.IsUnlinkRequired(queuePath)
}
func (gt *GroupTracker) internalClearEarlierSetLimits(prefixPath string, queueTracker *QueueTracker, wildcardLimits map[string]*LimitConfig, pathLimits map[string]bool, removedApp map[string]bool) bool {
queuePath := prefixPath + configs.DOT + queueTracker.queueName
if prefixPath == common.Empty {
queuePath = queueTracker.queueName
}

func (gt *GroupTracker) UnlinkQT(queuePath string) bool {
gt.RLock()
defer gt.RUnlock()
return gt.queueTracker.UnlinkQT(queuePath)
allLeafQueueRemoved := true
for _, childQt := range queueTracker.childQueueTrackers {
if gt.internalClearEarlierSetLimits(queuePath, childQt, wildcardLimits, pathLimits, removedApp) {
delete(queueTracker.childQueueTrackers, childQt.queueName)
} else {
allLeafQueueRemoved = false
}
}

currentPathHasLimits := wildcardLimits[queuePath] != nil || pathLimits[queuePath]
if allLeafQueueRemoved && !currentPathHasLimits {
for app := range queueTracker.runningApplications {
removedApp[app] = true
}
queueTracker.runningApplications = make(map[string]bool)
queueTracker.resourceUsage = resources.NewResource()
return true
} else {
// if we don't remove current tracker, we have to keep applications in current tracker
for app := range queueTracker.runningApplications {
delete(removedApp, app)
}
}

// if only have wildcard limits, then clear the limits
if wildcardLimits[queuePath] != nil && !pathLimits[queuePath] {
queueTracker.maxRunningApps = 0
queueTracker.maxResources = resources.NewResource()
}
return false
}

// canBeRemoved Does "root" queue has any child queue trackers? Is there any running applications in "root" qt?
Expand All @@ -130,22 +157,6 @@ func (gt *GroupTracker) getName() string {
return gt.groupName
}

func (gt *GroupTracker) decreaseAllTrackedResourceUsage(queuePath string) map[string]string {
if gt == nil {
return nil
}
gt.Lock()
defer gt.Unlock()
applications := gt.queueTracker.decreaseTrackedResourceUsageDownwards(queuePath)
removedApplications := make(map[string]string)
for app := range applications {
if u, ok := gt.applications[app]; ok {
removedApplications[app] = u
}
}
return removedApplications
}

func (gt *GroupTracker) canRunApp(queuePath, applicationID string) bool {
gt.Lock()
defer gt.Unlock()
Expand Down
146 changes: 62 additions & 84 deletions pkg/scheduler/ugm/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,18 @@
m.userWildCardLimitsConfig = make(map[string]*LimitConfig)
m.groupWildCardLimitsConfig = make(map[string]*LimitConfig)
m.configuredGroups = make(map[string][]string)
return m.internalProcessConfig(config, queuePath)

userNamePathLimits := make(map[string]map[string]bool)
groupNamePathLimits := make(map[string]map[string]bool)
if err := m.internalProcessConfig(config, queuePath, userNamePathLimits, groupNamePathLimits); err != nil {
return err
}

Check warning on line 326 in pkg/scheduler/ugm/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/ugm/manager.go#L325-L326

Added lines #L325 - L326 were not covered by tests
m.clearEarlierSetLimits(userNamePathLimits, groupNamePathLimits)
m.setWildcardGroupLimitsToConfiguredGroups()
return nil
}

func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string) error {
// Holds user and group for which limits have been configured with specific queue path
userLimits := make(map[string]bool)
groupLimits := make(map[string]bool)
func (m *Manager) internalProcessConfig(cur configs.QueueConfig, queuePath string, userNamePathLimits, groupNamePathLimits map[string]map[string]bool) error {
// Traverse limits of specific queue path
for _, limit := range cur.Limits {
var maxResource *resources.Resource
Expand All @@ -351,7 +356,7 @@
m.userWildCardLimitsConfig[queuePath] = limitConfig
continue
}
if err := m.processUserConfig(user, limitConfig, queuePath, userLimits); err != nil {
if err := m.processUserConfig(user, limitConfig, queuePath, userNamePathLimits); err != nil {
return err
}
}
Expand All @@ -365,126 +370,99 @@
zap.String("queue path", queuePath),
zap.Uint64("max application", limit.MaxApplications),
zap.Any("max resources", limit.MaxResources))
if err := m.processGroupConfig(group, limitConfig, queuePath, groupLimits); err != nil {
return err
}
if group == common.Wildcard {
m.groupWildCardLimitsConfig[queuePath] = limitConfig
} else {
m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
continue
}
}
}
if err := m.clearEarlierSetLimits(userLimits, groupLimits, queuePath); err != nil {
return err
}

if len(cur.Queues) > 0 {
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
if err := m.internalProcessConfig(child, childQueuePath); err != nil {
if err := m.processGroupConfig(group, limitConfig, queuePath, groupNamePathLimits); err != nil {
return err
}
m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], group)
}
}
for _, child := range cur.Queues {
childQueuePath := queuePath + configs.DOT + child.Name
if err := m.internalProcessConfig(child, childQueuePath, userNamePathLimits, groupNamePathLimits); err != nil {
return err

Check warning on line 386 in pkg/scheduler/ugm/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/ugm/manager.go#L386

Added line #L386 was not covered by tests
}
}
return nil
}

func (m *Manager) processUserConfig(user string, limitConfig *LimitConfig, queuePath string, userLimits map[string]bool) error {
func (m *Manager) processUserConfig(user string, limitConfig *LimitConfig, queuePath string, userNamePathLimits map[string]map[string]bool) error {
if err := m.setUserLimits(user, limitConfig, queuePath); err != nil {
return err
}
userLimits[user] = true
if len(userNamePathLimits[user]) == 0 {
userNamePathLimits[user] = make(map[string]bool)
}
userNamePathLimits[user][queuePath] = true
return nil
}

func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, queuePath string, groupLimits map[string]bool) error {
func (m *Manager) processGroupConfig(group string, limitConfig *LimitConfig, queuePath string, groupNamePathLimits map[string]map[string]bool) error {
if err := m.setGroupLimits(group, limitConfig, queuePath); err != nil {
return err
}
groupLimits[group] = true
if len(groupNamePathLimits[group]) == 0 {
groupNamePathLimits[group] = make(map[string]bool)
}
groupNamePathLimits[group][queuePath] = true
return nil
}

// clearEarlierSetLimits Clear already configured limits of users and groups for which limits have been configured before but not now
func (m *Manager) clearEarlierSetLimits(userLimits map[string]bool, groupLimits map[string]bool, queuePath string) error {
func (m *Manager) clearEarlierSetLimits(userNamePathLimits, groupNamePathLimits map[string]map[string]bool) {
// Clear already configured limits of group for which limits have been configured before but not now
for _, gt := range m.groupTrackers {
appUsersMap := m.clearEarlierSetGroupLimits(gt, queuePath, groupLimits)
if len(appUsersMap) > 0 {
for app, user := range appUsersMap {
ut := m.userTrackers[user]
ut.setGroupForApp(app, nil)
}
appUsersMap := m.clearEarlierSetGroupLimits(gt, groupNamePathLimits[gt.groupName])
for app, user := range appUsersMap {
ut := m.userTrackers[user]
ut.setGroupForApp(app, nil)
}
}

// Clear already configured limits of user for which limits have been configured before but not now
for _, ut := range m.userTrackers {
m.clearEarlierSetUserLimits(ut, queuePath, userLimits)
m.clearEarlierSetUserLimits(ut, userNamePathLimits[ut.userName])
}
return nil
}

func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, queuePath string, userLimits map[string]bool) {
// Is this user already tracked for the queue path?
if ut.IsQueuePathTrackedCompletely(queuePath) {
u := ut.userName
// Is there any limit config set for user in the current configuration? If not, then clear those old limit settings
if _, ok := userLimits[u]; !ok {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for user",
zap.String("user", u),
zap.String("queue path", queuePath))
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if ut.IsUnlinkRequired(queuePath) {
ut.UnlinkQT(queuePath)
} else {
ut.setLimits(queuePath, resources.NewResource(), 0)
log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for user",
zap.String("user", u),
zap.String("queue path", queuePath))
}
// Does "root" queue has any child queue trackers? At some point during this whole traversal, root might
// not have any child queue trackers. When the situation comes, remove the linkage between the user and
// its root queue tracker
if ut.canBeRemoved() {
delete(m.userTrackers, ut.userName)
}
}
func (m *Manager) clearEarlierSetUserLimits(ut *UserTracker, pathLimits map[string]bool) {
ut.ClearEarlierSetLimits("", m.userWildCardLimitsConfig, pathLimits)
if ut.canBeRemoved() {
log.Log(log.SchedUGM).Debug("Removing tracker from manager", zap.String("user", ut.userName))
delete(m.userTrackers, ut.userName)
}
}

func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, queuePath string, groupLimits map[string]bool) map[string]string {
func (m *Manager) clearEarlierSetGroupLimits(gt *GroupTracker, pathLimits map[string]bool) map[string]string {
appUsersMap := make(map[string]string)
// Is this group already tracked for the queue path?
if gt.IsQueuePathTrackedCompletely(queuePath) {
g := gt.groupName
// Is there any limit config set for group in the current configuration? If not, then clear those old limit settings
if ok := groupLimits[g]; !ok {
log.Log(log.SchedUGM).Debug("Need to clear earlier set configs for group",
zap.String("group", g),
zap.String("queue path", queuePath))
appUsersMap = gt.decreaseAllTrackedResourceUsage(queuePath)
// Is there any running applications in end queue of this queue path? If not, then remove the linkage between end queue and its immediate parent
if gt.IsUnlinkRequired(queuePath) {
gt.UnlinkQT(queuePath)
} else {
gt.setLimits(queuePath, resources.NewResource(), 0)
log.Log(log.SchedUGM).Debug("Cleared earlier set limit configs for group",
zap.String("group", g),
zap.String("queue path", queuePath))
}
// Does "root" queue has any child queue trackers? At some point during this whole traversal, root might
// not have any child queue trackers. When the situation comes, remove the linkage between the group and
// its root queue tracker
if gt.canBeRemoved() {
delete(m.groupTrackers, gt.groupName)
}
removedApp := make(map[string]bool)
gt.ClearEarlierSetLimits("", m.groupWildCardLimitsConfig, pathLimits, removedApp)
for app := range removedApp {
if u, ok := gt.applications[app]; ok {
appUsersMap[app] = u
}
}
if gt.canBeRemoved() {
log.Log(log.SchedUGM).Debug("Removing tracker from manager", zap.String("group", gt.groupName))
delete(m.groupTrackers, gt.groupName)
}
return appUsersMap
}

func (m *Manager) setWildcardGroupLimitsToConfiguredGroups() {
for queuePath := range m.groupWildCardLimitsConfig {
for _, gt := range m.groupTrackers {
childQueueTracker := gt.queueTracker.getChildQueueTracker(queuePath)
if resources.Equals(resources.NewResource(), childQueueTracker.maxResources) && childQueueTracker.maxRunningApps == 0 {
m.configuredGroups[queuePath] = append(m.configuredGroups[queuePath], gt.groupName)
}
}
}
}

func (m *Manager) setUserLimits(user string, limitConfig *LimitConfig, queuePath string) error {
log.Log(log.SchedUGM).Debug("Setting user limits",
zap.String("user", user),
Expand Down
Loading