Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-1793] Handle placement rule and queue changes during initialisation #601

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ package common
const (
Empty = ""

Wildcard = "*"
Separator = ","
Space = " "
Wildcard = "*"
Separator = ","
Space = " "
AnonymousUser = "nobody"
AnonymousGroup = "nogroup"
RecoveryQueue = "@recovery@"
RecoveryQueueFull = "root." + RecoveryQueue
)
18 changes: 15 additions & 3 deletions pkg/common/security/usergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -122,14 +123,25 @@
c.ugs = make(map[string]*UserGroup)
}

func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation) (UserGroup, error) {
func (c *UserGroupCache) ConvertUGI(ugi *si.UserGroupInformation, force bool) (UserGroup, error) {
// check if we have a user to convert
if ugi == nil || ugi.User == "" {
return UserGroup{}, fmt.Errorf("empty user cannot resolve")
if force {
// app creation is forced, so we need to synthesize a user / group
ugi.User = common.AnonymousUser
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
ugi.Groups = []string{common.AnonymousGroup}
} else {
return UserGroup{}, fmt.Errorf("empty user cannot resolve")
}
}
// try to resolve the user if group info is empty otherwise we just convert
if len(ugi.Groups) == 0 {
return c.GetUserGroup(ugi.User)
ug, err := c.GetUserGroup(ugi.User)
if force && (err != nil || ug.failed) {
ugi.Groups = []string{common.AnonymousGroup}

Check warning on line 141 in pkg/common/security/usergroup.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/security/usergroup.go#L141

Added line #L141 was not covered by tests
} else {
return ug, err
}
}
// If groups are already present we should just convert
newUG := UserGroup{User: ugi.User}
Expand Down
14 changes: 10 additions & 4 deletions pkg/common/security/usergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@ func TestConvertUGI(t *testing.T) {
User: "",
Groups: nil,
}
ug, err := testCache.ConvertUGI(ugi)
ug, err := testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("empty user convert should have failed and did not: %v", ug)
}
// try known user without groups
ugi.User = "testuser1"
ug, err = testCache.ConvertUGI(ugi)
ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("known user, no groups, convert should not have failed: %v", err)
}
Expand All @@ -213,15 +213,21 @@ func TestConvertUGI(t *testing.T) {
}
// try unknown user without groups
ugi.User = "unknown"
ug, err = testCache.ConvertUGI(ugi)
ug, err = testCache.ConvertUGI(ugi, false)
if err == nil {
t.Errorf("unknown user, no groups, convert should have failed: %v", ug)
}
// try empty user when forced
ugi.User = ""
ug, err = testCache.ConvertUGI(ugi, true)
if err != nil {
t.Errorf("empty user but forced, convert should not have failed: %v", err)
}
// try unknown user with groups
ugi.User = "unknown2"
group := "passedin"
ugi.Groups = []string{group}
ug, err = testCache.ConvertUGI(ugi)
ug, err = testCache.ConvertUGI(ugi, false)
if err != nil {
t.Errorf("unknown user with groups, convert should not have failed: %v", err)
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,29 @@
return policy != nil && policy.AllowPreemptOther
}

// IsAppCreationForced returns true if the application creation is triggered by the shim
// reporting an existing allocation. In this case, it needs to be accepted regardless
// of whether it maps to a valid queue.
func IsAppCreationForced(tags map[string]string) bool {
tagVal := ""
for key, val := range tags {
if strings.EqualFold(key, interfaceCommon.AppTagCreateForce) {
tagVal = val
break
}
}
result, err := strconv.ParseBool(tagVal)
if err != nil {
return false
}
return result
}

// IsRecoveryQueue returns true if the given queue represents the recovery queue
func IsRecoveryQueue(queueName string) bool {
return strings.EqualFold(queueName, RecoveryQueueFull)

Check warning on line 214 in pkg/common/utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/utils.go#L213-L214

Added lines #L213 - L214 were not covered by tests
}

// ZeroTimeInUnixNano return the unix nano or nil if the time is zero.
func ZeroTimeInUnixNano(t time.Time) *int64 {
if t.IsZero() {
Expand Down
12 changes: 12 additions & 0 deletions pkg/common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ func TestAllowPreemptOther(t *testing.T) {
assert.Check(t, !IsAllowPreemptOther(&si.PreemptionPolicy{AllowPreemptOther: false}), "Preempt other should not be allowed if policy does not allow")
}

func TestIsAppCreationForced(t *testing.T) {
assert.Check(t, !IsAppCreationForced(nil), "nil tags should not result in forced app creation")
tags := make(map[string]string)
assert.Check(t, !IsAppCreationForced(tags), "empty tags should not result in forced app creation")
tags[common.AppTagCreateForce] = "false"
assert.Check(t, !IsAppCreationForced(tags), "false creation tag should not result in forced app creation")
tags[common.AppTagCreateForce] = "invalid"
assert.Check(t, !IsAppCreationForced(tags), "invalid creation tag should not result in forced app creation")
tags[common.AppTagCreateForce] = "true"
assert.Check(t, IsAppCreationForced(tags), "creation tag should result in forced app creation")
}

func TestConvertSITimeoutWithAdjustment(t *testing.T) {
created := time.Now().Unix() - 600
defaultTimeout := 15 * time.Minute
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@
}
// convert and resolve the user: cache can be set per partition
// need to do this before we create the application
ugi, err := partition.convertUGI(app.Ugi)
ugi, err := partition.convertUGI(app.Ugi, common.IsAppCreationForced(app.Tags))

Check warning on line 527 in pkg/scheduler/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/context.go#L527

Added line #L527 was not covered by tests
if err != nil {
rejectedApps = append(rejectedApps, &si.RejectedApplication{
ApplicationID: app.ApplicationID,
Expand Down
23 changes: 12 additions & 11 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ type StateLogEntry struct {
}

type Application struct {
ApplicationID string
Partition string
SubmissionTime time.Time
ApplicationID string // application ID
Partition string // partition Name
SubmissionTime time.Time // time application was submitted
tags map[string]string // application tags used in scheduling

// Private fields need protection
// Private mutable fields need protection
queuePath string
queue *Queue // queue the application is running in
pending *resources.Resource // pending resources from asks for the app
reservations map[string]*reservation // a map of reservations
requests map[string]*AllocationAsk // a map of asks
sortedRequests sortedRequests
user security.UserGroup // owner of the application
tags map[string]string // application tags used in scheduling
allocatedResource *resources.Resource // total allocated resources
sortedRequests sortedRequests // list of requests pre-sorted
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources

usedResource *resources.UsedResource // keep track of resource usage of the application

Expand Down Expand Up @@ -1874,9 +1874,6 @@ func (sa *Application) GetUser() security.UserGroup {
// Get a tag from the application
// Note: tags are not case sensitive
func (sa *Application) GetTag(tag string) string {
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
sa.RLock()
defer sa.RUnlock()

tagVal := ""
for key, val := range sa.tags {
if strings.EqualFold(key, tag) {
Expand All @@ -1887,6 +1884,10 @@ func (sa *Application) GetTag(tag string) string {
return tagVal
}

func (sa *Application) IsCreateForced() bool {
return common.IsAppCreationForced(sa.tags)
}

func (sa *Application) SetTerminatedCallback(callback func(appID string)) {
sa.Lock()
defer sa.Unlock()
Expand Down
17 changes: 17 additions & 0 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,23 @@ func TestGetTag(t *testing.T) {
assert.Equal(t, tag, "test value", "expected tag value")
}

func TestIsCreateForced(t *testing.T) {
app := newApplicationWithTags(appID1, "default", "root.a", nil)
assert.Check(t, !app.IsCreateForced(), "found forced app but tags nil")
tags := make(map[string]string)
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, !app.IsCreateForced(), "found forced app but tags empty")
tags[siCommon.AppTagCreateForce] = "false"
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was false")
tags[siCommon.AppTagCreateForce] = "unknown"
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, !app.IsCreateForced(), "found forced app but forced tag was invalid")
tags[siCommon.AppTagCreateForce] = "true"
app = newApplicationWithTags(appID1, "default", "root.a", tags)
assert.Check(t, app.IsCreateForced(), "found unforced app but forced tag was set")
}

func TestOnStatusChangeCalled(t *testing.T) {
app, testHandler := newApplicationWithHandler(appID1, "default", "root.a")
assert.Equal(t, New.String(), app.CurrentState(), "new app not in New state")
Expand Down
42 changes: 37 additions & 5 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import (
"context"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -29,6 +30,7 @@
"github.com/looplab/fsm"
"go.uber.org/zap"

"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
Expand All @@ -38,7 +40,7 @@
"github.com/apache/yunikorn-core/pkg/scheduler/objects/template"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/common"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
)

var (
Expand Down Expand Up @@ -145,6 +147,25 @@
return sq, nil
}

// NewRecoveryQueue creates a recovery queue if it does not exist. The recovery queue
// is a dynamic queue, but has an invalid name so that it cannot be directly referenced.
func NewRecoveryQueue(parent *Queue) (*Queue, error) {
craigcondit marked this conversation as resolved.
Show resolved Hide resolved
if parent == nil {
return nil, errors.New("recovery queue cannot be created with nil parent")
}
if parent.GetQueuePath() != configs.RootQueue {
return nil, fmt.Errorf("recovery queue cannot be created with non-root parent: %s", parent.GetQueuePath())
}
queue, err := newDynamicQueueInternal(common.RecoveryQueue, true, parent)
if err == nil {
queue.Lock()
defer queue.Unlock()
queue.submitACL = security.ACL{}
queue.sortType = policies.FifoSortPolicy
}
return queue, err
}

// NewDynamicQueue creates a new queue to be added to the system based on the placement rules
// A dynamically added queue can never be the root queue so parent must be set
// lock free as it cannot be referenced yet
Expand All @@ -157,6 +178,10 @@
if !configs.QueueNameRegExp.MatchString(name) {
return nil, fmt.Errorf("invalid queue name '%s', a name must only have alphanumeric characters, - or _, and be no longer than 64 characters", name)
}
return newDynamicQueueInternal(name, leaf, parent)
}

func newDynamicQueueInternal(name string, leaf bool, parent *Queue) (*Queue, error) {
sq := newBlankQueue()
sq.Name = strings.ToLower(name)
sq.QueuePath = parent.QueuePath + configs.DOT + sq.Name
Expand Down Expand Up @@ -384,12 +409,15 @@
func (sq *Queue) UpdateQueueProperties() {
sq.Lock()
defer sq.Unlock()

if common.IsRecoveryQueue(sq.QueuePath) {
// recovery queue properties should never be updated
sq.sortType = policies.FifoSortPolicy
return
}
if !sq.isLeaf {
// set the sorting type for parent queues
sq.sortType = policies.FairSortPolicy
}

// walk over all properties and process
var err error
for key, value := range sq.properties {
Expand Down Expand Up @@ -535,6 +563,10 @@
// The check is performed recursively: i.e. access to the parent allows access to this queue.
// This will check both submitACL and adminACL.
func (sq *Queue) CheckSubmitAccess(user security.UserGroup) bool {
if common.IsRecoveryQueue(sq.QueuePath) {
// recovery queue can never pass ACL checks
return false
}

Check warning on line 569 in pkg/scheduler/objects/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/queue.go#L566-L569

Added lines #L566 - L569 were not covered by tests
sq.RLock()
allow := sq.submitACL.CheckAccess(user) || sq.adminACL.CheckAccess(user)
sq.RUnlock()
Expand Down Expand Up @@ -680,9 +712,9 @@
sq.queueEvents.sendNewApplicationEvent(appID)
// YUNIKORN-199: update the quota from the namespace
// get the tag with the quota
quota := app.GetTag(common.AppTagNamespaceResourceQuota)
quota := app.GetTag(siCommon.AppTagNamespaceResourceQuota)
// get the tag with the guaranteed resource
guaranteed := app.GetTag(common.AppTagNamespaceResourceGuaranteed)
guaranteed := app.GetTag(siCommon.AppTagNamespaceResourceGuaranteed)
if quota == "" && guaranteed == "" {
return
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,40 @@ func TestNewConfiguredQueue(t *testing.T) {
assert.Assert(t, childNonLeaf.maxResource == nil)
}

func TestNewRecoveryQueue(t *testing.T) {
var err error
if _, err = NewRecoveryQueue(nil); err == nil {
t.Fatalf("recovery queue creation should fail with nil parent")
}

parent, err := createManagedQueueWithProps(nil, "parent", true, nil, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
if _, err = NewRecoveryQueue(parent); err == nil {
t.Fatalf("recovery queue creation should fail with non-root parent")
}

parentConfig := configs.QueueConfig{
Name: "root",
Parent: true,
Properties: map[string]string{configs.ApplicationSortPolicy: "fair"},
ChildTemplate: configs.ChildTemplate{Properties: map[string]string{configs.ApplicationSortPolicy: "fair"}},
}
parent, err = NewConfiguredQueue(parentConfig, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
recoveryQueue, err := NewRecoveryQueue(parent)
assert.NilError(t, err, "failed to create recovery queue: %v", err)
assert.Equal(t, common.RecoveryQueueFull, recoveryQueue.GetQueuePath(), "wrong queue name")
assert.Equal(t, policies.FifoSortPolicy, recoveryQueue.getSortType(), "wrong sort type")
}

func TestNewDynamicQueueDoesNotCreateRecovery(t *testing.T) {
parent, err := createRootQueue(nil)
assert.NilError(t, err, "failed to create queue: %v", err)
if _, err := NewDynamicQueue(common.RecoveryQueue, true, parent); err == nil {
t.Fatalf("invalid recovery queue %s was created", common.RecoveryQueueFull)
}
}

func TestNewDynamicQueue(t *testing.T) {
parent, err := createManagedQueueWithProps(nil, "parent", true, nil, nil)
assert.NilError(t, err, "failed to create queue: %v", err)
Expand Down
Loading