engine: fix bugs in rescheduling for replaced units
Although the 'Replaces' option has been supported since
coreos#1572, the engine never actually unscheduled the units being
replaced. That was a bug.

So implement GetReplacedUnit() to expose the replaced unit from the
AgentState to the engine reconciler, and make the reconciler
unschedule the replaced unit and schedule the current unit. The
engine scheduler also gains a helper for the rescheduling case,
which simply schedules the replaced unit onto a free machine.
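
For context, a unit opts into replacement through the Replaces option in its [X-Fleet] section. A minimal sketch of such a unit, with hypothetical unit names:

# new-web.service: once this unit is scheduled, the engine should
# unschedule old-web.service and reschedule it onto a free machine.
[Service]
ExecStart=/usr/bin/sleep 1d

[X-Fleet]
Replaces=old-web.service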
Dongsu Park committed Nov 4, 2016
1 parent ec664a4 commit 13ba083
Showing 3 changed files with 112 additions and 1 deletion.
8 changes: 8 additions & 0 deletions agent/state.go
@@ -156,3 +156,11 @@ func (as *AgentState) AbleToRun(j *job.Job) (jobAction job.JobAction, errstr str

	return job.JobActionSchedule, ""
}

func (as *AgentState) GetReplacedUnit(j *job.Job) (string, error) {
	cExists, replaced := as.hasReplace(j.Name, j.Replaces())
	if !cExists {
		return "", fmt.Errorf("cannot find units to be replaced for Unit(%s)", j.Name)
	}
	return replaced, nil
}
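
A self-contained toy model of what this helper does. The types are hypothetical stand-ins for agent.AgentState, and the semantics of the unexported hasReplace are assumed here (pick the first unit named by Replaces that the agent currently knows about):

package main

import "fmt"

// toyAgent stands in for agent.AgentState; units holds the names of
// units currently present on this agent (hypothetical simplification).
type toyAgent struct {
	units map[string]bool
}

// getReplacedUnit mirrors GetReplacedUnit: return the unit that jobName
// replaces, or an error when no candidate is present on this agent.
func (a *toyAgent) getReplacedUnit(jobName string, replaces []string) (string, error) {
	for _, r := range replaces {
		if a.units[r] {
			return r, nil
		}
	}
	return "", fmt.Errorf("cannot find units to be replaced for Unit(%s)", jobName)
}

func main() {
	a := &toyAgent{units: map[string]bool{"old-web.service": true}}
	fmt.Println(a.getReplacedUnit("new-web.service", []string{"old-web.service"}))
	// Output: old-web.service <nil>
}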
66 changes: 66 additions & 0 deletions engine/reconciler.go
@@ -106,6 +106,62 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
		return job.JobActionSchedule, ""
	}

	handle_reschedule := func(j *job.Job, reason string) bool {
		isRescheduled := false

		agents := clust.agents()

		as, ok := agents[j.TargetMachineID]
		if !ok {
			metrics.ReportEngineReconcileFailure(metrics.MachineAway)
			return false
		}

		for _, cj := range clust.jobs {
			if !cj.Scheduled() {
				continue
			}
			if j.Name != cj.Name {
				continue
			}

			replacedUnit, err := as.GetReplacedUnit(j)
			if err != nil {
				log.Debugf("No unit to reschedule: %v", err)
				metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
				continue
			}

			if !send(taskTypeUnscheduleUnit, reason, replacedUnit, j.TargetMachineID) {
				log.Infof("Job(%s) unschedule send failed", replacedUnit)
				metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
				continue
			}

			dec, err := r.sched.DecideReschedule(clust, j)
			if err != nil {
				log.Debugf("Unable to schedule Job(%s): %v", j.Name, err)
				metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
				continue
			}

			if !send(taskTypeAttemptScheduleUnit, reason, replacedUnit, dec.machineID) {
				log.Infof("Job(%s) attemptschedule send failed", replacedUnit)
				metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
				continue
			}
			clust.schedule(replacedUnit, dec.machineID)
			log.Debugf("rescheduling unit %s to machine %s", replacedUnit, dec.machineID)

			clust.schedule(j.Name, j.TargetMachineID)
			log.Debugf("scheduling unit %s to machine %s", j.Name, j.TargetMachineID)

			isRescheduled = true
		}

		return isRescheduled
	}

	go func() {
		defer close(taskchan)

@@ -115,14 +171,24 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
			}

			act, reason := decide(j)
			if act == job.JobActionReschedule && handle_reschedule(j, reason) {
				log.Debugf("Job(%s) is rescheduled: %v", j.Name, reason)
				continue
			}

			if act != job.JobActionUnschedule {
				log.Debugf("Job(%s) is not to be unscheduled, reason: %v", j.Name, reason)
				metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
				continue
			}

			if !send(taskTypeUnscheduleUnit, reason, j.Name, j.TargetMachineID) {
				log.Infof("Job(%s) send failed.", j.Name)
				metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
				return
			}

			log.Debugf("Job(%s) unscheduling.", j.Name)
			clust.unschedule(j.Name)
		}

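One detail worth noting in the reconciler flow above: the unschedule task for the replaced unit is sent before the attempt-schedule task, and the local cluster snapshot is updated only after both sends succeed. A minimal sketch of that ordering (the task struct and type names are simplified stand-ins for this diff's taskTypeUnscheduleUnit and taskTypeAttemptScheduleUnit):

package main

import "fmt"

// task is a simplified stand-in for the engine's task type.
type task struct {
	typ, unit, machineID string
}

// rescheduleTasks mirrors the order handle_reschedule emits tasks in:
// unschedule the replaced unit from the job's current target machine,
// then attempt to schedule it on the free machine the scheduler picked.
func rescheduleTasks(replacedUnit, targetMachine, freeMachine string) []task {
	return []task{
		{"UnscheduleUnit", replacedUnit, targetMachine},
		{"AttemptScheduleUnit", replacedUnit, freeMachine},
	}
}

func main() {
	for _, t := range rescheduleTasks("old-web.service", "m1", "m2") {
		fmt.Printf("%s: %s -> %s\n", t.typ, t.unit, t.machineID)
	}
}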
39 changes: 38 additions & 1 deletion engine/scheduler.go
@@ -28,6 +28,7 @@ type decision struct {

type Scheduler interface {
	Decide(*clusterState, *job.Job) (*decision, error)
	DecideReschedule(*clusterState, *job.Job) (*decision, error)
}

type leastLoadedScheduler struct{}
@@ -41,7 +42,7 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis

	var target *agent.AgentState
	for _, as := range agents {
		if act, _ := as.AbleToRun(j); act != job.JobActionSchedule {
		if act, _ := as.AbleToRun(j); act == job.JobActionUnschedule {
			continue
		}

@@ -61,6 +62,42 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis
	return &dec, nil
}

// DecideReschedule decides scheduling in a much simpler way than
// Decide: it just picks another free machine, excluding the job's
// current target machine. It does not need to call as.AbleToRun(),
// because the job action has already been decided by the time this
// function is reached.
func (lls *leastLoadedScheduler) DecideReschedule(clust *clusterState, j *job.Job) (*decision, error) {
	agents := lls.sortedAgents(clust)

	if len(agents) == 0 {
		return nil, fmt.Errorf("zero agents available")
	}

	found := false
	var target *agent.AgentState
	for _, as := range agents {
		if as.MState.ID == j.TargetMachineID {
			continue
		}

		as := as
		target = as
		found = true
		break
	}

	if !found {
		return nil, fmt.Errorf("no agents able to run job")
	}

	dec := decision{
		machineID: target.MState.ID,
	}

	return &dec, nil
}

// sortedAgents returns a list of AgentState objects sorted ascending
// by the number of scheduled units
func (lls *leastLoadedScheduler) sortedAgents(clust *clusterState) []*agent.AgentState {
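The selection logic above reduces to: walk the least-loaded-first agent list and take the first machine that is not the job's current target. A self-contained sketch of just that rule (the function and parameter names are hypothetical stand-ins for the engine's sorted agent list):

package main

import "fmt"

// pickRescheduleTarget mirrors DecideReschedule's core rule: given machine
// IDs sorted least-loaded first, return the first ID that differs from
// the machine the job is being moved away from.
func pickRescheduleTarget(sortedMachineIDs []string, targetMachineID string) (string, error) {
	for _, id := range sortedMachineIDs {
		if id == targetMachineID {
			continue // never pick the machine we are vacating
		}
		return id, nil
	}
	return "", fmt.Errorf("no agents able to run job")
}

func main() {
	id, err := pickRescheduleTarget([]string{"m1", "m2"}, "m1")
	fmt.Println(id, err) // m2 <nil>: least-loaded machine other than the target
}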