
Merge pull request #1698 from endocode/dongsu/engine-fix-replace-reschedule

agent,engine: fix bugs in rescheduling for replaced units
Dongsu Park authored Nov 7, 2016
2 parents e98df4c + f382b13 commit 2b2eda2
Showing 9 changed files with 254 additions and 102 deletions.
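
The core of this change is that AbleToRun now returns a job.JobAction value instead of a bool, letting callers distinguish plain unscheduling from rescheduling of replaced units. The type itself lives in job/job.go, which is not part of this excerpt; the sketch below is a hypothetical reconstruction in which only the three constant names are confirmed by the diff, while the underlying values are assumptions.

package job

// Hypothetical reconstruction of the JobAction type used throughout
// this commit; the concrete definition and values are in job/job.go.
type JobAction string

const (
	JobActionSchedule   JobAction = "schedule"   // the unit may run on the agent
	JobActionUnschedule JobAction = "unschedule" // the unit must be removed from the agent
	JobActionReschedule JobAction = "reschedule" // the unit replaces another, which must move
)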
30 changes: 15 additions & 15 deletions agent/reconcile_test.go
@@ -275,48 +275,48 @@ func TestAbleToRun(t *testing.T) {
tests := []struct {
dState *AgentState
job *job.Job
want bool
want job.JobAction
}{
// nothing to worry about
{
dState: NewAgentState(&machine.MachineState{ID: "123"}),
job: &job.Job{Name: "easy-street.service", Unit: unit.UnitFile{}},
want: true,
want: job.JobActionSchedule,
},

// match MachineID
{
dState: NewAgentState(&machine.MachineState{ID: "XYZ"}),
job: newTestJobWithXFleetValues(t, "MachineID=XYZ"),
want: true,
want: job.JobActionSchedule,
},

// mismatch MachineID
{
dState: NewAgentState(&machine.MachineState{ID: "123"}),
job: newTestJobWithXFleetValues(t, "MachineID=XYZ"),
want: false,
want: job.JobActionUnschedule,
},

// match MachineMetadata
{
dState: NewAgentState(&machine.MachineState{ID: "123", Metadata: map[string]string{"region": "us-west"}}),
job: newTestJobWithXFleetValues(t, "MachineMetadata=region=us-west"),
want: true,
want: job.JobActionSchedule,
},

// Machine metadata ignored when no MachineMetadata in Job
{
dState: NewAgentState(&machine.MachineState{ID: "123", Metadata: map[string]string{"region": "us-west"}}),
job: &job.Job{Name: "easy-street.service", Unit: unit.UnitFile{}},
want: true,
want: job.JobActionSchedule,
},

// mismatch MachineMetadata
{
dState: NewAgentState(&machine.MachineState{ID: "123", Metadata: map[string]string{"region": "us-west"}}),
job: newTestJobWithXFleetValues(t, "MachineMetadata=region=us-east"),
want: false,
want: job.JobActionUnschedule,
},

// peer scheduled locally
@@ -328,7 +328,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "MachineOf=pong.service"),
want: true,
want: job.JobActionSchedule,
},

// multiple peers scheduled locally
@@ -341,14 +341,14 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "MachineOf=pong.service\nMachineOf=ping.service"),
want: true,
want: job.JobActionSchedule,
},

// peer not scheduled locally
{
dState: NewAgentState(&machine.MachineState{ID: "123"}),
job: newTestJobWithXFleetValues(t, "MachineOf=ping.service"),
want: false,
want: job.JobActionUnschedule,
},

// one of multiple peers not scheduled locally
@@ -360,7 +360,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "MachineOf=pong.service\nMachineOf=ping.service"),
want: false,
want: job.JobActionUnschedule,
},

// no conflicts found
@@ -372,7 +372,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Conflicts=pong.service"),
want: true,
want: job.JobActionSchedule,
},

// conflicts found
@@ -384,7 +384,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Conflicts=ping.service"),
want: false,
want: job.JobActionUnschedule,
},

// no replaces found
@@ -396,7 +396,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Replaces=pong.service"),
want: true,
want: job.JobActionSchedule,
},

// replaces found
@@ -408,7 +408,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Replaces=ping.service"),
want: false,
want: job.JobActionReschedule,
},
}

27 changes: 17 additions & 10 deletions agent/state.go
@@ -124,36 +124,43 @@ func globMatches(pattern, target string) bool {
// - Agent must have all required Peers of the Job scheduled locally (if any)
// - Job must not conflict with any other Units scheduled to the agent
// - Units replaced by the Job must be handled specially so they can be rescheduled
func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
func (as *AgentState) AbleToRun(j *job.Job) (jobAction job.JobAction, errstr string) {
if tgt, ok := j.RequiredTarget(); ok && !as.MState.MatchID(tgt) {
return false, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt)
return job.JobActionUnschedule, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt)
}

metadata := j.RequiredTargetMetadata()
if len(metadata) != 0 {
if !machine.HasMetadata(as.MState, metadata) {
return false, "local Machine metadata insufficient"
return job.JobActionUnschedule, "local Machine metadata insufficient"
}
}

peers := j.Peers()
if len(peers) != 0 {
for _, peer := range peers {
if !as.unitScheduled(peer) {
return false, fmt.Sprintf("required peer Unit(%s) is not scheduled locally", peer)
return job.JobActionUnschedule, fmt.Sprintf("required peer Unit(%s) is not scheduled locally", peer)
}
}
}

if cExists, cJobName := as.HasConflict(j.Name, j.Conflicts()); cExists {
return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
return job.JobActionUnschedule, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
}

// Handle Replace option specially, by returning a special string
// "jobreschedule" as reason.
if cExists, _ := as.hasReplace(j.Name, j.Replaces()); cExists {
return false, job.JobReschedule
// Handle Replace option specially for rescheduling the unit
if cExists, cJobName := as.hasReplace(j.Name, j.Replaces()); cExists {
return job.JobActionReschedule, fmt.Sprintf("found replace with locally-scheduled Unit(%s)", cJobName)
}

return true, ""
return job.JobActionSchedule, ""
}

func (as *AgentState) GetReplacedUnit(j *job.Job) (string, error) {
cExists, replaced := as.hasReplace(j.Name, j.Replaces())
if !cExists {
return "", fmt.Errorf("cannot find units to be replaced for Unit(%s)", j.Name)
}
return replaced, nil
}
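
For orientation, here is a hedged sketch of how a caller can act on the three results of AbleToRun. decideLocally and the log calls are illustrative, imports are elided, and only AbleToRun and GetReplacedUnit are confirmed by this diff.

// Illustrative dispatch on the action returned by AbleToRun.
func decideLocally(as *AgentState, j *job.Job) {
	act, reason := as.AbleToRun(j)
	switch act {
	case job.JobActionSchedule:
		// nothing blocks the unit; it may be scheduled on this agent
	case job.JobActionUnschedule:
		log.Infof("unscheduling Job(%s): %s", j.Name, reason)
	case job.JobActionReschedule:
		// the unit replaces another; find out which one has to move
		if replaced, err := as.GetReplacedUnit(j); err == nil {
			log.Infof("Job(%s) replaces Unit(%s)", j.Name, replaced)
		}
	}
}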
121 changes: 88 additions & 33 deletions engine/reconciler.go
@@ -84,56 +84,111 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
return true
}

go func() {
defer close(taskchan)
decide := func(j *job.Job) (jobAction job.JobAction, reason string) {
if j.TargetState == job.JobStateInactive {
return job.JobActionUnschedule, "target state inactive"
}

agents := clust.agents()

as, ok := agents[j.TargetMachineID]
if !ok {
metrics.ReportEngineReconcileFailure(metrics.MachineAway)
return job.JobActionUnschedule, fmt.Sprintf("target Machine(%s) went away", j.TargetMachineID)
}

if act, ableReason := as.AbleToRun(j); act != job.JobActionSchedule {
metrics.ReportEngineReconcileFailure(metrics.RunFailure)
return act, fmt.Sprintf("target Machine(%s) unable to run unit: %v",
j.TargetMachineID, ableReason)
}

return job.JobActionSchedule, ""
}

handle_reschedule := func(j *job.Job, reason string) bool {
isRescheduled := false

agents := clust.agents()

as, ok := agents[j.TargetMachineID]
if !ok {
metrics.ReportEngineReconcileFailure(metrics.MachineAway)
return false
}

for _, cj := range clust.jobs {
if !cj.Scheduled() {
continue
}
if j.Name != cj.Name {
continue
}

replacedUnit, err := as.GetReplacedUnit(j)
if err != nil {
log.Debugf("No unit to reschedule: %v", err)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

if !send(taskTypeUnscheduleUnit, reason, replacedUnit, j.TargetMachineID) {
log.Infof("Job(%s) unschedule send failed", replacedUnit)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

dec, err := r.sched.DecideReschedule(clust, j)
if err != nil {
log.Debugf("Unable to schedule Job(%s): %v", j.Name, err)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

if !send(taskTypeAttemptScheduleUnit, reason, replacedUnit, dec.machineID) {
log.Infof("Job(%s) attemptschedule send failed", replacedUnit)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}
clust.schedule(replacedUnit, dec.machineID)
log.Debugf("rescheduling unit %s to machine %s", replacedUnit, dec.machineID)

clust.schedule(j.Name, j.TargetMachineID)
log.Debugf("scheduling unit %s to machine %s", j.Name, j.TargetMachineID)

isRescheduled = true
}

return isRescheduled
}

go func() {
defer close(taskchan)

for _, j := range clust.jobs {
if !j.Scheduled() {
continue
}

decide := func() (unschedule bool, reason string) {
if j.TargetState == job.JobStateInactive {
unschedule = true
reason = "target state inactive"
return
}

as, ok := agents[j.TargetMachineID]
if !ok {
unschedule = true
reason = fmt.Sprintf("target Machine(%s) went away", j.TargetMachineID)
metrics.ReportEngineReconcileFailure(metrics.MachineAway)
return
}

var able bool
var ableReason string
if able, ableReason = as.AbleToRun(j); !able {
unschedule = true
if ableReason == job.JobReschedule {
reason = ableReason
} else {
reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID)
metrics.ReportEngineReconcileFailure(metrics.RunFailure)
}
return
}

return
act, reason := decide(j)
if act == job.JobActionReschedule && handle_reschedule(j, reason) {
log.Debugf("Job(%s) is rescheduled: %v", j.Name, reason)
continue
}

unschedule, reason := decide()
if !unschedule {
if act != job.JobActionUnschedule {
log.Debugf("Job(%s) is not to be unscheduled, reason: %v", j.Name, reason)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

if !send(taskTypeUnscheduleUnit, reason, j.Name, j.TargetMachineID) {
log.Infof("Job(%s) send failed.", j.Name)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
return
}

log.Debugf("Job(%s) unscheduling.", j.Name)
clust.unschedule(j.Name)
}

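Condensed from handle_reschedule above, the new reschedule path runs in this order (error handling and the send-failure branches trimmed; this is a reading aid, not the committed code):

// 1. Find the unit that j replaces on its target machine.
replacedUnit, _ := as.GetReplacedUnit(j)
// 2. Unschedule the replaced unit from j's target machine.
send(taskTypeUnscheduleUnit, reason, replacedUnit, j.TargetMachineID)
// 3. Ask the scheduler for a different machine for the replaced unit.
dec, _ := r.sched.DecideReschedule(clust, j)
// 4. Schedule the replaced unit there, and j on the vacated target.
send(taskTypeAttemptScheduleUnit, reason, replacedUnit, dec.machineID)
clust.schedule(replacedUnit, dec.machineID)
clust.schedule(j.Name, j.TargetMachineID)
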
39 changes: 38 additions & 1 deletion engine/scheduler.go
@@ -28,6 +28,7 @@ type decision struct {

type Scheduler interface {
Decide(*clusterState, *job.Job) (*decision, error)
DecideReschedule(*clusterState, *job.Job) (*decision, error)
}

type leastLoadedScheduler struct{}
@@ -41,7 +42,7 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis

var target *agent.AgentState
for _, as := range agents {
if able, _ := as.AbleToRun(j); !able {
if act, _ := as.AbleToRun(j); act == job.JobActionUnschedule {
continue
}

@@ -61,6 +62,42 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis
return &dec, nil
}

// DecideReschedule decides scheduling in a much simpler way than
// Decide(): it simply picks another machine, excluding the unit's
// current target machine. It does not need to call as.AbleToRun(),
// because the job action has already been decided before this
// function is reached.
func (lls *leastLoadedScheduler) DecideReschedule(clust *clusterState, j *job.Job) (*decision, error) {
agents := lls.sortedAgents(clust)

if len(agents) == 0 {
return nil, fmt.Errorf("zero agents available")
}

found := false
var target *agent.AgentState
for _, as := range agents {
if as.MState.ID == j.TargetMachineID {
continue
}

as := as
target = as
found = true
break
}

if !found {
return nil, fmt.Errorf("no agents able to run job")
}

dec := decision{
machineID: target.MState.ID,
}

return &dec, nil
}

// sortedAgents returns a list of AgentState objects sorted ascending
// by the number of scheduled units
func (lls *leastLoadedScheduler) sortedAgents(clust *clusterState) []*agent.AgentState {
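The selection rule in DecideReschedule reduces to: take the first entry of sortedAgents (ascending by scheduled-unit count, so least loaded first) whose machine ID differs from the job's current target. A standalone restatement follows; pickOther is a hypothetical helper that does not appear in the diff.

// pickOther returns the least-loaded agent other than the current
// target machine, mirroring the loop in DecideReschedule.
func pickOther(sorted []*agent.AgentState, currentID string) (*agent.AgentState, error) {
	for _, as := range sorted {
		if as.MState.ID != currentID {
			return as, nil
		}
	}
	return nil, fmt.Errorf("no agents able to run job")
}
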
8 changes: 8 additions & 0 deletions functional/fixtures/units/replace-kick0.service
@@ -0,0 +1,8 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Replaces=replace.0.service
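
This fixture exercises the fix end to end: starting it should force the unit it replaces onto another machine. A hedged manual walkthrough with fleetctl, assuming a companion replace.0.service fixture as the Replaces= line implies:

fleetctl start replace.0.service      # schedule the unit that will be replaced
fleetctl start replace-kick0.service  # Replaces= kicks replace.0.service to another machine
fleetctl list-units                   # expect both units running, on different machines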
