Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[YUNIKORN-2978] Fix handling of reserved allocations where node differs #996

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pkg/scheduler/objects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,12 @@ func (sn *Node) RemoveAllocation(allocationKey string) *Allocation {
}
sn.availableResource.AddTo(alloc.GetAllocatedResource())
sn.nodeEvents.SendAllocationRemovedEvent(sn.NodeID, alloc.allocationKey, alloc.GetAllocatedResource())
log.Log(log.SchedNode).Info("node allocation removed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
zap.Bool("placeholder", alloc.IsPlaceholder()),
zap.String("targetNode", sn.NodeID))
return alloc
}

Expand Down Expand Up @@ -382,6 +388,10 @@ func (sn *Node) UpdateForeignAllocation(alloc *Allocation) *Allocation {
delta := resources.Sub(newResource, existingResource)
delta.Prune()

log.Log(log.SchedNode).Info("node foreign allocation updated",
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.Stringer("deltaResource", delta),
zap.String("targetNode", sn.NodeID))
sn.occupiedResource.AddTo(delta)
sn.occupiedResource.Prune()
sn.refreshAvailableResource()
Expand Down Expand Up @@ -416,6 +426,12 @@ func (sn *Node) addAllocationInternal(alloc *Allocation, force bool) bool {
sn.availableResource.SubFrom(res)
sn.availableResource.Prune()
sn.nodeEvents.SendAllocationAddedEvent(sn.NodeID, alloc.allocationKey, res)
log.Log(log.SchedNode).Info("node allocation processed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.Stringer("allocatedResource", alloc.GetAllocatedResource()),
zap.Bool("placeholder", alloc.IsPlaceholder()),
zap.String("targetNode", sn.NodeID))
result = true
return result
}
Expand All @@ -440,6 +456,12 @@ func (sn *Node) ReplaceAllocation(allocationKey string, replace *Allocation, del
sn.allocatedResource.AddTo(delta)
sn.availableResource.SubFrom(delta)
sn.availableResource.Prune()
log.Log(log.SchedNode).Info("node allocation replaced",
zap.String("appID", replace.GetApplicationID()),
zap.String("allocationKey", replace.GetAllocationKey()),
zap.Stringer("allocatedResource", replace.GetAllocatedResource()),
zap.String("placeholderKey", allocationKey),
zap.String("targetNode", sn.NodeID))
if !before.FitIn(sn.allocatedResource) {
log.Log(log.SchedNode).Warn("unexpected increase in node usage after placeholder replacement",
zap.String("placeholder allocationKey", allocationKey),
Expand Down
51 changes: 31 additions & 20 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,44 +869,55 @@
// find the node and make sure it still exists
// if the node was passed in use that ID instead of the one from the allocation
// the node ID is set when a reservation is allocated on a non-reserved node
var nodeID string
alloc := result.Request
if result.ReservedNodeID == "" {
nodeID = result.NodeID
} else {
nodeID = result.ReservedNodeID
log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
zap.String("current node", result.NodeID),
zap.String("reserved node", nodeID),
zap.String("appID", appID))
}
node := pc.GetNode(nodeID)
if node == nil {
log.Log(log.SchedPartition).Info("Node was removed while allocating",
zap.String("nodeID", nodeID),
targetNodeID := result.NodeID
targetNode := pc.GetNode(targetNodeID)
if targetNode == nil {
log.Log(log.SchedPartition).Info("Target node was removed while allocating",
zap.String("nodeID", targetNodeID),

Check warning on line 877 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L876-L877

Added lines #L876 - L877 were not covered by tests
zap.String("appID", appID))

// attempt to deallocate
if alloc.IsAllocated() {
allocKey := alloc.GetAllocationKey()
if _, err := app.DeallocateAsk(allocKey); err != nil {
log.Log(log.SchedPartition).Warn("Failed to unwind allocation",
zap.String("nodeID", nodeID),
zap.String("nodeID", targetNodeID),

Check warning on line 885 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L885

Added line #L885 was not covered by tests
zap.String("appID", appID),
zap.String("allocationKey", allocKey),
zap.Error(err))
}
}
return nil
}

// reservation
if result.ResultType == objects.Reserved {
pc.reserve(app, node, result.Request)
pc.reserve(app, targetNode, result.Request)
return nil
}

// unreserve
if result.ResultType == objects.Unreserved || result.ResultType == objects.AllocatedReserved {
pc.unReserve(app, node, result.Request)
var reservedNodeID string
if result.ReservedNodeID == "" {
reservedNodeID = result.NodeID
} else {
reservedNodeID = result.ReservedNodeID
log.Log(log.SchedPartition).Debug("Reservation allocated on different node",
zap.String("current node", result.NodeID),
zap.String("reserved node", reservedNodeID),
zap.String("appID", appID))
}

reservedNode := pc.GetNode(reservedNodeID)
if reservedNode != nil {
pc.unReserve(app, reservedNode, result.Request)
} else {
log.Log(log.SchedPartition).Info("Reserved node was removed while allocating",
zap.String("nodeID", reservedNodeID),
zap.String("appID", appID))
}

Check warning on line 920 in pkg/scheduler/partition.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/partition.go#L917-L920

Added lines #L917 - L920 were not covered by tests
if result.ResultType == objects.Unreserved {
return nil
}
Expand All @@ -915,8 +926,8 @@
}

alloc.SetBindTime(time.Now())
alloc.SetNodeID(nodeID)
alloc.SetInstanceType(node.GetInstanceType())
alloc.SetNodeID(targetNodeID)
alloc.SetInstanceType(targetNode.GetInstanceType())

// track the number of allocations
pc.updateAllocationCount(1)
Expand All @@ -929,7 +940,7 @@
zap.String("allocationKey", result.Request.GetAllocationKey()),
zap.Stringer("allocatedResource", result.Request.GetAllocatedResource()),
zap.Bool("placeholder", result.Request.IsPlaceholder()),
zap.String("targetNode", alloc.GetNodeID()))
zap.String("targetNode", targetNodeID))
// pass the allocation result back to the RM via the cluster context
return result
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2369,6 +2369,9 @@ func TestAllocReserveNewNode(t *testing.T) {
assert.Equal(t, 0, len(node1.GetReservationKeys()), "old node should have no more reservations")
assert.Equal(t, 0, len(app.GetReservations()), "ask should have been reserved")
assertLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 16000}))
alloc2 := node2.GetAllocation("alloc-2")
assert.Assert(t, alloc2 != nil, "alloc was nil")
assert.Equal(t, nodeID2, alloc2.GetNodeID(), "wrong node id")
}

func TestTryAllocateReserve(t *testing.T) {
Expand Down