Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2907] Queue config processing log spew #1009

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@
part, ok := cc.partitions[p.Name]
if ok {
// make sure the new info passes all checks
_, err = newPartitionContext(p, rmID, nil)
_, err = newPartitionContextForValidation(p, rmID, nil)

Check warning on line 368 in pkg/scheduler/context.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/context.go#L368

Added line #L368 was not covered by tests
if err != nil {
return err
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@
// NewConfiguredQueue creates a new queue from scratch based on the configuration
// lock free as it cannot be referenced yet
func NewConfiguredQueue(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
queue, err := newConfiguredQueueInternal(conf, parent)
if queue != nil {
queue.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("configured queue added to scheduler",
zap.String("queueName", queue.QueuePath))
queue.queueEvents.SendNewQueueEvent(queue.QueuePath, queue.isManaged)
}
return queue, err
}

// NewConfiguredQueueForValidation is used to validate the queue configuration.
// It works similarly to NewConfiguredQueue but neither logs the queue creation nor sends a queue event.
// lock free as it cannot be referenced yet
func NewConfiguredQueueForValidation(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
return newConfiguredQueueInternal(conf, parent)

Check warning on line 133 in pkg/scheduler/objects/queue.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/queue.go#L132-L133

Added lines #L132 - L133 were not covered by tests
}

func newConfiguredQueueInternal(conf configs.QueueConfig, parent *Queue) (*Queue, error) {
sq := newBlankQueue()
sq.Name = strings.ToLower(conf.Name)
sq.QueuePath = strings.ToLower(conf.Name)
Expand Down Expand Up @@ -144,10 +162,6 @@
} else {
sq.UpdateQueueProperties()
}
sq.queueEvents = schedEvt.NewQueueEvents(events.GetEventSystem())
log.Log(log.SchedQueue).Info("configured queue added to scheduler",
zap.String("queueName", sq.QueuePath))
sq.queueEvents.SendNewQueueEvent(sq.QueuePath, sq.isManaged)

return sq, nil
}
Expand Down
81 changes: 76 additions & 5 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,29 @@
locking.RWMutex
}

// newPartitionContextForValidation initializes a shadow partition based on the configuration.
// The shadow partition is used to validate the configuration, it is not used for scheduling.
func newPartitionContextForValidation(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value *PartitionContext is never used from this function, so you might as well just remove it and just call it validateConfiguration(). Unless of course, you create unit tests which actually use it for some kind of verification... See other comments.

pc, err := newPartitionContextInternal(conf, rmID, cc)
if pc != nil {
if err := pc.initialPartitionFromConfigForValidation(conf); err != nil {
return nil, err
}

Check warning on line 89 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L84-L89

Added lines #L84 - L89 were not covered by tests
}
return pc, err

Check warning on line 91 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L91

Added line #L91 was not covered by tests
}

func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
pc, err := newPartitionContextInternal(conf, rmID, cc)
if pc != nil {
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}
}
return pc, err
}

func newPartitionContextInternal(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
if conf.Name == "" || rmID == "" {
log.Log(log.SchedPartition).Info("partition cannot be created",
zap.String("partition name", conf.Name),
Expand All @@ -98,13 +120,41 @@
foreignAllocs: make(map[string]*objects.Allocation),
}
pc.partitionManager = newPartitionManager(pc, cc)
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}

return pc, nil
}

// initialPartitionFromConfigForValidation is used to validate the partition configuration.
// It works similarly to initialPartitionFromConfig but neither logs the queue creation, sends a queue event, logs the node sorting policy,
// nor updates user settings.
func (pc *PartitionContext) initialPartitionFromConfigForValidation(conf configs.PartitionConfig) error {
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
return fmt.Errorf("partition cannot be created without root queue")
}

Check warning on line 133 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L130-L133

Added lines #L130 - L133 were not covered by tests

// Setup the queue structure: root first it should be the only queue at this level
// Add the rest of the queue structure recursively
queueConf := conf.Queues[0]
var err error
if pc.root, err = objects.NewConfiguredQueueForValidation(queueConf, nil); err != nil {
return err
}

Check warning on line 141 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L137-L141

Added lines #L137 - L141 were not covered by tests
// recursively add the queues to the root
if err = pc.addQueueForValidation(queueConf.Queues, pc.root); err != nil {
return err
}

Check warning on line 145 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L143-L145

Added lines #L143 - L145 were not covered by tests

// We need to pass in the locked version of the GetQueue function.
// Placing an application will not have a lock on the partition context.
pc.placementManager = placement.NewPlacementManager(conf.PlacementRules, pc.GetQueue)
// get the user group cache for the partition
pc.userGroupCache = security.GetUserGroupCache("")
pc.updateNodeSortingPolicyForValidation(conf)
pc.updatePreemption(conf)
Comment on lines +147 to +153
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines are unnecessary here. No return values are checked, so whether they run or not is irrelevant.


return nil

Check warning on line 155 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L149-L155

Added lines #L149 - L155 were not covered by tests
}

// Initialise the partition
func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error {
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
Expand Down Expand Up @@ -138,6 +188,13 @@
return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}

// updateNodeSortingPolicyForValidation is used to validate the partition configuration.
// It works similarly to updateNodeSortingPolicy but without logging.
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateNodeSortingPolicyForValidation(conf configs.PartitionConfig) {
pc.updateNodeSortingPolicyInternal(conf)

Check warning on line 195 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L194-L195

Added lines #L194 - L195 were not covered by tests
}

// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig) {
var configuredPolicy policies.SortingPolicy
Expand All @@ -150,6 +207,10 @@
log.Log(log.SchedPartition).Info("NodeSorting policy set from config",
zap.Stringer("policyName", configuredPolicy))
}
pc.updateNodeSortingPolicyInternal(conf)
}

func (pc *PartitionContext) updateNodeSortingPolicyInternal(conf configs.PartitionConfig) {
pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights))
}

Expand Down Expand Up @@ -191,17 +252,27 @@
return ugm.GetUserManager().UpdateConfig(queueConf, conf.Queues[0].Name)
}

func (pc *PartitionContext) addQueueForValidation(conf []configs.QueueConfig, parent *objects.Queue) error {
err := pc.addQueueInternal(conf, parent, objects.NewConfiguredQueueForValidation)
return err

Check warning on line 257 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L255-L257

Added lines #L255 - L257 were not covered by tests
}

// Process the config structure and create a queue info tree for this partition
func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
err := pc.addQueueInternal(conf, parent, objects.NewConfiguredQueue)
return err
}

func (pc *PartitionContext) addQueueInternal(conf []configs.QueueConfig, parent *objects.Queue, newQueueFn func(configs.QueueConfig, *objects.Queue) (*objects.Queue, error)) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered passing down a boolean flag validate? All these method duplications - it's a bit dubious to me. For example, passing a function pointer to run code whether it's a validation or not just doesn't seem right.

NewConfiguredQueue() definitely has some callers, but it's not the end of the world. I don't know where others stand on this, but I'm in favor of a flag.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pbacsko Thanks for reviewing! I'm not sure about these method duplications either.

My first thought is like you said that NewConfiguredQueue() has some callers, adding a flag would require all of them to include an additional parameter. But after implementing these method duplications, the code feels redundant, and it turns out that more test cases need to be covered.

I'll try the flag approach. Thanks a lot, my confusion is now cleared!

// create the queue at this level
for _, queueConf := range conf {
thisQueue, err := objects.NewConfiguredQueue(queueConf, parent)
thisQueue, err := newQueueFn(queueConf, parent)
if err != nil {
return err
}
// recursive create the queues below
if len(queueConf.Queues) > 0 {
err = pc.addQueue(queueConf.Queues, thisQueue)
err = pc.addQueueInternal(queueConf.Queues, thisQueue, newQueueFn)
if err != nil {
return err
}
Expand Down
Loading