diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go index b4c25e451..0905a5886 100644 --- a/pkg/common/configs/configvalidator.go +++ b/pkg/common/configs/configvalidator.go @@ -37,10 +37,12 @@ import ( ) const ( - RootQueue = "root" - DOT = "." - DotReplace = "_dot_" - DefaultPartition = "default" + RootQueue = "root" + DOT = "." + DotReplace = "_dot_" + DefaultPartition = "default" + RecoveryQueue = "@recovery@" + RecoveryQueueFull = RootQueue + DOT + RecoveryQueue ApplicationSortPolicy = "application.sort.policy" ApplicationSortPriority = "application.sort.priority" diff --git a/pkg/common/constants.go b/pkg/common/constants.go index b0b3a5840..aa8151bdf 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -21,7 +21,9 @@ package common const ( Empty = "" - Wildcard = "*" - Separator = "," - Space = " " + Wildcard = "*" + Separator = "," + Space = " " + AnonymousUser = "nobody" + AnonymousGroup = "nogroup" ) diff --git a/pkg/common/security/usergroup.go b/pkg/common/security/usergroup.go index e381204db..430904bc9 100644 --- a/pkg/common/security/usergroup.go +++ b/pkg/common/security/usergroup.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" + "github.com/apache/yunikorn-core/pkg/common" "github.com/apache/yunikorn-core/pkg/log" "github.com/apache/yunikorn-scheduler-interface/lib/go/si" ) @@ -122,10 +123,16 @@ func (c *UserGroupCache) resetCache() { c.ugs = make(map[string]*UserGroup) } -func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation) (UserGroup, error) { +func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation, force bool) (UserGroup, error) { // check if we have a user to convert if ugi == nil || ugi.User == "" { - return UserGroup{}, fmt.Errorf("empty user cannot resolve") + if force { + // app creation is forced, so we need to synthesize a user / group + ugi.User = common.AnonymousUser + ugi.Groups = []string{common.AnonymousGroup} + } else { + return UserGroup{}, fmt.Errorf("empty user cannot resolve") + } } // try to resolve the user if group info is empty otherwise we just convert if len(ugi.Groups) == 0 { diff --git a/pkg/common/security/usergroup_test.go b/pkg/common/security/usergroup_test.go index 279dadf2c..9eef2c6e2 100644 --- a/pkg/common/security/usergroup_test.go +++ b/pkg/common/security/usergroup_test.go @@ -198,13 +198,13 @@ func TestConvertUGI(t *testing.T) { User: "", Groups: nil, } - ug, err := testCache.ConvertUGI(ugi) + ug, err := testCache.ConvertUGI(ugi, false) if err == nil { t.Errorf("empty user convert should have failed and did not: %v", ug) } // try known user without groups ugi.User = "testuser1" - ug, err = testCache.ConvertUGI(ugi) + ug, err = testCache.ConvertUGI(ugi, false) if err != nil { t.Errorf("known user, no groups, convert should not have failed: %v", err) } @@ -213,15 +213,21 @@ func TestConvertUGI(t *testing.T) { } // try unknown user without groups ugi.User = "unknown" - ug, err = testCache.ConvertUGI(ugi) + ug, err = testCache.ConvertUGI(ugi, false) if err == nil { t.Errorf("unknown user, no groups, convert should have failed: %v", ug) } + // try empty user when forced + ugi.User = "" + ug, err = testCache.ConvertUGI(ugi, true) + if err != nil { + t.Errorf("empty user but forced, convert should not have failed: %v", err) + } // try unknown user with groups ugi.User = "unknown2" group := "passedin" ugi.Groups = []string{group} - ug, err = testCache.ConvertUGI(ugi) + ug, err = testCache.ConvertUGI(ugi, false) if err != nil { t.Errorf("unknown user with groups, convert should not have failed: %v", err) } diff --git a/pkg/common/utils.go b/pkg/common/utils.go index 690dc83f7..d49aef1b3 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -191,6 +191,24 @@ func IsAllowPreemptOther(policy *si.PreemptionPolicy) bool { return policy != nil && policy.AllowPreemptOther } +// IsAppCreationForced returns true if the application creation is triggered by the shim +// reporting an existing allocation. In this case, it needs to be accepted regardless +// of whether it maps to a valid queue. +func IsAppCreationForced(tags map[string]string) bool { + tagVal := "" + for key, val := range tags { + if strings.EqualFold(key, interfaceCommon.AppTagCreateForce) { + tagVal = val + break + } + } + result, err := strconv.ParseBool(tagVal) + if err != nil { + return false + } + return result +} + // ZeroTimeInUnixNano return the unix nano or nil if the time is zero. func ZeroTimeInUnixNano(t time.Time) *int64 { if t.IsZero() { diff --git a/pkg/common/utils_test.go b/pkg/common/utils_test.go index 86dc227d9..ee4611e2b 100644 --- a/pkg/common/utils_test.go +++ b/pkg/common/utils_test.go @@ -157,6 +157,18 @@ func TestAllowPreemptOther(t *testing.T) { assert.Check(t, !IsAllowPreemptOther(&si.PreemptionPolicy{AllowPreemptOther: false}), "Preempt other should not be allowed if policy does not allow") } +func TestIsAppCreationForced(t *testing.T) { + assert.Check(t, !IsAppCreationForced(nil), "nil tags should not result in forced app creation") + tags := make(map[string]string) + assert.Check(t, !IsAppCreationForced(tags), "empty tags should not result in forced app creation") + tags[common.AppTagCreateForce] = "false" + assert.Check(t, !IsAppCreationForced(tags), "false creation tag should not result in forced app creation") + tags[common.AppTagCreateForce] = "invalid" + assert.Check(t, !IsAppCreationForced(tags), "invalid creation tag should not result in forced app creation") + tags[common.AppTagCreateForce] = "true" + assert.Check(t, IsAppCreationForced(tags), "creation tag should result in forced app creation") +} + func TestConvertSITimeoutWithAdjustment(t *testing.T) { created := time.Now().Unix() - 600 defaultTimeout := 15 * time.Minute diff --git a/pkg/scheduler/context.go b/pkg/scheduler/context.go index e3f818720..f83a3c57b 100644 --- a/pkg/scheduler/context.go +++ b/pkg/scheduler/context.go @@ -524,7 +524,7 @@ func (cc *ClusterContext) handleRMUpdateApplicationEvent(event *rmevent.RMUpdate } // convert and resolve the user: cache can be set per partition // need to do this before we create the application - ugi, err := partition.convertUGI(app.Ugi) + ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags)) if err != nil { rejectedApps = append(rejectedApps, &si.RejectedApplication{ ApplicationID: app.ApplicationID, diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 100f6905f..3ba680241 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -70,20 +70,20 @@ type StateLogEntry struct { } type Application struct { - ApplicationID string - Partition string - SubmissionTime time.Time + ApplicationID string // application ID + Partition string // partition Name + SubmissionTime time.Time // time application was submitted + tags map[string]string // application tags used in scheduling - // Private fields need protection + // Private mutable fields need protection queuePath string queue *Queue // queue the application is running in pending *resources.Resource // pending resources from asks for the app reservations map[string]*reservation // a map of reservations requests map[string]*AllocationAsk // a map of asks - sortedRequests sortedRequests - user security.UserGroup // owner of the application - tags map[string]string // application tags used in scheduling - allocatedResource *resources.Resource // total allocated resources + sortedRequests sortedRequests // list of requests pre-sorted + user security.UserGroup // owner of the application + allocatedResource *resources.Resource // total allocated resources usedResource *resources.UsedResource // keep track of resource usage of the application @@ -1875,9 +1875,6 @@ func (sa *Application) GetUser() security.UserGroup { // Get a tag from the application // Note: tags are not case sensitive func (sa *Application) GetTag(tag string) string { - sa.RLock() - defer sa.RUnlock() - tagVal := "" for key, val := range sa.tags { if strings.EqualFold(key, tag) { @@ -1888,6 +1885,10 @@ func (sa *Application) GetTag(tag string) string { return tagVal } +func (sa *Application) IsCreateForced() bool { + return common.IsAppCreationForced(sa.tags) +} + func (sa *Application) SetTerminatedCallback(callback func(appID string)) { sa.Lock() defer sa.Unlock() diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 02593af7f..72d0916d2 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1218,6 +1218,23 @@ func TestGetTag(t *testing.T) { assert.Equal(t, tag, "test value", "expected tag value") } +func TestIsCreateForced(t *testing.T) { + app := newApplicationWithTags(appID1, "default", "root.a", nil) + assert.Check(t, !app.IsCreateForced(), "found forced app but tags nil") + tags := make(map[string]string) + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, !app.IsCreateForced(), "found forced app but tags empty") + tags[siCommon.AppTagCreateForce] = "false" + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was false") + tags[siCommon.AppTagCreateForce] = "unknown" + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was invalid") + tags[siCommon.AppTagCreateForce] = "true" + app = newApplicationWithTags(appID1, "default", "root.a", tags) + assert.Check(t, app.IsCreateForced(), "found unforced app but forced tag was set") +} + func TestOnStatusChangeCalled(t *testing.T) { app, testHandler := newApplicationWithHandler(appID1, "default", "root.a") assert.Equal(t, New.String(), app.CurrentState(), "new app not in New state") diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index 32255d013..cfab4e761 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -145,16 +145,26 @@ func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) return sq, nil } +// NewRecoveryQueue creates a recovery queue if it does not exist. The recovery queue +// is a dynamic queue, but has an invalid name so that it cannot be directly referenced. +func NewRecoveryQueue(parent *Queue) (*Queue, error) { + return newDynamicQueueInternal(configs.RecoveryQueue, true, parent, false) +} + // NewDynamicQueue creates a new queue to be added to the system based on the placement rules // A dynamically added queue can never be the root queue so parent must be set // lock free as it cannot be referenced yet func NewDynamicQueue(name string, leaf bool, parent *Queue) (*Queue, error) { + return newDynamicQueueInternal(name, leaf, parent, true) +} + +func newDynamicQueueInternal(name string, leaf bool, parent *Queue, validateName bool) (*Queue, error) { // fail without a parent if parent == nil { return nil, fmt.Errorf("dynamic queue can not be added without parent: %s", name) } // name might not be checked do it here - if !configs.QueueNameRegExp.MatchString(name) { + if validateName && !configs.QueueNameRegExp.MatchString(name) { return nil, fmt.Errorf("invalid queue name '%s', a name must only have alphanumeric characters, - or _, and be no longer than 64 characters", name) } sq := newBlankQueue() diff --git a/pkg/scheduler/objects/queue_test.go b/pkg/scheduler/objects/queue_test.go index e59c18247..28d1e2703 100644 --- a/pkg/scheduler/objects/queue_test.go +++ b/pkg/scheduler/objects/queue_test.go @@ -2302,6 +2302,22 @@ func TestNewConfiguredQueue(t *testing.T) { assert.Assert(t, childNonLeaf.maxResource == nil) } +func TestNewRecoveryQueue(t *testing.T) { + parent, err := createRootQueue(nil) + assert.NilError(t, err, "failed to create queue: %v", err) + recoveryQueue, err := NewRecoveryQueue(parent) + assert.NilError(t, err, "failed to create recovery queue: %v", err) + assert.Equal(t, configs.RecoveryQueueFull, recoveryQueue.GetQueuePath(), "wrong queue name") +} + +func TestNewDynamicQueueDoesNotCreateRecovery(t *testing.T) { + parent, err := createRootQueue(nil) + assert.NilError(t, err, "failed to create queue: %v", err) + if _, err := NewDynamicQueue(configs.RecoveryQueue, true, parent); err == nil { + t.Fatalf("invalid recovery queue %s was created", configs.RecoveryQueueFull) + } +} + func TestNewDynamicQueue(t *testing.T) { parent, err := createManagedQueueWithProps(nil, "parent", true, nil, nil) assert.NilError(t, err, "failed to create queue: %v", err) diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 4c50ce6b5..0d46857a8 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -305,39 +305,85 @@ func (pc *PartitionContext) AddApplication(app *objects.Application) error { return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name) } + // Check if app creation is forced + forceCreate := app.IsCreateForced() + // Put app under the queue queueName := app.GetQueuePath() + + // applications are not allowed to reference the recovery queue directly + if strings.EqualFold(queueName, configs.RecoveryQueueFull) { + if forceCreate { + // empty out queue name for now in case a matching rule can work + app.SetQueuePath("") + } else { + return fmt.Errorf("invalid queue %s for application %s", queueName, appID) + } + } + pm := pc.getPlacementManager() if pm.IsInitialised() { err := pm.PlaceApplication(app) if err != nil { - return fmt.Errorf("failed to place application %s: %v", appID, err) + if forceCreate { + queueName = configs.RecoveryQueueFull + app.SetQueuePath(queueName) + } else { + return fmt.Errorf("failed to place application %s: %v", appID, err) + } } queueName = app.GetQueuePath() if queueName == "" { - return fmt.Errorf("application rejected by placement rules: %s", appID) + if forceCreate { + queueName = configs.RecoveryQueueFull + app.SetQueuePath(configs.RecoveryQueueFull) + } else { + return fmt.Errorf("application rejected by placement rules: %s", appID) + } } } + // lock the partition and make the last change: we need to do this before creating the queues. // queue cleanup might otherwise remove the queue again before we can add the application pc.Lock() defer pc.Unlock() // we have a queue name either from placement or direct, get the queue queue := pc.getQueueInternal(queueName) - if queue == nil { - // queue must exist if not using placement rules - if !pm.IsInitialised() { - return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName) - } - // with placement rules the hierarchy might not exist so try and create it + + // attempt to create the queue if we are using the placement manager and not recovery queue + if queue == nil && pm.IsInitialised() && !strings.EqualFold(queueName, configs.RecoveryQueueFull) { var err error queue, err = pc.createQueue(queueName, app.GetUser()) - if err != nil { + if err != nil && !forceCreate { return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID) } } + + // if queue is still not found, use the recovery queue if app creation is forced + if queue == nil && forceCreate { + var err error + queueName = configs.RecoveryQueueFull + app.SetQueuePath(queueName) + + // attempt to use the already-created recovery queue + queue = pc.getQueueInternal(queueName) + if queue == nil { + // create the queue since it doesn't yet exist + queue, err = pc.createRecoveryQueue() + if err != nil { + return fmt.Errorf("failed to create recovery queue %s for application %s", configs.RecoveryQueueFull, appID) + } + } + } + + // all attempts to resolve queue have failed, reject application + if queue == nil { + // queue must exist if not using placement rules and not a forced app + return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName) + } + // check the queue: is a leaf queue with submit access - if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) { + if !queue.IsLeafQueue() || (!forceCreate && !queue.CheckSubmitAccess(app.GetUser())) { return fmt.Errorf("failed to find queue %s for application %s", queueName, appID) } @@ -484,6 +530,11 @@ func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo { return PartitionQueueDAOInfo } +// Create the recovery queue. +func (pc *PartitionContext) createRecoveryQueue() (*objects.Queue, error) { + return objects.NewRecoveryQueue(pc.root) +} + // Create a queue with full hierarchy. This is called when a new queue is created from a placement rule. // The final leaf queue does not exist otherwise we would not get here. // This means that at least 1 queue (a leaf queue) will be created @@ -1165,10 +1216,10 @@ func (pc *PartitionContext) addAllocation(alloc *objects.Allocation) error { return nil } -func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation) (security.UserGroup, error) { +func (pc *PartitionContext) convertUGI(ugi *si.UserGroupInformation, forced bool) (security.UserGroup, error) { pc.RLock() defer pc.RUnlock() - return pc.userGroupCache.ConvertUGI(ugi) + return pc.userGroupCache.ConvertUGI(ugi, forced) } // calculate overall nodes resource usage and returns a map as the result, diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go index 49bd067fa..1465e0a18 100644 --- a/pkg/scheduler/partition_test.go +++ b/pkg/scheduler/partition_test.go @@ -925,6 +925,115 @@ func TestAddApp(t *testing.T) { } } +func TestAddAppForced(t *testing.T) { + partition, err := newBasePartition() + assert.NilError(t, err, "partition create failed") + + // add a new app to an invalid queue + app := newApplication(appID1, "default", "root.invalid") + err = partition.AddApplication(app) + if err == nil || partition.getApplication(appID1) != nil { + t.Fatalf("add application to nonexistent queue should have failed but did not") + } + + // re-add the app, but mark it as forced. this should create the recovery queue and assign the app to it + app = newApplicationTags(appID1, "default", "root.invalid", map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app) + assert.NilError(t, err, "app create failed") + partApp := partition.getApplication(appID1) + if partApp == nil { + t.Fatalf("app not found after adding to partition") + } + recoveryQueue := partition.GetQueue(configs.RecoveryQueueFull) + if recoveryQueue == nil { + t.Fatalf("recovery queue not found") + } + assert.Equal(t, configs.RecoveryQueueFull, partApp.GetQueuePath(), "wrong queue path for app2") + assert.Check(t, recoveryQueue == partApp.GetQueue(), "wrong queue for app") + assert.Equal(t, 1, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") + + // add second forced app. this should use the existing recovery queue rather than recreating it + app2 := newApplicationTags(appID2, "default", "root.invalid2", map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app2) + assert.NilError(t, err, "app2 create failed") + partApp2 := partition.getApplication(appID2) + if partApp2 == nil { + t.Fatalf("app2 not found after adding to partition") + } + assert.Equal(t, configs.RecoveryQueueFull, partApp2.GetQueuePath(), "wrong queue path for app2") + assert.Check(t, recoveryQueue == partApp2.GetQueue(), "wrong queue for app2") + assert.Equal(t, 2, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") + + // add third app (not forced), but referencing the recovery queue. this should fail. + app3 := newApplication(appID3, "default", configs.RecoveryQueueFull) + err = partition.AddApplication(app3) + if err == nil || partition.getApplication(appID3) != nil { + t.Fatalf("add app3 to recovery queue should have failed but did not") + } + + // re-add third app, but forced. This should succeed. + app3 = newApplicationTags(appID3, "default", configs.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app3) + assert.NilError(t, err, "app3 create failed") + partApp3 := partition.getApplication(appID3) + if partApp3 == nil { + t.Fatalf("app3 not found after adding to partition") + } + assert.Equal(t, configs.RecoveryQueueFull, partApp3.GetQueuePath(), "wrong queue path for app3") + assert.Check(t, recoveryQueue == partApp3.GetQueue(), "wrong queue for app3") + assert.Equal(t, 3, len(recoveryQueue.GetCopyOfApps()), "wrong queue length") +} + +func TestAddAppForcedWithPlacement(t *testing.T) { + confWith := configs.PartitionConfig{ + Name: "test", + Queues: []configs.QueueConfig{ + { + Name: "root", + Parent: true, + SubmitACL: "*", + Queues: nil, + }, + }, + PlacementRules: []configs.PlacementRule{ + { + Name: "tag", + Value: "queue", + Create: true, + }, + }, + Limits: nil, + NodeSortPolicy: configs.NodeSortingPolicy{}, + } + partition, err := newPartitionContext(confWith, rmID, nil) + assert.NilError(t, err, "test partition create failed with error") + + // add a new app using tag rule + app := newApplicationTags(appID1, "default", "", map[string]string{"queue": "root.test"}) + err = partition.AddApplication(app) + assert.NilError(t, err, "failed to add app to tagged queue") + assert.Equal(t, "root.test", app.GetQueuePath(), "app assigned to wrong queue") + + // add a second app without a tag rule + app2 := newApplicationTags(appID2, "default", "root.untagged", map[string]string{}) + err = partition.AddApplication(app2) + if err == nil || partition.getApplication(appID2) != nil { + t.Fatalf("add app2 to fixed queue should have failed but did not") + } + + // attempt to add the app again, but with forced addition + app2 = newApplicationTags(appID2, "default", "root.untagged", map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app2) + assert.NilError(t, err, "failed to add app2 to tagged queue") + assert.Equal(t, configs.RecoveryQueueFull, app2.GetQueuePath(), "app2 assigned to wrong queue") + + // add a third app, but with the recovery queue tagged + app3 := newApplicationTags(appID3, "default", configs.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}) + err = partition.AddApplication(app3) + assert.NilError(t, err, "failed to add app3 to tagged queue") + assert.Equal(t, configs.RecoveryQueueFull, app3.GetQueuePath(), "app2 assigned to wrong queue") +} + func TestAddAppTaskGroup(t *testing.T) { partition, err := newBasePartition() assert.NilError(t, err, "partition create failed") diff --git a/pkg/scheduler/utilities_test.go b/pkg/scheduler/utilities_test.go index 3233c61df..0616a6e3a 100644 --- a/pkg/scheduler/utilities_test.go +++ b/pkg/scheduler/utilities_test.go @@ -421,6 +421,10 @@ func newPlacementPartition() (*PartitionContext, error) { } func newApplication(appID, partition, queueName string) *objects.Application { + return newApplicationTags(appID, partition, queueName, nil) +} + +func newApplicationTags(appID, partition, queueName string, tags map[string]string) *objects.Application { user := security.UserGroup{ User: "testuser", Groups: []string{"testgroup"}, @@ -429,6 +433,7 @@ func newApplication(appID, partition, queueName string) *objects.Application { ApplicationID: appID, QueueName: queueName, PartitionName: partition, + Tags: tags, } return objects.NewApplication(siApp, user, nil, rmID) }