Skip to content

Commit

Permalink
[YUNIKORN-1793] Handle placement rule and queue changes during initia…
Browse files Browse the repository at this point in the history
…lisation

Adds a tag to mark applications as forced create. This allows most validation to be
suppressed and if necessary, will redirect the app to a recovery queue.
  • Loading branch information
craigcondit committed Jul 31, 2023
1 parent 1664324 commit a2379d8
Show file tree
Hide file tree
Showing 14 changed files with 294 additions and 38 deletions.
10 changes: 6 additions & 4 deletions pkg/common/configs/configvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
)

const (
RootQueue = "root"
DOT = "."
DotReplace = "_dot_"
DefaultPartition = "default"
RootQueue = "root"
DOT = "."
DotReplace = "_dot_"
DefaultPartition = "default"
RecoveryQueue = "@recovery@"
RecoveryQueueFull = RootQueue + DOT + RecoveryQueue

ApplicationSortPolicy = "application.sort.policy"
ApplicationSortPriority = "application.sort.priority"
Expand Down
8 changes: 5 additions & 3 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ package common
const (
Empty = ""

Wildcard = "*"
Separator = ","
Space = " "
Wildcard = "*"
Separator = ","
Space = " "
AnonymousUser = "nobody"
AnonymousGroup = "nogroup"
)
11 changes: 9 additions & 2 deletions pkg/common/security/usergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -122,10 +123,16 @@ func (c *UserGroupCache) resetCache() {
c.ugs = make(map[string]*UserGroup)
}

func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation) (UserGroup, error) {
func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation, force bool) (UserGroup, error) {
// check if we have a user to convert
if ugi == nil || ugi.User == "" {
return UserGroup{}, fmt.Errorf("empty user cannot resolve")
if force {
// app creation is forced, so we need to synthesize a user / group
ugi.User = common.AnonymousUser
ugi.Groups = []string{common.AnonymousGroup}
} else {
return UserGroup{}, fmt.Errorf("empty user cannot resolve")
}
}
// try to resolve the user if group info is empty otherwise we just convert
if len(ugi.Groups) == 0 {
Expand Down
14 changes: 10 additions & 4 deletions pkg/common/security/usergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ func TestConvertUGI(t *testing.T) {
User: "",
Groups: nil,
}
ug, err := testCache.ConvertUGI(ugi)
ug, err := testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("empty user convert should have failed and did not: %v", ug)
}
// try known user without groups
ugi.User = "testuser1"
ug, err = testCache.ConvertUGI(ugi)
ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("known user, no groups, convert should not have failed: %v", err)
}
Expand All @@ -213,15 +213,21 @@ func TestConvertUGI(t *testing.T) {
}
// try unknown user without groups
ugi.User = "unknown"
ug, err = testCache.ConvertUGI(ugi)
ug, err = testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("unknown user, no groups, convert should have failed: %v", ug)
}
// try empty user when forced
ugi.User = ""
ug, err = testCache.ConvertUGI(ugi, true)
if err != nil {
t.Errorf("empty user but forced, convert should not have failed: %v", err)
}
// try unknown user with groups
ugi.User = "unknown2"
group := "passedin"
ugi.Groups = []string{group}
ug, err = testCache.ConvertUGI(ugi)
ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("unknown user with groups, convert should not have failed: %v", err)
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,24 @@ func IsAllowPreemptOther(policy *si.PreemptionPolicy) bool {
return policy != nil && policy.AllowPreemptOther
}

// IsAppCreationForced returns true if the application creation is triggered by the shim
// reporting an existing allocation. In this case, it needs to be accepted regardless
// of whether it maps to a valid queue.
func IsAppCreationForced(tags map[string]string) bool {
tagVal := ""
for key, val := range tags {
if strings.EqualFold(key, interfaceCommon.AppTagCreateForce) {
tagVal = val
break
}
}
result, err := strconv.ParseBool(tagVal)
if err != nil {
return false
}
return result
}

// ZeroTimeInUnixNano return the unix nano or nil if the time is zero.
func ZeroTimeInUnixNano(t time.Time) *int64 {
if t.IsZero() {
Expand Down
12 changes: 12 additions & 0 deletions pkg/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ func TestAllowPreemptOther(t *testing.T) {
assert.Check(t, !IsAllowPreemptOther(&si.PreemptionPolicy{AllowPreemptOther: false}), "Preempt other should not be allowed if policy does not allow")
}

func TestIsAppCreationForced(t *testing.T) {
assert.Check(t, !IsAppCreationForced(nil), "nil tags should not result in forced app creation")
tags := make(map[string]string)
assert.Check(t, !IsAppCreationForced(tags), "empty tags should not result in forced app creation")
tags[common.AppTagCreateForce] = "false"
assert.Check(t, !IsAppCreationForced(tags), "false creation tag should not result in forced app creation")
tags[common.AppTagCreateForce] = "invalid"
assert.Check(t, !IsAppCreationForced(tags), "invalid creation tag should not result in forced app creation")
tags[common.AppTagCreateForce] = "true"
assert.Check(t, IsAppCreationForced(tags), "creation tag should result in forced app creation")
}

func TestConvertSITimeoutWithAdjustment(t *testing.T) {
created := time.Now().Unix() - 600
defaultTimeout := 15 * time.Minute
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate
}
// convert and resolve the user: cache can be set per partition
// need to do this before we create the application
ugi, err := partition.convertUGI(app.Ugi)
ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags))
if err != nil {
rejectedApps = append(rejectedApps, &si.RejectedApplication{
ApplicationID: app.ApplicationID,
Expand Down
23 changes: 12 additions & 11 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ type StateLogEntry struct {
}

type Application struct {
ApplicationID string
Partition string
SubmissionTime time.Time
ApplicationID string // application ID
Partition string // partition Name
SubmissionTime time.Time // time application was submitted
tags map[string]string // application tags used in scheduling

// Private fields need protection
// Private mutable fields need protection
queuePath string
queue *Queue // queue the application is running in
pending *resources.Resource // pending resources from asks for the app
reservations map[string]*reservation // a map of reservations
requests map[string]*AllocationAsk // a map of asks
sortedRequests sortedRequests
user security.UserGroup // owner of the application
tags map[string]string // application tags used in scheduling
allocatedResource *resources.Resource // total allocated resources
sortedRequests sortedRequests // list of requests pre-sorted
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources

usedResource *resources.UsedResource // keep track of resource usage of the application

Expand Down Expand Up @@ -1875,9 +1875,6 @@ func (sa *Application) GetUser() security.UserGroup {
// Get a tag from the application
// Note: tags are not case sensitive
func (sa *Application) GetTag(tag string) string {
sa.RLock()
defer sa.RUnlock()

tagVal := ""
for key, val := range sa.tags {
if strings.EqualFold(key, tag) {
Expand All @@ -1888,6 +1885,10 @@ func (sa *Application) GetTag(tag string) string {
return tagVal
}

func (sa *Application) IsCreateForced() bool {
return common.IsAppCreationForced(sa.tags)
}

func (sa *Application) SetTerminatedCallback(callback func(appID string)) {
sa.Lock()
defer sa.Unlock()
Expand Down
17 changes: 17 additions & 0 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,23 @@ func TestGetTag(t *testing.T) {
assert.Equal(t, tag, "test value", "expected tag value")
}

func TestIsCreateForced(t *testing.T) {
app := newApplicationWithTags(appID1, "default", "root.a", nil)
assert.Check(t, !app.IsCreateForced(), "found forced app but tags nil")
tags := make(map[string]string)
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, !app.IsCreateForced(), "found forced app but tags empty")
tags[siCommon.AppTagCreateForce] = "false"
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was false")
tags[siCommon.AppTagCreateForce] = "unknown"
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was invalid")
tags[siCommon.AppTagCreateForce] = "true"
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, app.IsCreateForced(), "found unforced app but forced tag was set")
}

func TestOnStatusChangeCalled(t *testing.T) {
app, testHandler := newApplicationWithHandler(appID1, "default", "root.a")
assert.Equal(t, New.String(), app.CurrentState(), "new app not in New state")
Expand Down
12 changes: 11 additions & 1 deletion pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,26 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error)
return sq, nil
}

// NewRecoveryQueue creates a recovery queue if it does not exist. The recovery queue
// is a dynamic queue, but has an invalid name so that it cannot be directly referenced.
func NewRecoveryQueue(parent *Queue) (*Queue, error) {
return newDynamicQueueInternal(configs.RecoveryQueue, true, parent, false)
}

// NewDynamicQueue creates a new queue to be added to the system based on the placement rules
// A dynamically added queue can never be the root queue so parent must be set
// lock free as it cannot be referenced yet
func NewDynamicQueue(name string, leaf bool, parent *Queue) (*Queue, error) {
return newDynamicQueueInternal(name, leaf, parent, true)
}

func newDynamicQueueInternal(name string, leaf bool, parent *Queue, validateName bool) (*Queue, error) {
// fail without a parent
if parent == nil {
return nil, fmt.Errorf("dynamic queue can not be added without parent: %s", name)
}
// name might not be checked do it here
if !configs.QueueNameRegExp.MatchString(name) {
if validateName && !configs.QueueNameRegExp.MatchString(name) {
return nil, fmt.Errorf("invalid queue name '%s', a name must only have alphanumeric characters, - or _, and be no longer than 64 characters", name)
}
sq := newBlankQueue()
Expand Down
16 changes: 16 additions & 0 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,22 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Assert(t, childNonLeaf.maxResource == nil)
}

func TestNewRecoveryQueue(t *testing.T) {
parent, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create queue: %v", err)
recoveryQueue, err := NewRecoveryQueue(parent)
assert.NilError(t, err, "failed to create recovery queue: %v", err)
assert.Equal(t, configs.RecoveryQueueFull, recoveryQueue.GetQueuePath(), "wrong queue name")
}

func TestNewDynamicQueueDoesNotCreateRecovery(t *testing.T) {
parent, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create queue: %v", err)
if _, err := NewDynamicQueue(configs.RecoveryQueue, true, parent); err == nil {
t.Fatalf("invalid recovery queue %s was created", configs.RecoveryQueueFull)
}
}

func TestNewDynamicQueue(t *testing.T) {
parent, err := createManagedQueueWithProps(nil, "parent", true, nil, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand Down
75 changes: 63 additions & 12 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,39 +305,85 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error {
return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
}

// Check if app creation is forced
forceCreate := app.IsCreateForced()

// Put app under the queue
queueName := app.GetQueuePath()

// applications are not allowed to reference the recovery queue directly
if strings.EqualFold(queueName, configs.RecoveryQueueFull) {
if forceCreate {
// empty out queue name for now in case a matching rule can work
app.SetQueuePath("")
} else {
return fmt.Errorf("invalid queue %s for application %s", queueName, appID)
}
}

pm := pc.getPlacementManager()
if pm.IsInitialised() {
err := pm.PlaceApplication(app)
if err != nil {
return fmt.Errorf("failed to place application %s: %v", appID, err)
if forceCreate {
queueName = configs.RecoveryQueueFull
app.SetQueuePath(queueName)
} else {
return fmt.Errorf("failed to place application %s: %v", appID, err)
}
}
queueName = app.GetQueuePath()
if queueName == "" {
return fmt.Errorf("application rejected by placement rules: %s", appID)
if forceCreate {
queueName = configs.RecoveryQueueFull
app.SetQueuePath(configs.RecoveryQueueFull)
} else {
return fmt.Errorf("application rejected by placement rules: %s", appID)
}
}
}

// lock the partition and make the last change: we need to do this before creating the queues.
// queue cleanup might otherwise remove the queue again before we can add the application
pc.Lock()
defer pc.Unlock()
// we have a queue name either from placement or direct, get the queue
queue := pc.getQueueInternal(queueName)
if queue == nil {
// queue must exist if not using placement rules
if !pm.IsInitialised() {
return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName)
}
// with placement rules the hierarchy might not exist so try and create it

// attempt to create the queue if we are using the placement manager and not recovery queue
if queue == nil && pm.IsInitialised() && !strings.EqualFold(queueName, configs.RecoveryQueueFull) {
var err error
queue, err = pc.createQueue(queueName, app.GetUser())
if err != nil {
if err != nil && !forceCreate {
return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID)
}
}

// if queue is still not found, use the recovery queue if app creation is forced
if queue == nil && forceCreate {
var err error
queueName = configs.RecoveryQueueFull
app.SetQueuePath(queueName)

// attempt to use the already-created recovery queue
queue = pc.getQueueInternal(queueName)
if queue == nil {
// create the queue since it doesn't yet exist
queue, err = pc.createRecoveryQueue()
if err != nil {
return fmt.Errorf("failed to create recovery queue %s for application %s", configs.RecoveryQueueFull, appID)
}
}
}

// all attempts to resolve queue have failed, reject application
if queue == nil {
// queue must exist if not using placement rules and not a forced app
return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName)
}

// check the queue: is a leaf queue with submit access
if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) {
if !queue.IsLeafQueue() || (!forceCreate && !queue.CheckSubmitAccess(app.GetUser())) {
return fmt.Errorf("failed to find queue %s for application %s", queueName, appID)
}

Expand Down Expand Up @@ -484,6 +530,11 @@ func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo {
return PartitionQueueDAOInfo
}

// Create the recovery queue.
func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) {
return objects.NewRecoveryQueue(pc.root)
}

// Create a queue with full hierarchy. This is called when a new queue is created from a placement rule.
// The final leaf queue does not exist otherwise we would not get here.
// This means that at least 1 queue (a leaf queue) will be created
Expand Down Expand Up @@ -1165,10 +1216,10 @@ func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error {
return nil
}

func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation) (security.UserGroup, error) {
func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool) (security.UserGroup, error) {
pc.RLock()
defer pc.RUnlock()
return pc.userGroupCache.ConvertUGI(ugi)
return pc.userGroupCache.ConvertUGI(ugi, forced)
}

// calculate overall nodes resource usage and returns a map as the result,
Expand Down
Loading

0 comments on commit a2379d8

Please sign in to comment.