-
Notifications
You must be signed in to change notification settings - Fork 234
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Refactored CreateRecoveryQueue / CreateDynamicQueue implementation
- Loading branch information
1 parent
909ecb3
commit 89a304d
Showing
2 changed files
with
371 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package placement | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/apache/yunikorn-core/pkg/common/configs" | ||
"github.com/apache/yunikorn-core/pkg/log" | ||
"github.com/apache/yunikorn-core/pkg/scheduler/objects" | ||
"github.com/apache/yunikorn-core/pkg/scheduler/placement/types" | ||
) | ||
|
||
type fixedRule struct { | ||
basicRule | ||
queue string | ||
qualified bool | ||
} | ||
|
||
// A rule to place an application based on the queue in the configuration. | ||
// If the queue provided is fully qualified, starts with "root.", the parent rule is skipped and the queue is created as | ||
// configured. If the queue is not qualified all "." characters will be replaced and the parent rule run before making | ||
// the queue name fully qualified. | ||
func (fr *fixedRule) getName() string { | ||
return types.Fixed | ||
} | ||
|
||
func (fr *fixedRule) initialise(conf configs.PlacementRule) error { | ||
fr.queue = normalise(conf.Value) | ||
if fr.queue == "" { | ||
return fmt.Errorf("a fixed queue rule must have a queue name set") | ||
} | ||
fr.create = conf.Create | ||
fr.filter = newFilter(conf.Filter) | ||
// if we have a fully qualified queue name already we should not have a parent | ||
fr.qualified = strings.HasPrefix(fr.queue, configs.RootQueue) | ||
if fr.qualified && conf.Parent != nil { | ||
return fmt.Errorf("cannot have a fixed queue rule with qualified queue getName and a parent rule: %v", conf) | ||
} | ||
var err = error(nil) | ||
if conf.Parent != nil { | ||
fr.parent, err = newRule(*conf.Parent) | ||
} | ||
return err | ||
} | ||
|
||
func (fr *fixedRule) placeApplication(app *objects.Application, queueFn func(string) *objects.Queue) (string, error) { | ||
// before anything run the filter | ||
if !fr.filter.allowUser(app.GetUser()) { | ||
log.Log(log.Config).Debug("Fixed rule filtered", | ||
zap.String("application", app.ApplicationID), | ||
zap.Any("user", app.GetUser()), | ||
zap.String("queueName", fr.queue)) | ||
return "", nil | ||
} | ||
var parentName string | ||
var err error | ||
queueName := fr.queue | ||
// if the fixed queue is already fully qualified skip the parent check | ||
if !fr.qualified { | ||
// run the parent rule if set | ||
if fr.parent != nil { | ||
parentName, err = fr.parent.placeApplication(app, queueFn) | ||
// failed parent rule, fail this rule | ||
if err != nil { | ||
return "", err | ||
} | ||
// rule did not return a parent: this could be filter or create flag related | ||
if parentName == "" { | ||
return "", nil | ||
} | ||
// check if this is a parent queue and qualify it | ||
if !strings.HasPrefix(parentName, configs.RootQueue+configs.DOT) { | ||
parentName = configs.RootQueue + configs.DOT + parentName | ||
} | ||
// if the parent queue exists it cannot be a leaf | ||
parentQueue := queueFn(parentName) | ||
if parentQueue != nil && parentQueue.IsLeafQueue() { | ||
return "", fmt.Errorf("parent rule returned a leaf queue: %s", parentName) | ||
} | ||
} | ||
// the parent is set from the rule otherwise set it to the root | ||
if parentName == "" { | ||
parentName = configs.RootQueue | ||
} | ||
queueName = parentName + configs.DOT + fr.queue | ||
} | ||
// Log the result before we really create | ||
log.Log(log.Config).Debug("Fixed rule intermediate result", | ||
zap.String("application", app.ApplicationID), | ||
zap.String("queue", queueName)) | ||
// get the queue object | ||
queue := queueFn(queueName) | ||
// if we cannot create the queue must exist | ||
if !fr.create && queue == nil { | ||
return "", nil | ||
} | ||
log.Log(log.Config).Info("Fixed rule application placed", | ||
zap.String("application", app.ApplicationID), | ||
zap.String("queue", queueName)) | ||
return queueName, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,250 @@ | ||
/* | ||
Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package placement | ||
|
||
import ( | ||
"testing" | ||
|
||
"gotest.tools/v3/assert" | ||
|
||
"github.com/apache/yunikorn-core/pkg/common/configs" | ||
"github.com/apache/yunikorn-core/pkg/common/security" | ||
) | ||
|
||
func TestFixedRule(t *testing.T) { | ||
conf := configs.PlacementRule{ | ||
Name: "fixed", | ||
} | ||
fr, err := newRule(conf) | ||
if err == nil || fr != nil { | ||
t.Errorf("fixed rule create did not fail without queue name, err 'nil', rule: %v", fr) | ||
} | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testqueue", | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
// trying to create using a parent with a fully qualified child | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "root.testchild", | ||
Parent: &configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testparent", | ||
}, | ||
} | ||
fr, err = newRule(conf) | ||
if err == nil || fr != nil { | ||
t.Errorf("fixed rule create did not fail with parent rule and qualified child queue name, err 'nil', rule: %v", fr) | ||
} | ||
} | ||
|
||
func TestFixedRulePlace(t *testing.T) { | ||
// Create the structure for the test | ||
data := ` | ||
partitions: | ||
- name: default | ||
queues: | ||
- name: testqueue | ||
- name: testparent | ||
queues: | ||
- name: testchild | ||
` | ||
err := initQueueStructure([]byte(data)) | ||
assert.NilError(t, err, "setting up the queue config failed") | ||
|
||
user := security.UserGroup{ | ||
User: "testuser", | ||
Groups: []string{}, | ||
} | ||
tags := make(map[string]string) | ||
app := newApplication("app1", "default", "ignored", user, tags, nil, "") | ||
|
||
// fixed queue that exists directly under the root | ||
conf := configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testqueue", | ||
} | ||
var fr rule | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
var queue string | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "root.testqueue" || err != nil { | ||
t.Errorf("fixed rule failed to place queue in correct queue '%s', err %v", queue, err) | ||
} | ||
|
||
// fixed queue that exists directly in hierarchy | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "root.testparent.testchild", | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "root.testparent.testchild" || err != nil { | ||
t.Errorf("fixed rule failed to place queue in correct queue '%s', err %v", queue, err) | ||
} | ||
|
||
// fixed queue that does not exists | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "newqueue", | ||
Create: true, | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "root.newqueue" || err != nil { | ||
t.Errorf("fixed rule failed to place queue in to be created queue '%s', err %v", queue, err) | ||
} | ||
|
||
// trying to place in a parent queue should not fail: failure happens on create in this case | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "root.testparent", | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "root.testparent" || err != nil { | ||
t.Errorf("fixed rule did fail with parent queue '%s', error %v", queue, err) | ||
} | ||
|
||
// trying to place in a child using a parent | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testchild", | ||
Parent: &configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testparent", | ||
}, | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "root.testparent.testchild" || err != nil { | ||
t.Errorf("fixed rule with parent queue should not have failed '%s', error %v", queue, err) | ||
} | ||
} | ||
|
||
func TestFixedRuleParent(t *testing.T) { | ||
err := initQueueStructure([]byte(confParentChild)) | ||
assert.NilError(t, err, "setting up the queue config failed") | ||
|
||
user := security.UserGroup{ | ||
User: "testuser", | ||
Groups: []string{}, | ||
} | ||
tags := make(map[string]string) | ||
app := newApplication("app1", "default", "ignored", user, tags, nil, "") | ||
|
||
// trying to place in a child using a parent, fail to create child | ||
conf := configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "nonexist", | ||
Create: false, | ||
Parent: &configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testparent", | ||
}, | ||
} | ||
var fr rule | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
var queue string | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "" || err != nil { | ||
t.Errorf("fixed rule with create false for child should have failed and gave '%s', error %v", queue, err) | ||
} | ||
|
||
// trying to place in a child using a non creatable parent | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testchild", | ||
Create: true, | ||
Parent: &configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testparentnew", | ||
Create: false, | ||
}, | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "" || err != nil { | ||
t.Errorf("fixed rule with non existing parent queue should have failed '%s', error %v", queue, err) | ||
} | ||
|
||
// trying to place in a child using a creatable parent | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testchild", | ||
Create: true, | ||
Parent: &configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testparentnew", | ||
Create: true, | ||
}, | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != nameParentChild || err != nil { | ||
t.Errorf("fixed rule with non existing parent queue should created '%s', error %v", queue, err) | ||
} | ||
|
||
// trying to place in a child using a parent which is defined as a leaf | ||
conf = configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "nonexist", | ||
Create: true, | ||
Parent: &configs.PlacementRule{ | ||
Name: "fixed", | ||
Value: "testchild", | ||
}, | ||
} | ||
fr, err = newRule(conf) | ||
if err != nil || fr == nil { | ||
t.Errorf("fixed rule create failed with queue name, err %v", err) | ||
} | ||
queue, err = fr.placeApplication(app, queueFunc) | ||
if queue != "" || err == nil { | ||
t.Errorf("fixed rule with parent declared as leaf should have failed '%s', error %v", queue, err) | ||
} | ||
} |