Skip to content

Commit

Permalink
[YUNIKORN-1888] Allow allocations to be added to existing nodes
Browse files Browse the repository at this point in the history
Adds the ability to assign existing allocations dynamically to an
already created node. This allows "late" events to be processed
properly.
  • Loading branch information
craigcondit committed Aug 3, 2023
1 parent 91bf166 commit ab89786
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module github.com/apache/yunikorn-core
go 1.20

require (
github.com/apache/yunikorn-scheduler-interface v0.0.0-20230731151735-8931af4dda26
github.com/apache/yunikorn-scheduler-interface v0.0.0-20230802152856-11ee347daef3
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20230731151735-8931af4dda26 h1:78Ow8OCXd7eSKN6tnpG27sdP2rUh//vcxFHaRDYc31Y=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20230731151735-8931af4dda26/go.mod h1:/n67iTTOytyVor6wETVjleqzsp/NxCUmPslHcTvJ+Nw=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20230802152856-11ee347daef3 h1:48h8KOMqvJjadcRGZhcYJMjGELSTOGsUJsjMqo7BHaU=
github.com/apache/yunikorn-scheduler-interface v0.0.0-20230802152856-11ee347daef3/go.mod h1:/n67iTTOytyVor6wETVjleqzsp/NxCUmPslHcTvJ+Nw=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
5 changes: 5 additions & 0 deletions pkg/rmproxy/rmevent/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ type RMRejectedAllocationAskEvent struct {
RejectedAllocationAsks []*si.RejectedAllocationAsk
}

type RMRejectedAllocationEvent struct {
RmID string
RejectedAllocations []*si.RejectedAllocation
}

type RMReleaseAllocationEvent struct {
RmID string
ReleasedAllocations []*si.AllocationRelease
Expand Down
26 changes: 24 additions & 2 deletions pkg/rmproxy/rmproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (rmp *RMProxy) processRMReleaseAllocationAskEvent(event *rmevent.RMReleaseA
rmp.triggerUpdateAllocation(event.RmID, response)
}

func (rmp *RMProxy) processUpdatePartitionConfigsEvent(event *rmevent.RMRejectedAllocationAskEvent) {
func (rmp *RMProxy) processRMRejectedAllocationAskEvent(event *rmevent.RMRejectedAllocationAskEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.RejectedAllocationAsks) == 0 {
Expand All @@ -192,6 +192,19 @@ func (rmp *RMProxy) processUpdatePartitionConfigsEvent(event *rmevent.RMRejected
metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocationAsks))
}

func (rmp *RMProxy) processRMRejectedAllocationEvent(event *rmevent.RMRejectedAllocationEvent) {
rmp.RLock()
defer rmp.RUnlock()
if len(event.RejectedAllocations) == 0 {
return
}
response := &si.AllocationResponse{
RejectedAllocations: event.RejectedAllocations,
}
rmp.triggerUpdateAllocation(event.RmID, response)
metrics.GetSchedulerMetrics().AddRejectedContainers(len(event.RejectedAllocations))
}

func (rmp *RMProxy) processRMNodeUpdateEvent(event *rmevent.RMNodeUpdateEvent) {
rmp.RLock()
defer rmp.RUnlock()
Expand Down Expand Up @@ -224,7 +237,9 @@ func (rmp *RMProxy) handleRMEvents() {
case *rmevent.RMReleaseAllocationEvent:
rmp.processRMReleaseAllocationEvent(v)
case *rmevent.RMRejectedAllocationAskEvent:
rmp.processUpdatePartitionConfigsEvent(v)
rmp.processRMRejectedAllocationAskEvent(v)
case *rmevent.RMRejectedAllocationEvent:
rmp.processRMRejectedAllocationEvent(v)
case *rmevent.RMNodeUpdateEvent:
rmp.processRMNodeUpdateEvent(v)
case *rmevent.RMReleaseAllocationAskEvent:
Expand Down Expand Up @@ -294,6 +309,13 @@ func (rmp *RMProxy) UpdateAllocation(request *si.AllocationRequest) error {
return fmt.Errorf("received AllocationRequest, but RmID=\"%s\" not registered", request.RmID)
}
go func() {
// Update allocations
if len(request.Allocations) > 0 {
for _, alloc := range request.Allocations {
alloc.PartitionName = common.GetNormalizedPartitionName(alloc.PartitionName, request.RmID)
}
}

// Update asks
if len(request.Asks) > 0 {
for _, ask := range request.Asks {
Expand Down
69 changes: 69 additions & 0 deletions pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,11 +747,15 @@ func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
}

// Process an ask and allocation update request.
// - Add new allocations for the application(s).
// - Add new asks and remove released asks for the application(s).
// - Release allocations for the application(s).
// Lock free call, all updates occur on the underlying application which is locked or via events.
func (cc *ClusterContext) handleRMUpdateAllocationEvent(event *rmevent.RMUpdateAllocationEvent) {
request := event.Request
if len(request.Allocations) != 0 {
cc.processAllocations(request)
}
if len(request.Asks) != 0 {
cc.processAsks(request)
}
Expand All @@ -765,6 +769,71 @@ func (cc *ClusterContext) handleRMUpdateAllocationEvent(event *rmevent.RMUpdateA
}
}

func (cc *ClusterContext) processAllocations(request *si.AllocationRequest) {
// Send rejected allocations back to RM
rejectedAllocs := make([]*si.RejectedAllocation, 0)

// Send to scheduler
for _, siAlloc := range request.Allocations {
// try to get ApplicationInfo
partition := cc.GetPartition(siAlloc.PartitionName)
if partition == nil {
msg := fmt.Sprintf("Failed to find partition %s, for application %s and allocation %s", siAlloc.PartitionName, siAlloc.ApplicationID, siAlloc.AllocationKey)
log.Log(log.SchedContext).Error("Invalid allocation add requested by shim, partition not found",
zap.String("partition", siAlloc.PartitionName),
zap.String("nodeID", siAlloc.NodeID),
zap.String("applicationID", siAlloc.ApplicationID),
zap.String("allocationKey", siAlloc.AllocationKey))
rejectedAllocs = append(rejectedAllocs, &si.RejectedAllocation{
AllocationKey: siAlloc.AllocationKey,
ApplicationID: siAlloc.ApplicationID,
Reason: msg,
})
continue
}

// try adding to app
node := cc.GetNode(siAlloc.GetNodeID(), siAlloc.GetPartitionName())
if node == nil {
msg := fmt.Sprintf("Failed to find node %s, for application %s and allocation %s", siAlloc.NodeID, siAlloc.ApplicationID, siAlloc.AllocationKey)
log.Log(log.SchedContext).Error("Invalid allocation add requested by shim, node not found",
zap.String("partition", siAlloc.PartitionName),
zap.String("nodeID", siAlloc.NodeID),
zap.String("applicationID", siAlloc.ApplicationID),
zap.String("allocationKey", siAlloc.AllocationKey))
rejectedAllocs = append(rejectedAllocs, &si.RejectedAllocation{
AllocationKey: siAlloc.AllocationKey,
ApplicationID: siAlloc.ApplicationID,
Reason: msg,
})
continue
}

alloc := objects.NewAllocationFromSI(siAlloc, node.GetInstanceType())
if err := partition.addAllocation(alloc); err != nil {
rejectedAllocs = append(rejectedAllocs, &si.RejectedAllocation{
AllocationKey: siAlloc.AllocationKey,
ApplicationID: siAlloc.ApplicationID,
Reason: err.Error(),
})
log.Log(log.SchedContext).Error("Invalid allocation add requested by shim",
zap.String("partition", siAlloc.PartitionName),
zap.String("nodeID", siAlloc.NodeID),
zap.String("applicationID", siAlloc.ApplicationID),
zap.String("allocationKey", siAlloc.AllocationKey),
zap.Error(err))
}
}

// Reject allocs returned to RM proxy for the apps and partitions not found
if len(rejectedAllocs) > 0 {
cc.rmEventHandler.HandleEvent(&rmevent.RMRejectedAllocationEvent{
RmID: request.RmID,
RejectedAllocations: rejectedAllocs,
})
}
}

func (cc *ClusterContext) processAsks(request *si.AllocationRequest) {
// Send rejects back to RM
rejectedAsks := make([]*si.RejectedAllocationAsk, 0)
Expand Down

0 comments on commit ab89786

Please sign in to comment.