diff --git a/pkg/common/constants.go b/pkg/common/constants.go index b0b3a5840..2b882ff7a 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -21,7 +21,11 @@ package common const ( Empty = "" - Wildcard = "*" - Separator = "," - Space = " " + Wildcard = "*" + Separator = "," + Space = " " + AnonymousUser = "nobody" + AnonymousGroup = "nogroup" + RecoveryQueue = "@recovery@" + RecoveryQueueFull = "root." + RecoveryQueue ) diff --git a/pkg/common/security/usergroup.go b/pkg/common/security/usergroup.go index e381204db..124521bd5 100644 --- a/pkg/common/security/usergroup.go +++ b/pkg/common/security/usergroup.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -122,14 +123,25 @@ func (c *UserGroupCache) resetCache() { c.ugs = make(map[string]*UserGroup) } -func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation) (UserGroup, error) { +func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation, force bool) (UserGroup, error) { // check if we have a user to convert if ugi == nil || ugi.User == "" { - return UserGroup{}, fmt.Errorf("empty user cannot resolve") + if force { + // app creation is forced, so we need to synthesize a user / group + ugi.User = common.AnonymousUser + ugi.Groups = []string{common.AnonymousGroup} + } else { + return UserGroup{}, fmt.Errorf("empty user cannot resolve") + } } // try to resolve the user if group info is empty otherwise we just convert if len(ugi.Groups) == 0 { - return c.GetUserGroup(ugi.User) + ug, err := c.GetUserGroup(ugi.User) + if force && (err != nil || ug.failed) { + ugi.Groups = []string{common.AnonymousGroup} + } else { + return ug, err + } } // If groups are already present we should just convert newUG := UserGroup{User: ugi.User} diff --git a/pkg/common/security/usergroup_test.go b/pkg/common/security/usergroup_test.go index 279dadf2c..9eef2c6e2 100644 --- a/pkg/common/security/usergroup_test.go +++ b/pkg/common/security/usergroup_test.go @@ -198,13 +198,13 @@ func TestConvertUGI(t *testing.T) { User: "", Groups: nil, } - ug, err := testCache.ConvertUGI(ugi) + ug, err := testCache.ConvertUGI(ugi, false) if err == nil { t.Errorf("empty user convert should have failed and did not: %v", ug) } // try known user without groups ugi.User = "testuser1" - ug, err = testCache.ConvertUGI(ugi) + ug, err = testCache.ConvertUGI(ugi, false) if err != nil { t.Errorf("known user, no groups, convert should not have failed: %v", err) } @@ -213,15 +213,21 @@ func TestConvertUGI(t *testing.T) { } // try unknown user without groups ugi.User = "unknown" - ug, err = testCache.ConvertUGI(ugi) + ug, err = testCache.ConvertUGI(ugi, false) if err == nil { t.Errorf("unknown user, no groups, convert should have failed: %v", ug) } + // try empty user when forced + ugi.User = "" + ug, err = testCache.ConvertUGI(ugi, true) + if err != nil { + t.Errorf("empty user but forced, convert should not have failed: %v", err) + } // try unknown user with groups ugi.User = "unknown2" group := "passedin" ugi.Groups = []string{group} - ug, err = testCache.ConvertUGI(ugi) + ug, err = testCache.ConvertUGI(ugi, false) if err != nil { t.Errorf("unknown user with groups, convert should not have failed: %v", err) } diff --git a/pkg/common/utils.go b/pkg/common/utils.go index c8437a8d4..bb4bd3307 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -191,6 +191,29 @@ func IsAllowPreemptOther(policy *si.PreemptionPolicy) bool { return policy != nil && policy.AllowPreemptOther } +// IsAppCreationForced returns true if the application creation is triggered by the shim +// reporting an existing allocation. In this case, it needs to be accepted regardless +// of whether it maps to a valid queue. +func IsAppCreationForced(tags map[string]string) bool { + tagVal := "" + for key, val := range tags { + if strings.EqualFold(key, interfaceCommon.AppTagCreateForce) { + tagVal = val + break + } + } + result, err := strconv.ParseBool(tagVal) + if err != nil { + return false + } + return result +} + +// IsRecoveryQueue returns true if the given queue represents the recovery queue +func IsRecoveryQueue(queueName string) bool { + return strings.EqualFold(queueName, RecoveryQueueFull) +} + // ZeroTimeInUnixNano return the unix nano or nil if the time is zero. func ZeroTimeInUnixNano(t time.Time) *int64 { if t.IsZero() { diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index 1aa862064..d97fe637d 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -161,6 +161,18 @@ func TestAllowPreemptOther(t *testing.T) { assert.Check(t, !IsAllowPreemptOther(&si.PreemptionPolicy{AllowPreemptOther: false}), "Preempt other should not be allowed if policy does not allow") } +func TestIsAppCreationForced(t *testing.T) { + assert.Check(t, !IsAppCreationForced(nil), "nil tags should not result in forced app creation") + tags := make(map[string]string) + assert.Check(t, !IsAppCreationForced(tags), "empty tags should not result in forced app creation") + tags[common.AppTagCreateForce] = "false" + assert.Check(t, !IsAppCreationForced(tags), "false creation tag should not result in forced app creation") + tags[common.AppTagCreateForce] = "invalid" + assert.Check(t, !IsAppCreationForced(tags), "invalid creation tag should not result in forced app creation") + tags[common.AppTagCreateForce] = "true" + assert.Check(t, IsAppCreationForced(tags), "creation tag should result in forced app creation") +} + func TestConvertSITimeoutWithAdjustment(t *testing.T) { created := time.Now().Unix() - 600 defaultTimeout := 15 * time.Minute diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index c20a3e8c3..6c783e9b4 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -524,7 +524,7 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate } // convert and resolve the user: cache can be set per partition // need to do this before we create the application - ugi, err := partition.convertUGI(app.Ugi) + ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags)) if err != nil { rejectedApps = append(rejectedApps, &si.RejectedApplication{ ApplicationID: app.ApplicationID, diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 00d3650ce..7b832b71b 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -70,20 +70,20 @@ type StateLogEntry struct { } type Application struct { - ApplicationID string - Partition string - SubmissionTime time.Time + ApplicationID string // application ID + Partition string // partition Name + SubmissionTime time.Time // time application was submitted + tags map[string]string // application tags used in scheduling - // Private fields need protection + // Private mutable fields need protection queuePath string queue *Queue // queue the application is running in pending *resources.Resource // pending resources from asks for the app reservations map[string]*reservation // a map of reservations requests map[string]*AllocationAsk // a map of asks - sortedRequests sortedRequests - user security.UserGroup // owner of the application - tags map[string]string // application tags used in scheduling - allocatedResource *resources.Resource // total allocated resources + sortedRequests sortedRequests // list of requests pre-sorted + user security.UserGroup // owner of the application + allocatedResource *resources.Resource // total allocated resources usedResource *resources.UsedResource // keep track of resource usage of the application @@ -1874,9 +1874,6 @@ func (sa *Application) GetUser() security.UserGroup { // Get a tag from the application // Note: tags are not case sensitive func (sa *Application) GetTag(tag string) string { - sa.RLock() - defer sa.RUnlock() - tagVal := "" for key, val := range sa.tags { if strings.EqualFold(key, tag) { @@ -1887,6 +1884,10 @@ func (sa *Application) GetTag(tag string) string { return tagVal } +func (sa *Application) IsCreateForced() bool { + return common.IsAppCreationForced(sa.tags) +} + func (sa *Application) SetTerminatedCallback(callback func(appID string)) { sa.Lock() defer sa.Unlock() diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index ef515ec75..cda3de426 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1195,6 +1195,23 @@ func TestGetTag(t *testing.T) { assert.Equal(t, tag, "test value", "expected tag value") } +func TestIsCreateForced(t *testing.T) { + app := newApplicationWithTags(appID1, "default", "root.a", nil) + assert.Check(t, !app.IsCreateForced(), "found forced app but tags nil") + tags := make(map[string]string) + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, !app.IsCreateForced(), "found forced app but tags empty") + tags[siCommon.AppTagCreateForce] = "false" + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was false") + tags[siCommon.AppTagCreateForce] = "unknown" + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was invalid") + tags[siCommon.AppTagCreateForce] = "true" + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, app.IsCreateForced(), "found unforced app but forced tag was set") +} + func TestOnStatusChangeCalled(t *testing.T) { app, testHandler := newApplicationWithHandler(appID1, "default", "root.a") assert.Equal(t, New.String(), app.CurrentState(), "new app not in New state") diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 32255d013..0a16aae0e 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -20,6 +20,7 @@ package objects import ( "context" + "errors" "fmt" "strconv" "strings" @@ -29,6 +30,7 @@ import ( "github.com/looplab/fsm" "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/resources" "github.com/apache/yunikorn-core/pkg/common/security" @@ -38,7 +40,7 @@ import ( "github.com/apache/yunikorn-core/pkg/scheduler/objects/template" "github.com/apache/yunikorn-core/pkg/scheduler/policies" "github.com/apache/yunikorn-core/pkg/webservice/dao" - "github.com/apache/yunikorn-scheduler-interface/lib/go/common" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" ) var ( @@ -145,6 +147,25 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) return sq, nil } +// NewRecoveryQueue creates a recovery queue if it does not exist. The recovery queue +// is a dynamic queue, but has an invalid name so that it cannot be directly referenced. +func NewRecoveryQueue(parent *Queue) (*Queue, error) { + if parent == nil { + return nil, errors.New("recovery queue cannot be created with nil parent") + } + if parent.GetQueuePath() != configs.RootQueue { + return nil, fmt.Errorf("recovery queue cannot be created with non-root parent: %s", parent.GetQueuePath()) + } + queue, err := newDynamicQueueInternal(common.RecoveryQueue, true, parent) + if err == nil { + queue.Lock() + defer queue.Unlock() + queue.submitACL = security.ACL{} + queue.sortType = policies.FifoSortPolicy + } + return queue, err +} + // NewDynamicQueue creates a new queue to be added to the system based on the placement rules // A dynamically added queue can never be the root queue so parent must be set // lock free as it cannot be referenced yet @@ -157,6 +178,10 @@ func NewDynamicQueue(name string, leaf bool, parent *Queue) (*Queue, error) { if !configs.QueueNameRegExp.MatchString(name) { return nil, fmt.Errorf("invalid queue name '%s', a name must only have alphanumeric characters, - or _, and be no longer than 64 characters", name) } + return newDynamicQueueInternal(name, leaf, parent) +} + +func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue, error) { sq := newBlankQueue() sq.Name = strings.ToLower(name) sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name @@ -384,12 +409,15 @@ func (sq *Queue) setTemplate(conf configs.ChildTemplate) error { func (sq *Queue) UpdateQueueProperties() { sq.Lock() defer sq.Unlock() - + if common.IsRecoveryQueue(sq.QueuePath) { + // recovery queue properties should never be updated + sq.sortType = policies.FifoSortPolicy + return + } if !sq.isLeaf { // set the sorting type for parent queues sq.sortType = policies.FairSortPolicy } - // walk over all properties and process var err error for key, value := range sq.properties { @@ -535,6 +563,10 @@ func (sq *Queue) GetPreemptionDelay() time.Duration { // The check is performed recursively: i.e. access to the parent allows access to this queue. // This will check both submitACL and adminACL. func (sq *Queue) CheckSubmitAccess(user security.UserGroup) bool { + if common.IsRecoveryQueue(sq.QueuePath) { + // recovery queue can never pass ACL checks + return false + } sq.RLock() allow := sq.submitACL.CheckAccess(user) || sq.adminACL.CheckAccess(user) sq.RUnlock() @@ -680,9 +712,9 @@ func (sq *Queue) AddApplication(app *Application) { sq.queueEvents.sendNewApplicationEvent(appID) // YUNIKORN-199: update the quota from the namespace // get the tag with the quota - quota := app.GetTag(common.AppTagNamespaceResourceQuota) + quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota) // get the tag with the guaranteed resource - guaranteed := app.GetTag(common.AppTagNamespaceResourceGuaranteed) + guaranteed := app.GetTag(siCommon.AppTagNamespaceResourceGuaranteed) if quota == "" && guaranteed == "" { return } diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index 62d08102a..1c9bbd490 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2302,6 +2302,40 @@ func TestNewConfiguredQueue(t *testing.T) { assert.Assert(t, childNonLeaf.maxResource == nil) } +func TestNewRecoveryQueue(t *testing.T) { + var err error + if _, err = NewRecoveryQueue(nil); err == nil { + t.Fatalf("recovery queue creation should fail with nil parent") + } + + parent, err := createManagedQueueWithProps(nil, "parent", true, nil, nil) + assert.NilError(t, err, "failed to create queue: %v", err) + if _, err = NewRecoveryQueue(parent); err == nil { + t.Fatalf("recovery queue creation should fail with non-root parent") + } + + parentConfig := configs.QueueConfig{ + Name: "root", + Parent: true, + Properties: map[string]string{configs.ApplicationSortPolicy: "fair"}, + ChildTemplate: configs.ChildTemplate{Properties: map[string]string{configs.ApplicationSortPolicy: "fair"}}, + } + parent, err = NewConfiguredQueue(parentConfig, nil) + assert.NilError(t, err, "failed to create queue: %v", err) + recoveryQueue, err := NewRecoveryQueue(parent) + assert.NilError(t, err, "failed to create recovery queue: %v", err) + assert.Equal(t, common.RecoveryQueueFull, recoveryQueue.GetQueuePath(), "wrong queue name") + assert.Equal(t, policies.FifoSortPolicy, recoveryQueue.getSortType(), "wrong sort type") +} + +func TestNewDynamicQueueDoesNotCreateRecovery(t *testing.T) { + parent, err := createRootQueue(nil) + assert.NilError(t, err, "failed to create queue: %v", err) + if _, err := NewDynamicQueue(common.RecoveryQueue, true, parent); err == nil { + t.Fatalf("invalid recovery queue %s was created", common.RecoveryQueueFull) + } +} + func TestNewDynamicQueue(t *testing.T) { parent, err := createManagedQueueWithProps(nil, "parent", true, nil, nil) assert.NilError(t, err, "failed to create queue: %v", err) diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 101200ded..8bf2786e4 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -157,22 +157,13 @@ func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue { return fmt.Errorf("partition cannot be created without root queue") } - - if pc.placementManager.IsInitialised() { - log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload") - err := pc.placementManager.UpdateRules(conf.PlacementRules) - if err != nil { - log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", zap.Error(err)) - return err - } - pc.rules = &conf.PlacementRules - } else { - log.Log(log.SchedPartition).Info("Creating new placement manager on config reload") - pc.rules = &conf.PlacementRules - // We need to pass in the locked version of the GetQueue function. - // Placing an application will not have a lock on the partition context. - pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.GetQueue) + log.Log(log.SchedPartition).Info("Updating placement manager rules on config reload") + err := pc.placementManager.UpdateRules(conf.PlacementRules) + if err != nil { + log.Log(log.SchedPartition).Info("New placement rules not activated, config reload failed", zap.Error(err)) + return err } + pc.rules = &conf.PlacementRules pc.updateNodeSortingPolicy(conf) // start at the root: there is only one queue queueConf := conf.Queues[0] @@ -308,38 +299,41 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error { } // Put app under the queue - queueName := app.GetQueuePath() pm := pc.getPlacementManager() - if pm.IsInitialised() { - err := pm.PlaceApplication(app) - if err != nil { - return fmt.Errorf("failed to place application %s: %v", appID, err) - } - queueName = app.GetQueuePath() - if queueName == "" { - return fmt.Errorf("application rejected by placement rules: %s", appID) - } + err := pm.PlaceApplication(app) + if err != nil { + return fmt.Errorf("failed to place application %s: %v", appID, err) } + queueName := app.GetQueuePath() + if queueName == "" { + return fmt.Errorf("application rejected by placement rules: %s", appID) + } + // lock the partition and make the last change: we need to do this before creating the queues. // queue cleanup might otherwise remove the queue again before we can add the application pc.Lock() defer pc.Unlock() // we have a queue name either from placement or direct, get the queue queue := pc.getQueueInternal(queueName) + + // create the queue if necessary if queue == nil { - // queue must exist if not using placement rules - if !pm.IsInitialised() { - return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName) - } - // with placement rules the hierarchy might not exist so try and create it var err error - queue, err = pc.createQueue(queueName, app.GetUser()) - if err != nil { - return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID) + if common.IsRecoveryQueue(queueName) { + queue, err = pc.createRecoveryQueue() + if err != nil { + return fmt.Errorf("failed to create recovery queue %s for application %s", common.RecoveryQueueFull, appID) + } + } else { + queue, err = pc.createQueue(queueName, app.GetUser()) + if err != nil { + return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID) + } } } - // check the queue: is a leaf queue with submit access - if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) { + + // check the queue: is a leaf queue + if !queue.IsLeafQueue() { return fmt.Errorf("failed to find queue %s for application %s", queueName, appID) } @@ -486,6 +480,11 @@ func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo { return PartitionQueueDAOInfo } +// Create the recovery queue. +func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) { + return objects.NewRecoveryQueue(pc.root) +} + // Create a queue with full hierarchy. This is called when a new queue is created from a placement rule. // The final leaf queue does not exist otherwise we would not get here. // This means that at least 1 queue (a leaf queue) will be created @@ -1186,10 +1185,10 @@ func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error { return nil } -func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation) (security.UserGroup, error) { +func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool) (security.UserGroup, error) { pc.RLock() defer pc.RUnlock() - return pc.userGroupCache.ConvertUGI(ugi) + return pc.userGroupCache.ConvertUGI(ugi, forced) } // calculate overall nodes resource usage and returns a map as the result, diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index ed1e69e69..8817e5d33 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -118,7 +118,6 @@ func TestNewPartition(t *testing.T) { if partition.root.QueuePath != "root" { t.Fatal("partition root queue not set as expected") } - assert.Assert(t, !partition.placementManager.IsInitialised(), "partition should not have initialised placement manager") } func TestNewWithPlacement(t *testing.T) { @@ -143,8 +142,7 @@ func TestNewWithPlacement(t *testing.T) { } partition, err := newPartitionContext(confWith, rmID, nil) assert.NilError(t, err, "test partition create failed with error") - assert.Assert(t, partition.placementManager.IsInitialised(), "partition should have initialised placement manager") - assert.Equal(t, len(*partition.rules), 1, "Placement rules not set as expected ") + assert.Equal(t, len(*partition.rules), 1, "Placement rules not set as expected") // add a rule and check if it is updated confWith = configs.PartitionConfig{ @@ -170,8 +168,7 @@ func TestNewWithPlacement(t *testing.T) { } err = partition.updatePartitionDetails(confWith) assert.NilError(t, err, "update partition failed unexpected with error") - assert.Assert(t, partition.placementManager.IsInitialised(), "partition should have initialised placement manager") - assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated as expected ") + assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated as expected") // update to turn off placement manager conf := configs.PartitionConfig{ @@ -187,14 +184,12 @@ func TestNewWithPlacement(t *testing.T) { } err = partition.updatePartitionDetails(conf) assert.NilError(t, err, "update partition failed unexpected with error") - assert.Assert(t, !partition.placementManager.IsInitialised(), "partition should not have initialised placement manager") - assert.Equal(t, len(*partition.rules), 0, "Placement rules not updated as expected ") + assert.Equal(t, len(*partition.rules), 0, "Placement rules not updated as expected") // set the old config back this should turn on the placement again err = partition.updatePartitionDetails(confWith) assert.NilError(t, err, "update partition failed unexpected with error") - assert.Assert(t, partition.placementManager.IsInitialised(), "partition should have initialised placement manager") - assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated as expected ") + assert.Equal(t, len(*partition.rules), 2, "Placement rules not updated as expected") } func TestAddNode(t *testing.T) { @@ -927,6 +922,115 @@ func TestAddApp(t *testing.T) { } } +func TestAddAppForced(t *testing.T) { + partition, err := newBasePartition() + assert.NilError(t, err, "partition create failed") + + // add a new app to an invalid queue + app := newApplication(appID1, "default", "root.invalid") + err = partition.AddApplication(app) + if err == nil || partition.getApplication(appID1) != nil { + t.Fatalf("add application to nonexistent queue should have failed but did not") + } + + // re-add the app, but mark it as forced. this should create the recovery queue and assign the app to it + app = newApplicationTags(appID1, "default", "root.invalid", map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app) + assert.NilError(t, err, "app create failed") + partApp := partition.getApplication(appID1) + if partApp == nil { + t.Fatalf("app not found after adding to partition") + } + recoveryQueue := partition.GetQueue(common.RecoveryQueueFull) + if recoveryQueue == nil { + t.Fatalf("recovery queue not found") + } + assert.Equal(t, common.RecoveryQueueFull, partApp.GetQueuePath(), "wrong queue path for app2") + assert.Check(t, recoveryQueue == partApp.GetQueue(), "wrong queue for app") + assert.Equal(t, 1, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") + + // add second forced app. this should use the existing recovery queue rather than recreating it + app2 := newApplicationTags(appID2, "default", "root.invalid2", map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app2) + assert.NilError(t, err, "app2 create failed") + partApp2 := partition.getApplication(appID2) + if partApp2 == nil { + t.Fatalf("app2 not found after adding to partition") + } + assert.Equal(t, common.RecoveryQueueFull, partApp2.GetQueuePath(), "wrong queue path for app2") + assert.Check(t, recoveryQueue == partApp2.GetQueue(), "wrong queue for app2") + assert.Equal(t, 2, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") + + // add third app (not forced), but referencing the recovery queue. this should fail. + app3 := newApplication(appID3, "default", common.RecoveryQueueFull) + err = partition.AddApplication(app3) + if err == nil || partition.getApplication(appID3) != nil { + t.Fatalf("add app3 to recovery queue should have failed but did not") + } + + // re-add third app, but forced. This should succeed. + app3 = newApplicationTags(appID3, "default", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app3) + assert.NilError(t, err, "app3 create failed") + partApp3 := partition.getApplication(appID3) + if partApp3 == nil { + t.Fatalf("app3 not found after adding to partition") + } + assert.Equal(t, common.RecoveryQueueFull, partApp3.GetQueuePath(), "wrong queue path for app3") + assert.Check(t, recoveryQueue == partApp3.GetQueue(), "wrong queue for app3") + assert.Equal(t, 3, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") +} + +func TestAddAppForcedWithPlacement(t *testing.T) { + confWith := configs.PartitionConfig{ + Name: "test", + Queues: []configs.QueueConfig{ + { + Name: "root", + Parent: true, + SubmitACL: "*", + Queues: nil, + }, + }, + PlacementRules: []configs.PlacementRule{ + { + Name: "tag", + Value: "queue", + Create: true, + }, + }, + Limits: nil, + NodeSortPolicy: configs.NodeSortingPolicy{}, + } + partition, err := newPartitionContext(confWith, rmID, nil) + assert.NilError(t, err, "test partition create failed with error") + + // add a new app using tag rule + app := newApplicationTags(appID1, "default", "", map[string]string{"queue": "root.test"}) + err = partition.AddApplication(app) + assert.NilError(t, err, "failed to add app to tagged queue") + assert.Equal(t, "root.test", app.GetQueuePath(), "app assigned to wrong queue") + + // add a second app without a tag rule + app2 := newApplicationTags(appID2, "default", "root.untagged", map[string]string{}) + err = partition.AddApplication(app2) + if err == nil || partition.getApplication(appID2) != nil { + t.Fatalf("add app2 to fixed queue should have failed but did not") + } + + // attempt to add the app again, but with forced addition + app2 = newApplicationTags(appID2, "default", "root.untagged", map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app2) + assert.NilError(t, err, "failed to add app2 to tagged queue") + assert.Equal(t, common.RecoveryQueueFull, app2.GetQueuePath(), "app2 assigned to wrong queue") + + // add a third app, but with the recovery queue tagged + app3 := newApplicationTags(appID3, "default", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app3) + assert.NilError(t, err, "failed to add app3 to tagged queue") + assert.Equal(t, common.RecoveryQueueFull, app3.GetQueuePath(), "app2 assigned to wrong queue") +} + func TestAddAppTaskGroup(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") diff --git a/pkg/scheduler/placement/fixed_rule.go b/pkg/scheduler/placement/fixed_rule.go index 8dce0bbac..0dce38471 100644 --- a/pkg/scheduler/placement/fixed_rule.go +++ b/pkg/scheduler/placement/fixed_rule.go @@ -63,30 +63,31 @@ func (fr *fixedRule) initialise(conf configs.PlacementRule) error { return err } -func (fr *fixedRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) { +func (fr *fixedRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, bool, error) { // before anything run the filter if !fr.filter.allowUser(app.GetUser()) { log.Log(log.Config).Debug("Fixed rule filtered", zap.String("application", app.ApplicationID), zap.Any("user", app.GetUser()), zap.String("queueName", fr.queue)) - return "", nil + return "", true, nil } var parentName string + var aclCheck = true var err error queueName := fr.queue // if the fixed queue is already fully qualified skip the parent check if !fr.qualified { // run the parent rule if set if fr.parent != nil { - parentName, err = fr.parent.placeApplication(app, queueFn) + parentName, aclCheck, err = fr.parent.placeApplication(app, queueFn) // failed parent rule, fail this rule if err != nil { - return "", err + return "", aclCheck, err } // rule did not return a parent: this could be filter or create flag related if parentName == "" { - return "", nil + return "", aclCheck, nil } // check if this is a parent queue and qualify it if !strings.HasPrefix(parentName, configs.RootQueue+configs.DOT) { @@ -95,7 +96,7 @@ func (fr *fixedRule) placeApplication(app *objects.Application, queueFn func(str // if the parent queue exists it cannot be a leaf parentQueue := queueFn(parentName) if parentQueue != nil && parentQueue.IsLeafQueue() { - return "", fmt.Errorf("parent rule returned a leaf queue: %s", parentName) + return "", aclCheck, fmt.Errorf("parent rule returned a leaf queue: %s", parentName) } } // the parent is set from the rule otherwise set it to the root @@ -112,10 +113,10 @@ func (fr *fixedRule) placeApplication(app *objects.Application, queueFn func(str queue := queueFn(queueName) // if we cannot create the queue must exist if !fr.create && queue == nil { - return "", nil + return "", aclCheck, nil } log.Log(log.Config).Info("Fixed rule application placed", zap.String("application", app.ApplicationID), zap.String("queue", queueName)) - return queueName, nil + return queueName, aclCheck, nil } diff --git a/pkg/scheduler/placement/fixed_rule_test.go b/pkg/scheduler/placement/fixed_rule_test.go index 5067f93eb..c3e4e9513 100644 --- a/pkg/scheduler/placement/fixed_rule_test.go +++ b/pkg/scheduler/placement/fixed_rule_test.go @@ -90,10 +90,12 @@ partitions: t.Errorf("fixed rule create failed with queue name, err %v", err) } var queue string - queue, err = fr.placeApplication(app, queueFunc) + var aclCheck bool + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "root.testqueue" || err != nil { t.Errorf("fixed rule failed to place queue in correct queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // fixed queue that exists directly in hierarchy conf = configs.PlacementRule{ @@ -104,10 +106,11 @@ partitions: if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "root.testparent.testchild" || err != nil { t.Errorf("fixed rule failed to place queue in correct queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // fixed queue that does not exists conf = configs.PlacementRule{ @@ -119,10 +122,11 @@ partitions: if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "root.newqueue" || err != nil { t.Errorf("fixed rule failed to place queue in to be created queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a parent queue should not fail: failure happens on create in this case conf = configs.PlacementRule{ @@ -133,10 +137,11 @@ partitions: if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "root.testparent" || err != nil { t.Errorf("fixed rule did fail with parent queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a parent conf = configs.PlacementRule{ @@ -151,10 +156,11 @@ partitions: if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "root.testparent.testchild" || err != nil { t.Errorf("fixed rule with parent queue should not have failed '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } func TestFixedRuleParent(t *testing.T) { @@ -184,10 +190,12 @@ func TestFixedRuleParent(t *testing.T) { t.Errorf("fixed rule create failed with queue name, err %v", err) } var queue string - queue, err = fr.placeApplication(app, queueFunc) + var aclCheck bool + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "" || err != nil { t.Errorf("fixed rule with create false for child should have failed and gave '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a non creatable parent conf = configs.PlacementRule{ @@ -204,10 +212,11 @@ func TestFixedRuleParent(t *testing.T) { if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "" || err != nil { t.Errorf("fixed rule with non existing parent queue should have failed '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a creatable parent conf = configs.PlacementRule{ @@ -224,10 +233,11 @@ func TestFixedRuleParent(t *testing.T) { if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != nameParentChild || err != nil { t.Errorf("fixed rule with non existing parent queue should created '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a parent which is defined as a leaf conf = configs.PlacementRule{ @@ -243,8 +253,9 @@ func TestFixedRuleParent(t *testing.T) { if err != nil || fr == nil { t.Errorf("fixed rule create failed with queue name, err %v", err) } - queue, err = fr.placeApplication(app, queueFunc) + queue, aclCheck, err = fr.placeApplication(app, queueFunc) if queue != "" || err == nil { t.Errorf("fixed rule with parent declared as leaf should have failed '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } diff --git a/pkg/scheduler/placement/placement.go b/pkg/scheduler/placement/placement.go index c4b6ae8d8..b74a4865e 100644 --- a/pkg/scheduler/placement/placement.go +++ b/pkg/scheduler/placement/placement.go @@ -28,29 +28,22 @@ import ( "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-core/pkg/scheduler/objects" + "github.com/apache/yunikorn-core/pkg/scheduler/placement/types" ) type AppPlacementManager struct { - name string - rules []rule - initialised bool - queueFn func(string) *objects.Queue + rules []rule + queueFn func(string) *objects.Queue sync.RWMutex } func NewPlacementManager(rules []configs.PlacementRule, queueFunc func(string) *objects.Queue) *AppPlacementManager { - m := &AppPlacementManager{} - if queueFunc == nil { - log.Log(log.Config).Info("Placement manager created without queue function: not active") - return m + m := &AppPlacementManager{ + queueFn: queueFunc, } - m.queueFn = queueFunc - if len(rules) > 0 { - if err := m.initialise(rules); err != nil { - log.Log(log.Config).Info("Placement manager created without rules: not active", - zap.Error(err)) - } + if err := m.initialise(rules); err != nil { + log.Log(log.Config).Error("Placement manager created without rules: not active", zap.Error(err)) } return m } @@ -58,32 +51,14 @@ func NewPlacementManager(rules []configs.PlacementRule, queueFunc func(string) * // Update the rules for an active placement manager // Note that this will only be called when the manager is created earlier and the config is updated. func (m *AppPlacementManager) UpdateRules(rules []configs.PlacementRule) error { - if len(rules) > 0 { - log.Log(log.Config).Info("Building new rule list for placement manager") - if err := m.initialise(rules); err != nil { - log.Log(log.Config).Info("Placement manager rules not reloaded", - zap.Error(err)) - return err - } - } - // if there are no rules in the config we should turn off the placement manager - if len(rules) == 0 && m.initialised { - m.Lock() - defer m.Unlock() - log.Log(log.Config).Info("Placement manager rules removed on config reload") - m.initialised = false - m.rules = make([]rule, 0) + log.Log(log.Config).Info("Building new rule list for placement manager") + if err := m.initialise(rules); err != nil { + log.Log(log.Config).Info("Placement manager rules not reloaded", zap.Error(err)) + return err } return nil } -// Return the state of the placement manager -func (m *AppPlacementManager) IsInitialised() bool { - m.RLock() - defer m.RUnlock() - return m.initialised -} - // Initialise the rules from a parsed config. func (m *AppPlacementManager) initialise(rules []configs.PlacementRule) error { log.Log(log.Config).Info("Building new rule list for placement manager") @@ -94,14 +69,10 @@ func (m *AppPlacementManager) initialise(rules []configs.PlacementRule) error { } m.Lock() defer m.Unlock() - if m.queueFn == nil { - return fmt.Errorf("placement manager queue function nil") - } log.Log(log.Config).Info("Activated rule set in placement manager") m.rules = tempRules // all done manager is initialised - m.initialised = true for rule := range m.rules { log.Log(log.Config).Debug("rule set", zap.Int("ruleNumber", rule), @@ -114,9 +85,13 @@ func (m *AppPlacementManager) initialise(rules []configs.PlacementRule) error { // If the rule set is correct and can be used the new set is returned. // If any error is encountered a nil array is returned and the error set func (m *AppPlacementManager) buildRules(rules []configs.PlacementRule) ([]rule, error) { - // catch an empty list + // empty list should result in a single "provided" rule if len(rules) == 0 { - return nil, fmt.Errorf("placement manager rule list request is empty") + log.Log(log.Config).Info("Placement manager configured without rules: using implicit provided rule") + rules = []configs.PlacementRule{{ + Name: types.Provided, + Create: false, + }} } // build temp list from new config var newRules []rule @@ -127,23 +102,24 @@ func (m *AppPlacementManager) buildRules(rules []configs.PlacementRule) ([]rule, } newRules = append(newRules, buildRule) } + // ensure the recovery rule is always present + newRules = append(newRules, &recoveryRule{}) + return newRules, nil } func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { - // Placement manager not initialised cannot place application, just return m.RLock() defer m.RUnlock() - if !m.initialised { - return nil - } + var queueName string + var aclCheck bool var err error for _, checkRule := range m.rules { log.Log(log.Config).Debug("Executing rule for placing application", zap.String("ruleName", checkRule.getName()), zap.String("application", app.ApplicationID)) - queueName, err = checkRule.placeApplication(app, m.queueFn) + queueName, aclCheck, err = checkRule.placeApplication(app, m.queueFn) if err != nil { log.Log(log.Config).Error("rule execution failed", zap.String("ruleName", checkRule.getName()), @@ -164,7 +140,7 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { queue = m.queueFn(current) } // Check if the user is allowed to submit to this queueName, if not next rule - if !queue.CheckSubmitAccess(app.GetUser()) { + if aclCheck && !queue.CheckSubmitAccess(app.GetUser()) { log.Log(log.Config).Debug("Submit access denied on queue", zap.String("queueName", queue.GetQueuePath()), zap.String("ruleName", checkRule.getName()), @@ -185,7 +161,7 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error { continue } // Check if the user is allowed to submit to this queueName, if not next rule - if !queue.CheckSubmitAccess(app.GetUser()) { + if aclCheck && !queue.CheckSubmitAccess(app.GetUser()) { log.Log(log.Config).Debug("Submit access denied on queue", zap.String("queueName", queueName), zap.String("ruleName", checkRule.getName()), diff --git a/pkg/scheduler/placement/placement_test.go b/pkg/scheduler/placement/placement_test.go index 9eec2c178..947fb94f3 100644 --- a/pkg/scheduler/placement/placement_test.go +++ b/pkg/scheduler/placement/placement_test.go @@ -25,57 +25,34 @@ import ( "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/security" + "github.com/apache/yunikorn-core/pkg/scheduler/placement/types" ) // basic test to check if no rules leave the manager unusable func TestManagerNew(t *testing.T) { // basic info without rules, manager should not init - man := NewPlacementManager(nil, nil) - if man.initialised { - t.Error("Placement manager marked initialised without rules") - } - if man.IsInitialised() { - t.Error("Placement manager marked initialised without rules") - } - if len(man.rules) != 0 { - t.Error("Placement manager marked initialised without rules") - } -} - -func TestManagerNoFunc(t *testing.T) { - // basic info without rules, manager should not init - rules := []configs.PlacementRule{ - {Name: "test"}, - } - man := NewPlacementManager(rules, nil) - if man.initialised { - t.Error("Placement manager marked initialised without queue func") - } - if man.initialise(rules) == nil { - t.Error("Placement manager should not initialise with nil function") - } - if man.UpdateRules(rules) == nil { - t.Error("Placement manager should not update with nil function") - } + man := NewPlacementManager(nil, queueFunc) + assert.Equal(t, 2, len(man.rules), "wrong rule count for empty placement manager") + assert.Equal(t, types.Provided, man.rules[0].getName(), "wrong name for implicit provided rule") + assert.Equal(t, types.Recovery, man.rules[1].getName(), "wrong name for implicit recovery rule") } func TestManagerInit(t *testing.T) { - // basic info without rules, manager should not init no error + // basic info without rules, manager should implicitly init man := NewPlacementManager(nil, queueFunc) - if man.initialised { - t.Error("Placement manager marked initialised without rules") - } - // try to init with empty list must error + assert.Equal(t, 2, len(man.rules), "wrong rule count for nil placement manager") + + // try to init with empty list should do the same var rules []configs.PlacementRule err := man.initialise(rules) - if err == nil || man.initialised { - t.Error("initialise without rules should have failed") - } + assert.NilError(t, err, "Failed to initialize empty placement rules") + assert.Equal(t, 2, len(man.rules), "wrong rule count for empty placement manager") + rules = []configs.PlacementRule{ {Name: "unknown"}, } err = man.initialise(rules) - if err == nil || man.initialised { + if err == nil { t.Error("initialise with 'unknown' rule list should have failed") } @@ -84,20 +61,18 @@ func TestManagerInit(t *testing.T) { {Name: "test"}, } err = man.initialise(rules) - if err != nil || !man.initialised { - t.Errorf("failed to init existing manager, init state: %t, error: %v", man.initialised, err) - } - // update the manager: remove rules init state is reverted + assert.NilError(t, err, "failed to init existing manager") + + // update the manager: remove rules implicit state is reverted rules = []configs.PlacementRule{} err = man.initialise(rules) - if err == nil || !man.initialised { - t.Errorf("init should have failed with empty list, init state: %t, error: %v", man.initialised, err) - } + assert.NilError(t, err, "Failed to re-initialize empty placement rules") + assert.Equal(t, 2, len(man.rules), "wrong rule count for newly empty placement manager") + // check if we handle a nil list err = man.initialise(nil) - if err == nil || !man.initialised { - t.Errorf("init should have failed with nil list, init state: %t, error: %v", man.initialised, err) - } + assert.NilError(t, err, "Failed to re-initialize nil placement rules") + assert.Equal(t, 2, len(man.rules), "wrong rule count for nil placement manager") } func TestManagerUpdate(t *testing.T) { @@ -108,20 +83,18 @@ func TestManagerUpdate(t *testing.T) { {Name: "test"}, } err := man.UpdateRules(rules) - if err != nil || !man.initialised { - t.Errorf("failed to update existing manager, init state: %t, error: %v", man.initialised, err) - } + assert.NilError(t, err, "failed to update existing manager") + // update the manager: remove rules init state is reverted rules = []configs.PlacementRule{} err = man.UpdateRules(rules) - if err != nil || man.initialised { - t.Errorf("failed to update existing manager, init state: %t, error: %v", man.initialised, err) - } + assert.NilError(t, err, "Failed to re-initialize empty placement rules") + assert.Equal(t, 2, len(man.rules), "wrong rule count for newly empty placement manager") + // check if we handle a nil list err = man.UpdateRules(nil) - if err != nil || man.initialised { - t.Errorf("failed to update existing manager with nil list, init state: %t, error: %v", man.initialised, err) - } + assert.NilError(t, err, "Failed to re-initialize nil placement rules") + assert.Equal(t, 2, len(man.rules), "wrong rule count for nil placement manager") } func TestManagerBuildRule(t *testing.T) { @@ -134,8 +107,8 @@ func TestManagerBuildRule(t *testing.T) { if err != nil { t.Errorf("test rule build should not have failed, err: %v", err) } - if len(ruleObjs) != 1 { - t.Errorf("test rule build should have created 1 rule found: %d", len(ruleObjs)) + if len(ruleObjs) != 2 { + t.Errorf("test rule build should have created 2 rules found: %d", len(ruleObjs)) } // rule with a parent rule should only be 1 rule in the list @@ -147,8 +120,8 @@ func TestManagerBuildRule(t *testing.T) { }, } ruleObjs, err = man.buildRules(rules) - if err != nil || len(ruleObjs) != 1 { - t.Errorf("test rule build should not have failed and created 1 top level rule, err: %v, rules: %v", err, ruleObjs) + if err != nil || len(ruleObjs) != 2 { + t.Errorf("test rule build should not have failed and created 2 top level rule, err: %v, rules: %v", err, ruleObjs) } else { parent := ruleObjs[0].getParent() if parent == nil || parent.getName() != "test" { @@ -162,9 +135,9 @@ func TestManagerBuildRule(t *testing.T) { {Name: "test"}, } ruleObjs, err = man.buildRules(rules) - if err != nil || len(ruleObjs) != 2 { - t.Errorf("rule build should not have failed and created 2 rule, err: %v, rules: %v", err, ruleObjs) - } else if ruleObjs[0].getName() != "user" || ruleObjs[1].getName() != "test" { + if err != nil || len(ruleObjs) != 3 { + t.Errorf("rule build should not have failed and created 3 rules, err: %v, rules: %v", err, ruleObjs) + } else if ruleObjs[0].getName() != "user" || ruleObjs[1].getName() != "test" || ruleObjs[2].getName() != "recovery" { t.Errorf("rule build order is not preserved: %v", ruleObjs) } } @@ -207,9 +180,7 @@ partitions: Create: true}, } err = man.UpdateRules(rules) - if err != nil || !man.initialised { - t.Errorf("failed to update existing manager, init state: %t, error: %v", man.initialised, err) - } + assert.NilError(t, err, "failed to update existing manager") tags := make(map[string]string) user := security.UserGroup{ User: "testchild", diff --git a/pkg/scheduler/placement/provided_rule.go b/pkg/scheduler/placement/provided_rule.go index 203cc28ca..e7fa15f58 100644 --- a/pkg/scheduler/placement/provided_rule.go +++ b/pkg/scheduler/placement/provided_rule.go @@ -52,33 +52,35 @@ func (pr *providedRule) initialise(conf configs.PlacementRule) error { return err } -func (pr *providedRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) { +func (pr *providedRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, bool, error) { // since this is the provided rule we must have a queue in the info already queueName := app.GetQueuePath() if queueName == "" { - return "", nil + return "", true, nil } + // before anything run the filter if !pr.filter.allowUser(app.GetUser()) { log.Log(log.Config).Debug("Provided rule filtered", zap.String("application", app.ApplicationID), zap.Any("user", app.GetUser())) - return "", nil + return "", true, nil } var parentName string + var aclCheck = true var err error // if we have a fully qualified queue passed in do not run the parent rule if !strings.HasPrefix(queueName, configs.RootQueue+configs.DOT) { // run the parent rule if set if pr.parent != nil { - parentName, err = pr.parent.placeApplication(app, queueFn) + parentName, aclCheck, err = pr.parent.placeApplication(app, queueFn) // failed parent rule, fail this rule if err != nil { - return "", err + return "", aclCheck, err } // rule did not return a parent: this could be filter or create flag related if parentName == "" { - return "", nil + return "", aclCheck, nil } // check if this is a parent queue and qualify it if !strings.HasPrefix(parentName, configs.RootQueue+configs.DOT) { @@ -87,7 +89,7 @@ func (pr *providedRule) placeApplication(app *objects.Application, queueFn func( // if the parent queue exists it cannot be a leaf parentQueue := queueFn(parentName) if parentQueue != nil && parentQueue.IsLeafQueue() { - return "", fmt.Errorf("parent rule returned a leaf queue: %s", parentName) + return "", aclCheck, fmt.Errorf("parent rule returned a leaf queue: %s", parentName) } } // the parent is set from the rule otherwise set it to the root @@ -104,10 +106,10 @@ func (pr *providedRule) placeApplication(app *objects.Application, queueFn func( queue := queueFn(queueName) // if we cannot create the queue must exist if !pr.create && queue == nil { - return "", nil + return "", aclCheck, nil } log.Log(log.Config).Info("Provided rule application placed", zap.String("application", app.ApplicationID), zap.String("queue", queueName)) - return queueName, nil + return queueName, aclCheck, nil } diff --git a/pkg/scheduler/placement/provided_rule_test.go b/pkg/scheduler/placement/provided_rule_test.go index 5077c834a..e400c9223 100644 --- a/pkg/scheduler/placement/provided_rule_test.go +++ b/pkg/scheduler/placement/provided_rule_test.go @@ -57,22 +57,26 @@ partitions: // queue that does not exists directly under the root appInfo := newApplication("app1", "default", "unknown", user, tags, nil, "") var queue string - queue, err = pr.placeApplication(appInfo, queueFunc) + var aclCheck bool + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("provided rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place when no queue provided in the app appInfo = newApplication("app1", "default", "", user, tags, nil, "") - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("provided rule placed app in incorrect queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a qualified queue that does not exist appInfo = newApplication("app1", "default", "root.unknown", user, tags, nil, "") - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("provided rule placed app in incorrect queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // same queue now with create flag conf = configs.PlacementRule{ Name: "provided", @@ -82,10 +86,11 @@ partitions: if err != nil || pr == nil { t.Errorf("provided rule create failed, err %v", err) } - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "root.unknown" || err != nil { t.Errorf("provided rule placed app in incorrect queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") conf = configs.PlacementRule{ Name: "provided", @@ -101,18 +106,20 @@ partitions: // unqualified queue with parent rule that exists directly in hierarchy appInfo = newApplication("app1", "default", "testchild", user, tags, nil, "") - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "root.testparent.testchild" || err != nil { t.Errorf("provided rule failed to place queue in correct queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // qualified queue with parent rule (parent rule ignored) appInfo = newApplication("app1", "default", "root.testparent", user, tags, nil, "") - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "root.testparent" || err != nil { t.Errorf("provided rule placed in to be created queue with create false '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } func TestProvidedRuleParent(t *testing.T) { @@ -142,10 +149,12 @@ func TestProvidedRuleParent(t *testing.T) { appInfo := newApplication("app1", "default", "unknown", user, tags, nil, "") var queue string - queue, err = pr.placeApplication(appInfo, queueFunc) + var aclCheck bool + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("provided rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a non creatable parent conf = configs.PlacementRule{ @@ -163,10 +172,11 @@ func TestProvidedRuleParent(t *testing.T) { } appInfo = newApplication("app1", "default", "testchild", user, tags, nil, "") - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("provided rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a creatable parent conf = configs.PlacementRule{ @@ -182,10 +192,11 @@ func TestProvidedRuleParent(t *testing.T) { if err != nil || pr == nil { t.Errorf("provided rule create failed, err %v", err) } - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != nameParentChild || err != nil { t.Errorf("provided rule with non existing parent queue should create '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a parent which is defined as a leaf conf = configs.PlacementRule{ @@ -202,8 +213,9 @@ func TestProvidedRuleParent(t *testing.T) { } appInfo = newApplication("app1", "default", "unknown", user, tags, nil, "") - queue, err = pr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = pr.placeApplication(appInfo, queueFunc) if queue != "" || err == nil { t.Errorf("provided rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } diff --git a/pkg/scheduler/placement/recovery_rule.go b/pkg/scheduler/placement/recovery_rule.go new file mode 100644 index 000000000..71f7dc23a --- /dev/null +++ b/pkg/scheduler/placement/recovery_rule.go @@ -0,0 +1,58 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package placement + +import ( + "go.uber.org/zap" + + "github.com/apache/yunikorn-core/pkg/common" + "github.com/apache/yunikorn-core/pkg/common/configs" + "github.com/apache/yunikorn-core/pkg/log" + "github.com/apache/yunikorn-core/pkg/scheduler/objects" + "github.com/apache/yunikorn-core/pkg/scheduler/placement/types" +) + +type recoveryRule struct { + basicRule +} + +// A rule to place an application into the recovery queue if no other rules matched and application submission is forced. +// This rule will be run implicitly after all other placement rules are evaluated to ensure that an application +// corresponding to an already-executing workload can be accepted successfully. +func (rr *recoveryRule) getName() string { + return types.Recovery +} + +func (rr *recoveryRule) initialise(conf configs.PlacementRule) error { + // no configuration needed for the recovery rule + return nil +} + +func (rr *recoveryRule) placeApplication(app *objects.Application, _ func(string) *objects.Queue) (string, bool, error) { + // only forced applications should resolve to the recovery queue + if !app.IsCreateForced() { + return "", false, nil + } + + queueName := common.RecoveryQueueFull + log.Log(log.Config).Info("Recovery rule application placed", + zap.String("application", app.ApplicationID), + zap.String("queue", queueName)) + return queueName, false, nil +} diff --git a/pkg/scheduler/placement/recovery_rule_test.go b/pkg/scheduler/placement/recovery_rule_test.go new file mode 100644 index 000000000..7185396d9 --- /dev/null +++ b/pkg/scheduler/placement/recovery_rule_test.go @@ -0,0 +1,80 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package placement + +import ( + "testing" + + "gotest.tools/v3/assert" + + "github.com/apache/yunikorn-core/pkg/common" + "github.com/apache/yunikorn-core/pkg/common/configs" + "github.com/apache/yunikorn-core/pkg/common/security" + siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common" +) + +func TestRecoveryRuleInitialise(t *testing.T) { + conf := configs.PlacementRule{ + Name: "recovery", + } + rr := &recoveryRule{} + err := rr.initialise(conf) + assert.NilError(t, err, "unexpected error in initialize") +} + +func TestRecoveryRulePlace(t *testing.T) { + rr := &recoveryRule{} + + // Create the structure for the test + data := ` +partitions: + - name: default + queues: + - name: testqueue + - name: testparent + queues: + - name: testchild +` + err := initQueueStructure([]byte(data)) + assert.NilError(t, err, "setting up the queue config failed") + + // verify that non-forced app is not recovered + user := security.UserGroup{ + User: "testuser", + Groups: []string{}, + } + tags := make(map[string]string) + app := newApplication("app1", "default", "ignored", user, tags, nil, "") + + var queue string + var aclCheck bool + queue, aclCheck, err = rr.placeApplication(app, queueFunc) + if queue != "" || err != nil { + t.Errorf("recovery rule did not bypass non-forced application, resolved queue '%s', err %v ", queue, err) + } + assert.Check(t, !aclCheck, "acl check should not be set for recovery rule") + + tags[siCommon.AppTagCreateForce] = "true" + app = newApplication("app1", "default", "ignored", user, tags, nil, "") + queue, aclCheck, err = rr.placeApplication(app, queueFunc) + if queue != common.RecoveryQueueFull || err != nil { + t.Errorf("recovery rule did not place forced application into recovery queue, resolved queue '%s', err %v ", queue, err) + } + assert.Check(t, !aclCheck, "acl check should not be set for recovery rule") +} diff --git a/pkg/scheduler/placement/rule.go b/pkg/scheduler/placement/rule.go index 3120d886e..94a04b451 100644 --- a/pkg/scheduler/placement/rule.go +++ b/pkg/scheduler/placement/rule.go @@ -38,8 +38,9 @@ type rule interface { // Execute the rule and return the queue getName the application is placed in. // Returns the fully qualified queue getName if the rule finds a queue or an empty string if the rule did not match. + // Additionally, returns true if ACLs should be checked or false otherwise. // The error must only be set if there is a failure while executing the rule not if the rule did not match. - placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) + placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, bool, error) // Return the getName of the rule which is defined in the rule. // The basicRule provides a "unnamed rule" implementation. diff --git a/pkg/scheduler/placement/rule_test.go b/pkg/scheduler/placement/rule_test.go index f4e741c8e..c6d4e49f7 100644 --- a/pkg/scheduler/placement/rule_test.go +++ b/pkg/scheduler/placement/rule_test.go @@ -61,31 +61,36 @@ func TestPlaceApp(t *testing.T) { } nr, err := newRule(conf) assert.NilError(t, err, "unexpected rule initialisation error") + var aclCheck bool // place application that should fail - _, err = nr.placeApplication(nil, nil) + _, aclCheck, err = nr.placeApplication(nil, nil) if err == nil { t.Error("test rule place application did not fail as expected") } + assert.Check(t, aclCheck, "acls should be checked") var queue string // place application that should not fail and return "test" - queue, err = nr.placeApplication(&objects.Application{}, nil) + queue, aclCheck, err = nr.placeApplication(&objects.Application{}, nil) if err != nil || queue != "test" { t.Errorf("test rule place application did not fail, err: %v, ", err) } + assert.Check(t, aclCheck, "acls should be checked") // place application that should not fail and return the queue in the object app := &objects.Application{} app.SetQueuePath("passedin") - queue, err = nr.placeApplication(app, nil) + queue, aclCheck, err = nr.placeApplication(app, nil) if err != nil || queue != "passedin" { t.Errorf("test rule place application did not fail, err: %v, ", err) } + assert.Check(t, aclCheck, "acls should be checked") // place application that should not fail and return the queue in the object app = &objects.Application{} app.SetQueuePath("user.name") - queue, err = nr.placeApplication(app, nil) + queue, aclCheck, err = nr.placeApplication(app, nil) if err != nil || queue != "user_dot_name" { t.Errorf("test rule place application did not fail, err: %v, ", err) } + assert.Check(t, aclCheck, "acls should be checked") } func TestReplaceDot(t *testing.T) { diff --git a/pkg/scheduler/placement/tag_rule.go b/pkg/scheduler/placement/tag_rule.go index 99030b756..1282a86a9 100644 --- a/pkg/scheduler/placement/tag_rule.go +++ b/pkg/scheduler/placement/tag_rule.go @@ -57,11 +57,11 @@ func (tr *tagRule) initialise(conf configs.PlacementRule) error { return err } -func (tr *tagRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) { +func (tr *tagRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, bool, error) { // if the tag is not present we can skipp all other processing tagVal := app.GetTag(tr.tagName) if tagVal == "" { - return "", nil + return "", true, nil } // before anything run the filter if !tr.filter.allowUser(app.GetUser()) { @@ -69,23 +69,24 @@ func (tr *tagRule) placeApplication(app *objects.Application, queueFn func(strin zap.String("application", app.ApplicationID), zap.Any("user", app.GetUser()), zap.String("tagName", tr.tagName)) - return "", nil + return "", true, nil } var parentName string + var aclCheck = true var err error queueName := tagVal // if we have a fully qualified queue in the value do not run the parent rule if !strings.HasPrefix(queueName, configs.RootQueue+configs.DOT) { // run the parent rule if set if tr.parent != nil { - parentName, err = tr.parent.placeApplication(app, queueFn) + parentName, aclCheck, err = tr.parent.placeApplication(app, queueFn) // failed parent rule, fail this rule if err != nil { - return "", err + return "", aclCheck, err } // rule did not match: this could be filter or create flag related if parentName == "" { - return "", nil + return "", aclCheck, nil } // check if this is a parent queue and qualify it if !strings.HasPrefix(parentName, configs.RootQueue+configs.DOT) { @@ -94,7 +95,7 @@ func (tr *tagRule) placeApplication(app *objects.Application, queueFn func(strin // if the parent queue exists it cannot be a leaf parentQueue := queueFn(parentName) if parentQueue != nil && parentQueue.IsLeafQueue() { - return "", fmt.Errorf("parent rule returned a leaf queue: %s", parentName) + return "", aclCheck, fmt.Errorf("parent rule returned a leaf queue: %s", parentName) } } // the parent is set from the rule otherwise set it to the root @@ -110,10 +111,10 @@ func (tr *tagRule) placeApplication(app *objects.Application, queueFn func(strin queue := queueFn(queueName) // if we cannot create the queue it must exist, rule does not match otherwise if !tr.create && queue == nil { - return "", nil + return "", aclCheck, nil } log.Log(log.Config).Info("Tag rule application placed", zap.String("application", app.ApplicationID), zap.String("queue", queueName)) - return queueName, nil + return queueName, aclCheck, nil } diff --git a/pkg/scheduler/placement/tag_rule_test.go b/pkg/scheduler/placement/tag_rule_test.go index 548548dd7..251cf81a7 100644 --- a/pkg/scheduler/placement/tag_rule_test.go +++ b/pkg/scheduler/placement/tag_rule_test.go @@ -23,6 +23,7 @@ import ( "gotest.tools/v3/assert" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/common/configs" "github.com/apache/yunikorn-core/pkg/common/security" ) @@ -89,34 +90,48 @@ partitions: tags := make(map[string]string) appInfo := newApplication("app1", "default", "ignored", user, tags, nil, "") var queue string - queue, err = tr.placeApplication(appInfo, queueFunc) + var aclCheck bool + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("tag rule failed with no tag value '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // tag queue that exists directly in hierarchy tags = map[string]string{"label1": "testqueue"} appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = tr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) if queue != "root.testqueue" || err != nil { t.Errorf("tag rule failed to place queue in correct queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // tag queue that does not exists tags = map[string]string{"label1": "unknown"} appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = tr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("tag rule placed in queue that does not exists '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // tag queue fully qualified tags = map[string]string{"label1": "root.testparent.testchild"} appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = tr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) if queue != "root.testparent.testchild" || err != nil { t.Errorf("tag rule did fail with qualified queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") + + // tag queue references recovery + tags = map[string]string{"label1": common.RecoveryQueueFull} + appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) + if queue != "" || err != nil { + t.Errorf("tag rule failed with explicit recovery queue: queue '%s', error %v", queue, err) + } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a parent conf = configs.PlacementRule{ @@ -133,16 +148,18 @@ partitions: } tags = map[string]string{"label1": "testchild"} appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = tr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("tag rule with parent queue should have failed value not set '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") tags = map[string]string{"label1": "testchild", "label2": "testparent"} appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = tr.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = tr.placeApplication(appInfo, queueFunc) if queue != "root.testparent.testchild" || err != nil { t.Errorf("tag rule with parent queue incorrect queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } func TestTagRuleParent(t *testing.T) { @@ -173,10 +190,12 @@ func TestTagRuleParent(t *testing.T) { tags := map[string]string{"label1": "testchild", "label2": "testparent"} appInfo := newApplication("app1", "default", "unknown", user, tags, nil, "") var queue string - queue, err = ur.placeApplication(appInfo, queueFunc) + var aclCheck bool + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("tag rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a non creatable parent conf = configs.PlacementRule{ @@ -196,10 +215,11 @@ func TestTagRuleParent(t *testing.T) { tags = map[string]string{"label1": "testchild", "label2": "testparentnew"} appInfo = newApplication("app1", "default", "unknown", user, tags, nil, "") - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("tag rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a creatable parent conf = configs.PlacementRule{ @@ -216,10 +236,11 @@ func TestTagRuleParent(t *testing.T) { if err != nil || ur == nil { t.Errorf("tag rule create failed with queue name, err %v", err) } - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != nameParentChild || err != nil { t.Errorf("user rule with non existing parent queue should create '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a parent which is defined as a leaf conf = configs.PlacementRule{ @@ -237,8 +258,9 @@ func TestTagRuleParent(t *testing.T) { } appInfo = newApplication("app1", "default", "unknown", user, tags, nil, "") - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "" || err == nil { t.Errorf("tag rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } diff --git a/pkg/scheduler/placement/testrule.go b/pkg/scheduler/placement/testrule.go index c94bfe876..7554d42da 100644 --- a/pkg/scheduler/placement/testrule.go +++ b/pkg/scheduler/placement/testrule.go @@ -48,12 +48,12 @@ func (tr *testRule) initialise(conf configs.PlacementRule) error { } // Simple test rule that just checks the app passed in and returns fixed queue names. -func (tr *testRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) { +func (tr *testRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, bool, error) { if app == nil { - return "", fmt.Errorf("nil app passed in") + return "", true, fmt.Errorf("nil app passed in") } if queuePath := app.GetQueuePath(); queuePath != "" { - return replaceDot(queuePath), nil + return replaceDot(queuePath), true, nil } - return "test", nil + return types.Test, true, nil } diff --git a/pkg/scheduler/placement/types/types.go b/pkg/scheduler/placement/types/types.go index 661493109..6762b987a 100644 --- a/pkg/scheduler/placement/types/types.go +++ b/pkg/scheduler/placement/types/types.go @@ -24,4 +24,5 @@ const ( Provided = "provided" Tag = "tag" Test = "test" + Recovery = "recovery" ) diff --git a/pkg/scheduler/placement/user_rule.go b/pkg/scheduler/placement/user_rule.go index 9ecb9eae4..f7113cf24 100644 --- a/pkg/scheduler/placement/user_rule.go +++ b/pkg/scheduler/placement/user_rule.go @@ -49,27 +49,28 @@ func (ur *userRule) initialise(conf configs.PlacementRule) error { return err } -func (ur *userRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) { +func (ur *userRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, bool, error) { // before anything run the filter userName := app.GetUser().User if !ur.filter.allowUser(app.GetUser()) { log.Log(log.Config).Debug("User rule filtered", zap.String("application", app.ApplicationID), zap.Any("user", app.GetUser())) - return "", nil + return "", true, nil } var parentName string + var aclCheck = true var err error // run the parent rule if set if ur.parent != nil { - parentName, err = ur.parent.placeApplication(app, queueFn) + parentName, aclCheck, err = ur.parent.placeApplication(app, queueFn) // failed parent rule, fail this rule if err != nil { - return "", err + return "", aclCheck, err } // rule did not match: this could be filter or create flag related if parentName == "" { - return "", nil + return "", aclCheck, nil } // check if this is a parent queue and qualify it if !strings.HasPrefix(parentName, configs.RootQueue+configs.DOT) { @@ -78,7 +79,7 @@ func (ur *userRule) placeApplication(app *objects.Application, queueFn func(stri // if the parent queue exists it cannot be a leaf parentQueue := queueFn(parentName) if parentQueue != nil && parentQueue.IsLeafQueue() { - return "", fmt.Errorf("parent rule returned a leaf queue: %s", parentName) + return "", aclCheck, fmt.Errorf("parent rule returned a leaf queue: %s", parentName) } } // the parent is set from the rule otherwise set it to the root @@ -93,10 +94,10 @@ func (ur *userRule) placeApplication(app *objects.Application, queueFn func(stri queue := queueFn(queueName) // if we cannot create the queue it must exist, rule does not match otherwise if !ur.create && queue == nil { - return "", nil + return "", aclCheck, nil } log.Log(log.Config).Info("User rule application placed", zap.String("application", app.ApplicationID), zap.String("queue", queueName)) - return queueName, nil + return queueName, aclCheck, nil } diff --git a/pkg/scheduler/placement/user_rule_test.go b/pkg/scheduler/placement/user_rule_test.go index 9e7126e3d..c38a89ff6 100644 --- a/pkg/scheduler/placement/user_rule_test.go +++ b/pkg/scheduler/placement/user_rule_test.go @@ -59,30 +59,34 @@ partitions: t.Errorf("user rule create failed, err %v", err) } var queue string - queue, err = ur.placeApplication(appInfo, queueFunc) + var aclCheck bool + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "root.testchild" || err != nil { t.Errorf("user rule failed to place queue in correct queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a parent queue should fail on queue create not in the rule user = security.UserGroup{ User: "testparent", Groups: []string{}, } appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "root.testparent" || err != nil { t.Errorf("user rule failed with parent queue '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") user = security.UserGroup{ User: "test.user", Groups: []string{}, } appInfo = newApplication("app1", "default", "ignored", user, tags, nil, "") - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue == "" || err != nil { t.Errorf("user rule with dotted user should not have failed '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // user queue that exists directly in hierarchy conf = configs.PlacementRule{ @@ -101,10 +105,11 @@ partitions: if err != nil || ur == nil { t.Errorf("user rule create failed with queue name, err %v", err) } - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "root.testparent.testchild" || err != nil { t.Errorf("user rule failed to place queue in correct queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // user queue that does not exists user = security.UserGroup{ @@ -121,10 +126,11 @@ partitions: if err != nil || ur == nil { t.Errorf("user rule create failed with queue name, err %v", err) } - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "root.unknown" || err != nil { t.Errorf("user rule placed in to be created queue with create false '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } func TestUserRuleParent(t *testing.T) { @@ -154,10 +160,12 @@ func TestUserRuleParent(t *testing.T) { appInfo := newApplication("app1", "default", "unknown", user, tags, nil, "") var queue string - queue, err = ur.placeApplication(appInfo, queueFunc) + var aclCheck bool + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("user rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a non creatable parent conf = configs.PlacementRule{ @@ -175,10 +183,11 @@ func TestUserRuleParent(t *testing.T) { } appInfo = newApplication("app1", "default", "unknown", user, tags, nil, "") - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "" || err != nil { t.Errorf("user rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a creatable parent conf = configs.PlacementRule{ @@ -194,10 +203,11 @@ func TestUserRuleParent(t *testing.T) { if err != nil || ur == nil { t.Errorf("user rule create failed with queue name, err %v", err) } - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != nameParentChild || err != nil { t.Errorf("user rule with non existing parent queue should create '%s', error %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") // trying to place in a child using a parent which is defined as a leaf conf = configs.PlacementRule{ @@ -214,8 +224,9 @@ func TestUserRuleParent(t *testing.T) { } appInfo = newApplication("app1", "default", "unknown", user, tags, nil, "") - queue, err = ur.placeApplication(appInfo, queueFunc) + queue, aclCheck, err = ur.placeApplication(appInfo, queueFunc) if queue != "" || err == nil { t.Errorf("user rule placed app in incorrect queue '%s', err %v", queue, err) } + assert.Check(t, aclCheck, "acls should be checked") } diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index d3bf96837..4c2b284e4 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -422,6 +422,10 @@ func newPlacementPartition() (*PartitionContext, error) { } func newApplication(appID, partition, queueName string) *objects.Application { + return newApplicationTags(appID, partition, queueName, nil) +} + +func newApplicationTags(appID, partition, queueName string, tags map[string]string) *objects.Application { user := security.UserGroup{ User: "testuser", Groups: []string{"testgroup"}, @@ -430,6 +434,7 @@ func newApplication(appID, partition, queueName string) *objects.Application { ApplicationID: appID, QueueName: queueName, PartitionName: partition, + Tags: tags, } return objects.NewApplication(siApp, user, nil, rmID) }