fix: various agent pool and job bugs (#659)
* Workspace agent pool select form would incorrectly show the current agent
pool when there are multiple pools
* Jobs could incorrectly transition state, e.g. from canceled to
allocated
* Agent would, upon failing to start an allocated job, re-attempt to
start the job repeatedly without backing off
leg100 authored Dec 6, 2023
1 parent 5424565 commit ed9b1fd
Showing 9 changed files with 141 additions and 97 deletions.
86 changes: 44 additions & 42 deletions internal/agent/daemon.go
@@ -221,57 +221,59 @@ func (d *daemon) Start(ctx context.Context) error {
 	// fetch jobs allocated to this agent and launch workers to do jobs; also
 	// handle cancelation signals for jobs
 	for {
-		// block on waiting for jobs
-		var jobs []*Job
-		getJobs := func() (err error) {
+		processJobs := func() (err error) {
 			d.poolLogger.Info("waiting for next job")
-			jobs, err = d.getAgentJobs(ctx, agent.ID)
-			return err
+			// block on waiting for jobs
+			jobs, err := d.getAgentJobs(ctx, agent.ID)
+			if err != nil {
+				return err
+			}
+			for _, j := range jobs {
+				if j.Status == JobAllocated {
+					d.poolLogger.Info("received job", "job", j)
+					// start job and receive job token in return
+					token, err := d.startJob(ctx, j.Spec)
+					if err != nil {
+						if ctx.Err() != nil {
+							return nil
+						}
+						d.poolLogger.Error(err, "starting job")
+						continue
+					}
+					d.poolLogger.V(0).Info("started job")
+					op := newOperation(newOperationOptions{
+						logger:     d.poolLogger.WithValues("job", j),
+						client:     d.client,
+						config:     d.config,
+						job:        j,
+						downloader: d.downloader,
+						envs:       d.envs,
+						token:      token,
+					})
+					// check operation in with the terminator, so that if a cancelation signal
+					// arrives it can be handled accordingly for the duration of the operation.
+					terminator.checkIn(j.Spec, op)
+					op.V(0).Info("started job")
+					g.Go(func() error {
+						op.doAndFinish()
+						terminator.checkOut(op.job.Spec)
+						return nil
+					})
+				} else if j.Signaled != nil {
+					d.poolLogger.Info("received cancelation signal", "force", *j.Signaled, "job", j)
+					terminator.cancel(j.Spec, *j.Signaled, true)
+				}
+			}
+			return nil
 		}
 		policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
-		_ = backoff.RetryNotify(getJobs, policy, func(err error, next time.Duration) {
+		_ = backoff.RetryNotify(processJobs, policy, func(err error, next time.Duration) {
 			d.poolLogger.Error(err, "waiting for next job", "backoff", next)
 		})
 		// only stop retrying if context is canceled
 		if ctx.Err() != nil {
 			return nil
 		}
-		for _, j := range jobs {
-			if j.Status == JobAllocated {
-				d.poolLogger.Info("received job", "job", j)
-				// start job and receive job token in return
-				token, err := d.startJob(ctx, j.Spec)
-				if err != nil {
-					if ctx.Err() != nil {
-						return nil
-					}
-					d.poolLogger.Error(err, "starting job")
-					continue
-				}
-				d.poolLogger.V(0).Info("started job")
-				op := newOperation(newOperationOptions{
-					logger:     d.poolLogger.WithValues("job", j),
-					client:     d.client,
-					config:     d.config,
-					job:        j,
-					downloader: d.downloader,
-					envs:       d.envs,
-					token:      token,
-				})
-				// check operation in with the terminator, so that if a cancelation signal
-				// arrives it can be handled accordingly for the duration of the operation.
-				terminator.checkIn(j.Spec, op)
-				op.V(0).Info("started job")
-				g.Go(func() error {
-					op.doAndFinish()
-					terminator.checkOut(op.job.Spec)
-					return nil
-				})
-			} else if j.Signaled != nil {
-				d.poolLogger.Info("received cancelation signal", "force", *j.Signaled, "job", j)
-				terminator.cancel(j.Spec, *j.Signaled, true)
-			}
-		}
 	}
 })
 return g.Wait()
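The structural point of this change is that fetching and processing jobs now both happen inside the function handed to backoff.RetryNotify, so an error from that combined step feeds the exponential backoff policy instead of being retried immediately in a tight loop. A minimal, self-contained sketch of that retry pattern, assuming github.com/cenkalti/backoff/v4 (the API the diff uses); the failing operation here is a stand-in:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	attempts := 0
	// operation stands in for processJobs: it fails twice, then succeeds.
	operation := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	// Each failure is retried after an exponentially increasing delay;
	// WithContext stops retrying once ctx is canceled.
	policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
	_ = backoff.RetryNotify(operation, policy, func(err error, next time.Duration) {
		fmt.Printf("retrying in %s: %v\n", next, err)
	})
}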
65 changes: 39 additions & 26 deletions internal/agent/job.go
@@ -121,8 +121,8 @@ func (j *Job) LogValue() slog.Value {
 
 func (j *Job) Organizations() []string { return nil }
 
-func (j *Job) IsSiteAdmin() bool   { return true }
-func (j *Job) IsOwner(string) bool { return true }
+func (j *Job) IsSiteAdmin() bool   { return false }
+func (j *Job) IsOwner(string) bool { return false }
 
 func (j *Job) CanAccessSite(action rbac.Action) bool {
 	return false
@@ -176,10 +176,9 @@ func (j *Job) CanAccessTeam(rbac.Action, string) bool {
 }
 
 func (j *Job) allocate(agentID string) error {
-	if j.Status != JobUnallocated {
-		return errors.New("job can only be allocated when it is in the unallocated state")
+	if err := j.updateStatus(JobAllocated); err != nil {
+		return err
 	}
-	j.Status = JobAllocated
 	j.AgentID = &agentID
 	return nil
 }
@@ -195,10 +194,8 @@ func (j *Job) reallocate(agentID string) error {
 // cancel job based on current state of its parent run - depending on its state,
 // the job is signaled and/or its state is updated too.
 func (j *Job) cancel(run *otfrun.Run) (*bool, error) {
-	var (
-		// whether job be signaled
-		signal *bool
-	)
+	// whether job be signaled
+	var signal *bool
 	switch run.Status {
 	case otfrun.RunPlanning, otfrun.RunApplying:
 		if run.CancelSignaledAt != nil {
@@ -207,17 +204,16 @@
 			signal = internal.Bool(false)
 		}
 	case otfrun.RunCanceled:
-		// run has been canceled so immediately cancel job too unless already
-		// canceled
-		if j.Status != JobCanceled {
-			j.Status = JobCanceled
+		// run has been canceled so immediately cancel job too
+		if err := j.updateStatus(JobCanceled); err != nil {
+			return nil, err
 		}
 	case otfrun.RunForceCanceled:
 		// run has been forceably canceled, so both signal job to forcefully
 		// cancel current operation, and immediately cancel job.
 		signal = internal.Bool(true)
-		if j.Status != JobCanceled {
-			j.Status = JobCanceled
+		if err := j.updateStatus(JobCanceled); err != nil {
+			return nil, err
 		}
 	}
 	if signal != nil {
@@ -230,19 +226,36 @@
 	return nil, nil
 }
 
+func (j *Job) startJob() error {
+	return j.updateStatus(JobRunning)
+}
+
+func (j *Job) finishJob(to JobStatus) error {
+	return j.updateStatus(to)
+}
+
 func (j *Job) updateStatus(to JobStatus) error {
-	switch to {
-	case JobRunning:
-		if j.Status != JobAllocated {
-			return ErrInvalidJobStateTransition
-		}
-	case JobFinished, JobErrored, JobCanceled:
-		if j.Status != JobRunning {
-			return ErrInvalidJobStateTransition
-		}
-	default:
-		return ErrInvalidJobStateTransition
+	var isValid bool
+	switch j.Status {
+	case JobUnallocated:
+		switch to {
+		case JobAllocated, JobCanceled:
+			isValid = true
+		}
+	case JobAllocated:
+		switch to {
+		case JobRunning, JobCanceled:
+			isValid = true
+		}
+	case JobRunning:
+		switch to {
+		case JobFinished, JobCanceled, JobErrored:
+			isValid = true
+		}
 	}
-	j.Status = to
-	return nil
+	if isValid {
+		j.Status = to
+		return nil
+	}
+	return ErrInvalidJobStateTransition
 }
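The rewritten updateStatus switches on the job's current status rather than the target status, and the terminal states (finished, errored, canceled) have no outgoing transitions, so a canceled job can no longer become allocated, as the commit message describes. The same rules can equivalently be read as a transition table; a sketch, with the status constants and error assumed to mirror the ones in the diff:

package main

import (
	"errors"
	"fmt"
)

type JobStatus string

const (
	JobUnallocated JobStatus = "unallocated"
	JobAllocated   JobStatus = "allocated"
	JobRunning     JobStatus = "running"
	JobFinished    JobStatus = "finished"
	JobErrored     JobStatus = "errored"
	JobCanceled    JobStatus = "canceled"
)

var ErrInvalidJobStateTransition = errors.New("invalid job state transition")

// validTransitions encodes the same rules as updateStatus; terminal states
// are absent from the map, so they permit no further transitions.
var validTransitions = map[JobStatus][]JobStatus{
	JobUnallocated: {JobAllocated, JobCanceled},
	JobAllocated:   {JobRunning, JobCanceled},
	JobRunning:     {JobFinished, JobCanceled, JobErrored},
}

func transition(from, to JobStatus) error {
	for _, valid := range validTransitions[from] {
		if to == valid {
			return nil
		}
	}
	return ErrInvalidJobStateTransition
}

func main() {
	fmt.Println(transition(JobUnallocated, JobAllocated)) // <nil>
	fmt.Println(transition(JobCanceled, JobAllocated))    // invalid job state transition
}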
25 changes: 25 additions & 0 deletions internal/agent/job_test.go
@@ -33,3 +33,28 @@ func Test_jobSpecFromString(t *testing.T) {
 		})
 	}
 }
+
+func TestJob_updateStatus(t *testing.T) {
+	tests := []struct {
+		name string
+		from JobStatus
+		to   JobStatus
+		want error
+	}{
+		{"allocate job", JobUnallocated, JobAllocated, nil},
+		{"start job", JobAllocated, JobRunning, nil},
+		{"finish job", JobRunning, JobFinished, nil},
+		{"finish with error", JobRunning, JobErrored, nil},
+		{"cancel unstarted job", JobAllocated, JobCanceled, nil},
+		{"cancel running job", JobRunning, JobCanceled, nil},
+		{"cannot allocate canceled job", JobCanceled, JobAllocated, ErrInvalidJobStateTransition},
+		{"cannot finish canceled job", JobCanceled, JobFinished, ErrInvalidJobStateTransition},
+		{"cannot error canceled job", JobCanceled, JobErrored, ErrInvalidJobStateTransition},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			j := &Job{Status: tt.from}
+			assert.Equal(t, tt.want, j.updateStatus(tt.to))
+		})
+	}
+}
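These table-driven cases exercise the transition rules above and can be run in isolation with go test ./internal/agent -run TestJob_updateStatus.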
10 changes: 5 additions & 5 deletions internal/agent/service.go
@@ -512,8 +512,8 @@ func (s *service) createJob(ctx context.Context, run *otfrun.Run) error {
 }
 
 // cancelJob is called when a user cancels a run - cancelJob determines whether
-// the corresponding job is signaled and what type of signal, and/or whether the job
-// should be canceled.
+// the corresponding job is signaled and what type of signal, and/or whether the
+// job should be canceled.
 func (s *service) cancelJob(ctx context.Context, run *otfrun.Run) error {
 	var (
 		spec = JobSpec{RunID: run.ID, Phase: run.Phase()}
@@ -525,7 +525,7 @@ func (s *service) cancelJob(ctx context.Context, run *otfrun.Run) error {
 	})
 	if err != nil {
 		if errors.Is(err, internal.ErrResourceNotFound) {
-			// ignore when there is no job corresponding to a run phase yet.
+			// ignore when no job has yet been created for the run.
 			return nil
 		}
 		s.Error(err, "canceling job", "spec", spec)
@@ -640,7 +640,7 @@ func (s *service) startJob(ctx context.Context, spec JobSpec) ([]byte, error) {
 	if job.AgentID == nil || *job.AgentID != subject.String() {
 		return internal.ErrAccessNotPermitted
 	}
-	if err := job.updateStatus(JobRunning); err != nil {
+	if err := job.startJob(); err != nil {
 		return err
 	}
 	// start corresponding run phase too
@@ -692,7 +692,7 @@ func (s *service) finishJob(ctx context.Context, spec JobSpec, opts finishJobOpt
 	if err != nil {
 		return err
 	}
-	return job.updateStatus(opts.Status)
+	return job.finishJob(opts.Status)
 })
 if err != nil {
 	s.Error(err, "finishing job", "spec", spec)
9 changes: 7 additions & 2 deletions internal/agent/web.go
@@ -304,7 +304,6 @@ func (h *webHandlers) listAllowedPools(w http.ResponseWriter, r *http.Request) {
 		h.Error(w, err.Error(), http.StatusUnprocessableEntity)
 		return
 	}
-
 	ws, err := h.workspaceService.GetWorkspace(r.Context(), workspaceID)
 	if err != nil {
 		h.Error(w, err.Error(), http.StatusInternalServerError)
@@ -318,7 +317,13 @@ func (h *webHandlers) listAllowedPools(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	h.Render("agent_pools_list_allowed.tmpl", w, pools)
+	h.Render("agent_pools_list_allowed.tmpl", w, struct {
+		Pools         []*Pool
+		CurrentPoolID string
+	}{
+		Pools:         pools,
+		CurrentPoolID: r.URL.Query().Get("agent_pool_id"),
+	})
 }
 
 // agent token handlers
Template file (name not shown in this extract):
@@ -33,9 +33,6 @@
 	</div>
 	<div>
 		{{ template "identifier" . }}
-		<form action="{{ deleteAgentPoolPath .ID }}" method="POST">
-			<button class="btn-danger" onclick="return confirm('Are you sure you want to delete?')">delete</button>
-		</form>
 	</div>
 </div>
 {{ end }}
Template file (name not shown in this extract):
@@ -1,3 +1,5 @@
-{{ range . }}
-<option value="{{ .ID }}">{{ .Name }}</option>
+<select id="agent-pool-id" name="agent_pool_id">
+{{ range .Pools }}
+<option value="{{ .ID }}" {{ selected $.CurrentPoolID .ID }}>{{ .Name }}</option>
 {{ end }}
+</select>
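The new option line relies on a selected template helper, which this diff does not define. A sketch of how such a helper could be wired up with html/template; the helper's name and arguments are inferred from the template above, and the data shapes here are illustrative:

package main

import (
	"html/template"
	"os"
)

type pool struct{ ID, Name string }

func main() {
	funcs := template.FuncMap{
		// selected emits the attribute only when the current pool ID
		// matches this option's ID, pre-selecting it in the form.
		"selected": func(current, id string) template.HTMLAttr {
			if current == id {
				return "selected"
			}
			return ""
		},
	}
	const src = `<select id="agent-pool-id" name="agent_pool_id">
{{ range .Pools }}<option value="{{ .ID }}" {{ selected $.CurrentPoolID .ID }}>{{ .Name }}</option>
{{ end }}</select>`
	tmpl := template.Must(template.New("pools").Funcs(funcs).Parse(src))
	data := struct {
		Pools         []pool
		CurrentPoolID string
	}{
		Pools:         []pool{{"pool-1", "default"}, {"pool-2", "gpu"}},
		CurrentPoolID: "pool-2",
	}
	if err := tmpl.Execute(os.Stdout, data); err != nil {
		panic(err)
	}
}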
Template file (name not shown in this extract):
@@ -58,7 +58,7 @@
 <div class="col-start-2 hidden peer-checked:flex flex-col mt-2 bg-gray-100 p-2 gap-2">
 	<div class="flex items-center gap-2">
 		<label class="text-md" for="agent-pool-id">Agent pool</label>
-		<select hx-get="{{ poolsWorkspacePath .Workspace.ID }}" hx-trigger="load" hx-swap="innerHTML" id="agent-pool-id" name="agent_pool_id"></select>
+		<div hx-get="{{ poolsWorkspacePath .Workspace.ID }}?agent_pool_id={{ default "" .Workspace.AgentPoolID }}" hx-trigger="load" hx-swap="innerHTML"></div>
 	</div>
 	<span class="description">Select an agent pool. If no pools are listed then you either need to create a pool or you need to configure at least one pool to grant access to your workspace. Manage agent pools <a id="agent-pools-link" class="underline" href="{{ agentPoolsPath .Workspace.Organization }}">here</a>.</span>
 </div>
32 changes: 16 additions & 16 deletions internal/workspace/web.go
@@ -427,15 +427,15 @@ func (h *webHandlers) editWorkspace(w http.ResponseWriter, r *http.Request) {
 
 func (h *webHandlers) updateWorkspace(w http.ResponseWriter, r *http.Request) {
 	var params struct {
-		AgentPoolID       *string        `schema:"agent_pool_id"`
-		AutoApply         bool           `schema:"auto_apply"`
-		Name              *string
-		Description       *string
-		ExecutionMode     *ExecutionMode `schema:"execution_mode"`
-		TerraformVersion  *string        `schema:"terraform_version"`
-		WorkingDirectory  *string        `schema:"working_directory"`
-		WorkspaceID       string         `schema:"workspace_id,required"`
-		GlobalRemoteState bool           `schema:"global_remote_state"`
+		AgentPoolID       string        `schema:"agent_pool_id"`
+		AutoApply         bool          `schema:"auto_apply"`
+		Name              string
+		Description       string
+		ExecutionMode     ExecutionMode `schema:"execution_mode"`
+		TerraformVersion  string        `schema:"terraform_version"`
+		WorkingDirectory  string        `schema:"working_directory"`
+		WorkspaceID       string        `schema:"workspace_id,required"`
+		GlobalRemoteState bool          `schema:"global_remote_state"`
 
 		// VCS connection
 		VCSTriggerStrategy string `schema:"vcs_trigger"`
@@ -459,11 +459,11 @@ func (h *webHandlers) updateWorkspace(w http.ResponseWriter, r *http.Request) {
 
 	opts := UpdateOptions{
 		AutoApply:         &params.AutoApply,
-		Name:              params.Name,
-		Description:       params.Description,
-		ExecutionMode:     params.ExecutionMode,
-		TerraformVersion:  params.TerraformVersion,
-		WorkingDirectory:  params.WorkingDirectory,
+		Name:              &params.Name,
+		Description:       &params.Description,
+		ExecutionMode:     &params.ExecutionMode,
+		TerraformVersion:  &params.TerraformVersion,
+		WorkingDirectory:  &params.WorkingDirectory,
 		GlobalRemoteState: &params.GlobalRemoteState,
 	}
 	if ws.Connection != nil {
@@ -490,8 +490,8 @@
 		}
 	}
 	// only set agent pool ID if execution mode is set to agent
-	if opts.ExecutionMode != nil && *opts.ExecutionMode == AgentExecutionMode {
-		opts.AgentPoolID = params.AgentPoolID
+	if params.ExecutionMode == AgentExecutionMode {
+		opts.AgentPoolID = &params.AgentPoolID
 	}
 
 	ws, err = h.svc.UpdateWorkspace(r.Context(), params.WorkspaceID, opts)
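Dropping the pointer fields from params changes how form decoding behaves: with a pointer field, a key absent from the form leaves the field nil, whereas a plain value field always decodes to its zero value and can safely be passed by address into UpdateOptions, which is what the handler now does. A small sketch of the difference, assuming github.com/gorilla/schema (suggested by the schema struct tags, not confirmed by this diff):

package main

import (
	"fmt"

	"github.com/gorilla/schema"
)

func main() {
	// Value fields: a key absent from the form simply leaves the zero value.
	var params struct {
		AgentPoolID string `schema:"agent_pool_id"`
		Name        string `schema:"name"`
	}
	form := map[string][]string{"agent_pool_id": {"pool-123"}} // "name" absent
	if err := schema.NewDecoder().Decode(&params, form); err != nil {
		panic(err)
	}
	// Name decoded to ""; taking &params.Name is always safe, whereas a
	// *string field would still be nil here.
	fmt.Printf("pool=%q name=%q\n", params.AgentPoolID, params.Name)
}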
