Skip to content

Commit

Permalink
Merge branch 'master' into sendToGitHub/cancel-preempt-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
MustafaI authored Nov 11, 2024
2 parents 4d45901 + fb4d939 commit 4d0b456
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 40 deletions.
30 changes: 15 additions & 15 deletions internal/scheduler/scheduling/context/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/internaltypes"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// QueueSchedulingContext captures the decisions made by the scheduler during one invocation
Expand Down Expand Up @@ -48,9 +47,9 @@ type QueueSchedulingContext struct {
// Includes jobs scheduled during this invocation of the scheduler.
AllocatedByPriorityClass map[string]internaltypes.ResourceList
// Resources assigned to this queue during this scheduling cycle.
ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
ScheduledResourcesByPriorityClass map[string]internaltypes.ResourceList
// Resources evicted from this queue during this scheduling cycle.
EvictedResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
EvictedResourcesByPriorityClass map[string]internaltypes.ResourceList
// Job scheduling contexts associated with successful scheduling attempts.
SuccessfulJobSchedulingContexts map[string]*JobSchedulingContext
// Job scheduling contexts associated with unsuccessful scheduling attempts.
Expand Down Expand Up @@ -82,10 +81,10 @@ func (qctx *QueueSchedulingContext) ReportString(verbosity int32) string {
fmt.Fprintf(w, "Time:\t%s\n", qctx.Created)
fmt.Fprintf(w, "Queue:\t%s\n", qctx.Queue)
}
fmt.Fprintf(w, "Scheduled resources:\t%s\n", qctx.ScheduledResourcesByPriorityClass.AggregateByResource().CompactString())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", qctx.ScheduledResourcesByPriorityClass.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", qctx.EvictedResourcesByPriorityClass.AggregateByResource().CompactString())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", qctx.EvictedResourcesByPriorityClass.String())
fmt.Fprintf(w, "Scheduled resources:\t%s\n", internaltypes.RlMapSumValues(qctx.ScheduledResourcesByPriorityClass).String())
fmt.Fprintf(w, "Scheduled resources (by priority):\t%s\n", internaltypes.RlMapToString(qctx.ScheduledResourcesByPriorityClass))
fmt.Fprintf(w, "Preempted resources:\t%s\n", internaltypes.RlMapSumValues(qctx.EvictedResourcesByPriorityClass).String())
fmt.Fprintf(w, "Preempted resources (by priority):\t%s\n", internaltypes.RlMapToString(qctx.EvictedResourcesByPriorityClass))
if verbosity >= 0 {
fmt.Fprintf(w, "Total allocated resources after scheduling:\t%s\n", qctx.Allocated.String())
for pc, res := range qctx.AllocatedByPriorityClass {
Expand Down Expand Up @@ -171,17 +170,18 @@ func (qctx *QueueSchedulingContext) addJobSchedulingContext(jctx *JobSchedulingC
// Always update ResourcesByPriority.
// Since ResourcesByPriority is used to order queues by fraction of fair share.
pcName := jctx.Job.PriorityClassName()
qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(jctx.Job.AllResourceRequirements())
qctx.Allocated = qctx.Allocated.Add(jctx.Job.AllResourceRequirements())
rl := jctx.Job.AllResourceRequirements()
qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Add(rl)
qctx.Allocated = qctx.Allocated.Add(rl)

// Only if the job is not evicted, update ScheduledResourcesByPriority.
// Since ScheduledResourcesByPriority is used to control per-round scheduling constraints.
if evictedInThisRound {
delete(qctx.EvictedJobsById, jctx.JobId)
qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
qctx.EvictedResourcesByPriorityClass[pcName] = qctx.EvictedResourcesByPriorityClass[pcName].Subtract(rl)
} else {
qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx
qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
qctx.ScheduledResourcesByPriorityClass[pcName] = qctx.ScheduledResourcesByPriorityClass[pcName].Add(rl)
}
} else {
qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx
Expand All @@ -197,16 +197,16 @@ func (qctx *QueueSchedulingContext) evictJob(job *jobdb.Job) (bool, error) {
if _, ok := qctx.EvictedJobsById[jobId]; ok {
return false, errors.Errorf("failed evicting job %s from queue: job already marked evicted", jobId)
}
rl := job.ResourceRequirements().Requests
pcName := job.PriorityClassName()
rl := job.AllResourceRequirements()
_, scheduledInThisRound := qctx.SuccessfulJobSchedulingContexts[jobId]
if scheduledInThisRound {
qctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(job.PriorityClassName(), rl)
qctx.ScheduledResourcesByPriorityClass[pcName] = qctx.ScheduledResourcesByPriorityClass[pcName].Subtract(rl)
delete(qctx.SuccessfulJobSchedulingContexts, jobId)
} else {
qctx.EvictedResourcesByPriorityClass.AddV1ResourceList(job.PriorityClassName(), rl)
qctx.EvictedResourcesByPriorityClass[pcName] = qctx.EvictedResourcesByPriorityClass[pcName].Add(rl)
qctx.EvictedJobsById[jobId] = true
}
pcName := job.PriorityClassName()
qctx.AllocatedByPriorityClass[pcName] = qctx.AllocatedByPriorityClass[pcName].Subtract(job.AllResourceRequirements())
qctx.Allocated = qctx.Allocated.Subtract(job.AllResourceRequirements())

Expand Down
43 changes: 18 additions & 25 deletions internal/scheduler/scheduling/context/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ type SchedulingContext struct {
// Allocated resources across all clusters in this pool
Allocated internaltypes.ResourceList
// Resources assigned across all queues during this scheduling cycle.
ScheduledResources internaltypes.ResourceList
ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
ScheduledResources internaltypes.ResourceList
// Resources evicted across all queues during this scheduling cycle.
EvictedResources schedulerobjects.ResourceList
EvictedResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
EvictedResources internaltypes.ResourceList
// Total number of successfully scheduled jobs.
NumScheduledJobs int
// Total number of successfully scheduled gangs.
Expand All @@ -71,17 +69,16 @@ func NewSchedulingContext(
totalResources internaltypes.ResourceList,
) *SchedulingContext {
return &SchedulingContext{
Started: time.Now(),
Pool: pool,
FairnessCostProvider: fairnessCostProvider,
Limiter: limiter,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources,
ScheduledResources: internaltypes.ResourceList{},
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(),
UnfeasibleSchedulingKeys: make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext),
Started: time.Now(),
Pool: pool,
FairnessCostProvider: fairnessCostProvider,
Limiter: limiter,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources,
ScheduledResources: internaltypes.ResourceList{},
EvictedResources: internaltypes.ResourceList{},
SchedulingKeyGenerator: schedulerobjects.NewSchedulingKeyGenerator(),
UnfeasibleSchedulingKeys: make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext),
}
}

Expand Down Expand Up @@ -125,8 +122,8 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(
Demand: demand,
CappedDemand: cappedDemand,
AllocatedByPriorityClass: initialAllocatedByPriorityClass,
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
ScheduledResourcesByPriorityClass: make(map[string]internaltypes.ResourceList),
EvictedResourcesByPriorityClass: make(map[string]internaltypes.ResourceList),
SuccessfulJobSchedulingContexts: make(map[string]*JobSchedulingContext),
UnsuccessfulJobSchedulingContexts: make(map[string]*JobSchedulingContext),
EvictedJobsById: make(map[string]bool),
Expand Down Expand Up @@ -221,7 +218,7 @@ func (sctx *SchedulingContext) ReportString(verbosity int32) string {
fmt.Fprintf(w, "Termination reason:\t%s\n", sctx.TerminationReason)
fmt.Fprintf(w, "Total capacity:\t%s\n", sctx.TotalResources.String())
fmt.Fprintf(w, "Scheduled resources:\t%s\n", sctx.ScheduledResources.String())
fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.CompactString())
fmt.Fprintf(w, "Preempted resources:\t%s\n", sctx.EvictedResources.String())
fmt.Fprintf(w, "Number of gangs scheduled:\t%d\n", sctx.NumScheduledGangs)
fmt.Fprintf(w, "Number of jobs scheduled:\t%d\n", sctx.NumScheduledJobs)
fmt.Fprintf(w, "Number of jobs preempted:\t%d\n", sctx.NumEvictedJobs)
Expand Down Expand Up @@ -293,12 +290,10 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex
}
if jctx.IsSuccessful() {
if evictedInThisRound {
sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests)
sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.EvictedResources = sctx.EvictedResources.Subtract(jctx.Job.AllResourceRequirements())
sctx.NumEvictedJobs--
} else {
sctx.ScheduledResources = sctx.ScheduledResources.Add(jctx.Job.AllResourceRequirements())
sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests)
sctx.NumScheduledJobs++
}
sctx.Allocated = sctx.Allocated.Add(jctx.Job.AllResourceRequirements())
Expand Down Expand Up @@ -340,14 +335,12 @@ func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error
if err != nil {
return false, err
}
rl := jctx.Job.ResourceRequirements().Requests

if scheduledInThisRound {
sctx.ScheduledResources = sctx.ScheduledResources.Subtract(jctx.Job.AllResourceRequirements())
sctx.ScheduledResourcesByPriorityClass.SubV1ResourceList(jctx.Job.PriorityClassName(), rl)
sctx.NumScheduledJobs--
} else {
sctx.EvictedResources.AddV1ResourceList(rl)
sctx.EvictedResourcesByPriorityClass.AddV1ResourceList(jctx.Job.PriorityClassName(), rl)
sctx.EvictedResources = sctx.EvictedResources.Add(jctx.Job.AllResourceRequirements())
sctx.NumEvictedJobs++
}
sctx.Allocated = sctx.Allocated.Subtract(jctx.Job.AllResourceRequirements())
Expand Down

0 comments on commit 4d0b456

Please sign in to comment.