Fix AllocRunner not capturing destroy signal and tests #755

Merged · 3 commits · Feb 4, 2016

35 changes: 23 additions & 12 deletions client/alloc_runner.go
@@ -326,15 +326,8 @@ func (r *AllocRunner) Run() {
defer close(r.waitCh)
go r.dirtySyncState()

// Check if the allocation is in a terminal status
alloc := r.alloc
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID)
return
}
r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID)

// Find the task group to run in the allocation
alloc := r.alloc
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
@@ -353,7 +346,18 @@ func (r *AllocRunner) Run() {
r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
}

// Check if the allocation is in a terminal status. In this case, we don't
// start any of the task runners and directly wait for the destroy signal to
// clean up the allocation.
if alloc.TerminalStatus() {
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
return
}

// Start the task runners
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
r.taskLock.Lock()
for _, task := range tg.Tasks {
if _, ok := r.restored[task.Name]; ok {
@@ -415,7 +419,6 @@ OUTER:

// Destroy each sub-task
r.taskLock.Lock()
defer r.taskLock.Unlock()
for _, tr := range r.tasks {
tr.Destroy()
}
@@ -424,12 +427,21 @@ OUTER:
for _, tr := range r.tasks {
<-tr.WaitCh()
}
r.taskLock.Unlock()

// Final state sync
r.retrySyncState(nil)

// Check if we should destroy our state
if r.destroy {
// Block until we should destroy the state of the alloc
r.handleDestroy()
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}

// handleDestroy blocks till the AllocRunner should be destroyed and does the
// necessary cleanup.
func (r *AllocRunner) handleDestroy() {
select {
case <-r.destroyCh:
if err := r.DestroyContext(); err != nil {
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
r.alloc.ID, err)
@@ -439,7 +451,6 @@ OUTER:
r.alloc.ID, err)
}
}
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
}

// Update is used to update the allocation of the context
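The heart of the fix is visible above: a terminal allocation used to return from Run() immediately, leaving the later destroy signal unconsumed, and it now parks in the shared handleDestroy helper until Destroy() fires. Below is a minimal, self-contained sketch of that channel pattern; the runner type and its fields are illustrative stand-ins, not Nomad's actual structs.

package main

import "fmt"

// runner is a toy stand-in for AllocRunner, reduced to the two
// channels the destroy handshake needs.
type runner struct {
	destroyCh chan struct{} // assumed closed by Destroy(), as the select above suggests
	waitCh    chan struct{} // closed when run returns
}

// handleDestroy blocks until the destroy signal arrives, then cleans up.
// Sharing one helper between the terminal-status early path and the
// normal exit path is the shape of the fix.
func (r *runner) handleDestroy() {
	<-r.destroyCh
	fmt.Println("destroy signal received; cleaning up state and alloc dir")
}

// run mirrors the new control flow: a terminal alloc starts no task
// runners and waits for destroy instead of returning early.
func (r *runner) run(terminal bool) {
	defer close(r.waitCh)
	if terminal {
		r.handleDestroy()
		return
	}
	// ... start task runners, wait for them, sync state ...
	r.handleDestroy()
}

func main() {
	r := &runner{destroyCh: make(chan struct{}), waitCh: make(chan struct{})}
	go r.run(true)
	close(r.destroyCh) // what Destroy() does in this sketch
	<-r.waitCh
	fmt.Println("runner terminated")
}

Closing destroyCh rather than sending on it unblocks every current and future receiver, so a single close is enough to wake the runner no matter which phase it is waiting in.
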
217 changes: 214 additions & 3 deletions client/alloc_runner_test.go
@@ -62,6 +62,95 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
})
}

func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Update the alloc to be terminal which should cause the alloc runner to
// stop the tasks and wait for a destroy.
update := ar.alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}

// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}

// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
}

// Check the alloc directory still exists
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Send the destroy signal and ensure the AllocRunner cleans up.
ar.Destroy()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}

// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocRunner_Destroy(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)
@@ -83,10 +172,30 @@ func TestAllocRunner_Destroy(t *testing.T) {
if upd.Count == 0 {
return false, nil
}

// Check the status has changed.
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
t.Fatalf("err: %v", err)
})

if time.Since(start) > 15*time.Second {
@@ -129,7 +238,6 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()
defer ar.Destroy()

// Snapshot state
testutil.WaitForResult(func() (bool, error) {
@@ -171,3 +279,106 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
t.Fatalf("took too long to terminate")
}
}

func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
ctestutil.ExecCompatible(t)
upd, ar := testAllocRunner(false)

// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = []string{"10"}
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Update the alloc to be terminal which should cause the alloc runner to
// stop the tasks and wait for a destroy.
update := ar.alloc.Copy()
update.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(update)

testutil.WaitForResult(func() (bool, error) {
return ar.alloc.DesiredStatus == structs.AllocDesiredStatusStop, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

err := ar.SaveState()
if err != nil {
t.Fatalf("err: %v", err)
}

// Ensure both alloc runners don't destroy
ar.destroy = true

// Create a new alloc runner
consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()

testutil.WaitForResult(func() (bool, error) {
// Check the state still exists
if _, err := os.Stat(ar.stateFilePath()); err != nil {
return false, fmt.Errorf("state file destroyed: %v", err)
}

// Check the alloc directory still exists
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
})

// Send the destroy signal and ensure the AllocRunner cleans up.
ar2.Destroy()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}

// Check the status has changed.
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusDead {
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
}

// Check the state was cleaned
if _, err := os.Stat(ar.stateFilePath()); err == nil {
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

// Check the alloc directory was cleaned
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("stat err: %v", err)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
6 changes: 3 additions & 3 deletions client/allocdir/alloc_dir.go
@@ -36,7 +36,7 @@ type AllocDir struct {
TaskDirs map[string]string

// A list of locations the shared alloc has been mounted to.
mounted []string
Mounted []string
}

// AllocFileInfo holds information about a file inside the AllocDir
@@ -64,7 +64,7 @@ func NewAllocDir(allocDir string) *AllocDir {
// Tears down previously build directory structure.
func (d *AllocDir) Destroy() error {
// Unmount all mounted shared alloc dirs.
for _, m := range d.mounted {
for _, m := range d.Mounted {
if err := d.unmountSharedDir(m); err != nil {
return fmt.Errorf("Failed to unmount shared directory: %v", err)
}
@@ -233,7 +233,7 @@ func (d *AllocDir) MountSharedDir(task string) error {
return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
}

d.mounted = append(d.mounted, taskLoc)
d.Mounted = append(d.Mounted, taskLoc)
return nil
}

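The only change in this file is exporting the mounted slice. The diff does not state the motivation, but a plausible one is serialization: Go encoders skip unexported fields, so a saved AllocDir snapshot would silently lose the mount list and a restored runner's Destroy() would have nothing to unmount. A toy sketch of the difference, using illustrative types and encoding/json as the stand-in encoder:

package main

import (
	"encoding/json"
	"fmt"
)

// unexportedDir mimics the old field: encoders skip unexported fields.
type unexportedDir struct {
	mounted []string
}

// exportedDir mimics the new field: exported, so it survives a snapshot.
type exportedDir struct {
	Mounted []string
}

func main() {
	old, _ := json.Marshal(unexportedDir{mounted: []string{"/alloc/web/alloc"}})
	fmt.Println(string(old)) // prints {}: the mount list is dropped

	cur, _ := json.Marshal(exportedDir{Mounted: []string{"/alloc/web/alloc"}})
	fmt.Println(string(cur)) // prints {"Mounted":["/alloc/web/alloc"]}
}
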
2 changes: 1 addition & 1 deletion testutil/wait.go
@@ -18,7 +18,7 @@ type testFn func() (bool, error)
type errorFn func(error)

func WaitForResult(test testFn, error errorFn) {
WaitForResultRetries(1000*TestMultiplier(), test, error)
WaitForResultRetries(2000*TestMultiplier(), test, error)
}

func WaitForResultRetries(retries int64, test testFn, error errorFn) {
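Doubling the default retry budget from 1000 to 2000 gives the new stop-then-destroy tests, which wait through several client state transitions, more headroom before the failure callback fires. For context, a minimal illustration of how the helper is consumed; the test body is invented for demonstration, and only WaitForResult's signature comes from this diff:

package testutil_test

import (
	"fmt"
	"testing"

	"github.com/hashicorp/nomad/testutil"
)

func TestWaitForResult_Usage(t *testing.T) {
	ready := make(chan struct{})
	go close(ready)

	// The poll function is retried until it returns (true, nil) or the
	// retry budget runs out; only then does the failure callback run.
	testutil.WaitForResult(func() (bool, error) {
		select {
		case <-ready:
			return true, nil
		default:
			return false, fmt.Errorf("not ready yet")
		}
	}, func(err error) {
		t.Fatalf("err: %v", err)
	})
}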