Skip to content

Commit

Permalink
MRD: move 'job stop -global' handling into RPC (#8776)
Browse files Browse the repository at this point in the history
The initial implementation of global job stop for MRD looped over all the
regions in the CLI for expedience. This changeset includes the OSS parts of
moving this into the RPC layer so that API consumers don't have to implement
this logic themselves.
  • Loading branch information
tgross authored Aug 28, 2020
1 parent 2adcf54 commit 5a01ab3
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 20 deletions.
24 changes: 24 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,30 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W
return resp.EvalID, wm, nil
}

// DeregisterOptions is used to pass through job deregistration parameters
type DeregisterOptions struct {
// If Purge is set to true, the job is deregistered and purged from the
// system versus still being queryable and eventually GC'ed from the
// system. Most callers should not specify purge.
Purge bool

// If Global is set to true, all regions of a multiregion job will be
// stopped.
Global bool
}

// DeregisterOpts is used to remove an existing job. See DeregisterOptions
// for parameters.
func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t&global=%t",
url.PathEscape(jobID), opts.Purge, opts.Global), &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}

// ForceEvaluate is used to force-evaluate an existing job.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp JobRegisterResponse
Expand Down
15 changes: 13 additions & 2 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,20 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
}
}

globalStr := req.URL.Query().Get("global")
var globalBool bool
if globalStr != "" {
var err error
globalBool, err = strconv.ParseBool(globalStr)
if err != nil {
return nil, fmt.Errorf("Failed to parse value of %q (%v) as a bool: %v", "global", globalStr, err)
}
}

args := structs.JobDeregisterRequest{
JobID: jobName,
Purge: purgeBool,
JobID: jobName,
Purge: purgeBool,
Global: globalBool,
}
s.parseWriteRequest(req, &args.WriteRequest)

Expand Down
20 changes: 2 additions & 18 deletions command/job_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,26 +185,10 @@ func (c *JobStopCommand) Run(args []string) int {
}
}

// Scatter-gather job stop for multi-region jobs
if global && job.IsMultiregion() {
for _, region := range job.Multiregion.Regions {
// Invoke the stop
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace, Region: region.Name}
evalID, _, err := client.Jobs().Deregister(*job.ID, purge, wq)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job in %q: %s", region.Name, err))
return 1
}
if evalID != "" {
c.Ui.Output(evalID)
}
}
return 0
}

// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().Deregister(*job.ID, purge, wq)
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1
Expand Down
5 changes: 5 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,11 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
reply.Index = evalIndex
}

err = j.multiregionStop(job, args, reply)
if err != nil {
return err
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions nomad/job_endpoint_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func (j *Job) multiregionDrop(args *structs.JobRegisterRequest, reply *structs.J
return nil
}

// multiregionStop is used to fan-out Job.Deregister RPCs to all regions if
// the global flag is passed to Job.Deregister
func (j *Job) multiregionStop(job *structs.Job, args *structs.JobDeregisterRequest, reply *structs.JobDeregisterResponse) error {
return nil
}

// interpolateMultiregionFields interpolates a job for a specific region
func (j *Job) interpolateMultiregionFields(args *structs.JobPlanRequest) error {
return nil
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,10 @@ type JobDeregisterRequest struct {
// garbage collector
Purge bool

// Global controls whether all regions of a multi-region job are
// deregistered. It is ignored for single-region jobs.
Global bool

// Eval is the evaluation to create that's associated with job deregister
Eval *Evaluation

Expand Down
24 changes: 24 additions & 0 deletions vendor/github.com/hashicorp/nomad/api/jobs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5a01ab3

Please sign in to comment.