Skip to content

Commit

Permalink
Merge pull request #2575 from hashicorp/f-job-revert
Browse files Browse the repository at this point in the history
Job revert
  • Loading branch information
dadgar authored Apr 27, 2017
2 parents af5a678 + 8d01a4b commit 404626c
Show file tree
Hide file tree
Showing 9 changed files with 458 additions and 22 deletions.
60 changes: 50 additions & 10 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *Write
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse
var resp JobRegisterResponse

req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
Expand All @@ -65,7 +65,7 @@ func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse
var resp JobRegisterResponse

req := &RegisterJobRequest{
Job: job,
Expand Down Expand Up @@ -153,7 +153,7 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query
// is deregistered and purged from the system versus still being queryable and
// eventually GC'ed from the system. Most callers should not specify purge.
func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) {
var resp deregisterJobResponse
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", jobID, purge), &resp, q)
if err != nil {
return "", nil, err
Expand All @@ -163,7 +163,7 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W

// ForceEvaluate is used to force-evaluate an existing job.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
var resp JobRegisterResponse
wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", nil, &resp, q)
if err != nil {
return "", nil, err
Expand Down Expand Up @@ -223,6 +223,25 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}

// Revert is used to revert the given job to the passed version. If
// enforceVersion is set, the job is only reverted if the current version is at
// the passed version.
func (j *Jobs) Revert(jobID string, version uint64, enforcePriorVersion *uint64,
q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {

var resp JobRegisterResponse
req := &JobRevertRequest{
JobID: jobID,
JobVersion: version,
EnforcePriorVersion: enforcePriorVersion,
}
wm, err := j.client.write("/v1/job/"+jobID+"/revert", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}

// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
Expand Down Expand Up @@ -539,6 +558,21 @@ type JobValidateResponse struct {
Error string
}

// JobRevertRequest is used to revert a job to a prior version.
type JobRevertRequest struct {
// JobID is the ID of the job being reverted
JobID string

// JobVersion the version to revert to.
JobVersion uint64

// EnforcePriorVersion if set will enforce that the job is at the given
// version before reverting.
EnforcePriorVersion *uint64

WriteRequest
}

// JobUpdateRequest is used to update a job
type JobRegisterRequest struct {
Job *Job
Expand All @@ -558,14 +592,20 @@ type RegisterJobRequest struct {
JobModifyIndex uint64 `json:",omitempty"`
}

// registerJobResponse is used to deserialize a job response
type registerJobResponse struct {
EvalID string
// JobRegisterResponse is used to respond to a job registration
type JobRegisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}

// deregisterJobResponse is used to decode a deregister response
type deregisterJobResponse struct {
EvalID string
// JobDeregisterResponse is used to respond to a job deregistration
type JobDeregisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}

type JobPlanRequest struct {
Expand Down
48 changes: 48 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,54 @@ func TestJobs_EnforceRegister(t *testing.T) {
assertWriteMeta(t, wm)
}

func TestJobs_Revert(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Register twice
job := testJob()
eval, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)

eval, wm, err = jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)

// Fail revert at incorrect enforce
_, wm, err = jobs.Revert(*job.ID, 0, helper.Uint64ToPtr(10), nil)
if err == nil || !strings.Contains(err.Error(), "enforcing version") {
t.Fatalf("expected enforcement error: %v", err)
}

// Works at correct index
revertResp, wm, err := jobs.Revert(*job.ID, 0, helper.Uint64ToPtr(1), nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if revertResp.EvalID == "" {
t.Fatalf("missing eval id")
}
if revertResp.EvalCreateIndex == 0 {
t.Fatalf("bad eval create index")
}
if revertResp.JobModifyIndex == 0 {
t.Fatalf("bad job modify index")
}
assertWriteMeta(t, wm)
}

func TestJobs_Info(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
Expand Down
32 changes: 32 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/versions"):
jobName := strings.TrimSuffix(path, "/versions")
return s.jobVersions(resp, req, jobName)
case strings.HasSuffix(path, "/revert"):
jobName := strings.TrimSuffix(path, "/revert")
return s.jobRevert(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -360,6 +363,35 @@ func (s *HTTPServer) jobVersions(resp http.ResponseWriter, req *http.Request,
return out.Versions, nil
}

func (s *HTTPServer) jobRevert(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {

if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}

var revertRequest structs.JobRevertRequest
if err := decodeBody(req, &revertRequest); err != nil {
return nil, CodedError(400, err.Error())
}
if revertRequest.JobID == "" {
return nil, CodedError(400, "JobID must be specified")
}
if revertRequest.JobID != jobName {
return nil, CodedError(400, "Job ID does not match")
}

s.parseRegion(req, &revertRequest.Region)

var out structs.JobRegisterResponse
if err := s.agent.RPC("Job.Revert", &revertRequest, &out); err != nil {
return nil, err
}

setMeta(resp, &out.QueryMeta)
return out, nil
}

func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
args := structs.JobSummaryRequest{
JobID: name,
Expand Down
50 changes: 50 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,56 @@ func TestHTTP_JobDispatch(t *testing.T) {
})
}

func TestHTTP_JobRevert(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job and register it twice
job := mock.Job()
regReq := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var regResp structs.JobRegisterResponse
if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil {
t.Fatalf("err: %v", err)
}

if err := s.Agent.RPC("Job.Register", &regReq, &regResp); err != nil {
t.Fatalf("err: %v", err)
}

args := structs.JobRevertRequest{
JobID: job.ID,
JobVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf := encodeReq(args)

// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/revert", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()

// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}

// Check the response
revertResp := obj.(structs.JobRegisterResponse)
if revertResp.EvalID == "" {
t.Fatalf("bad: %v", revertResp)
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
})
}

func TestJobs_ApiJobToStructsJob(t *testing.T) {
apiJob := &api.Job{
Stop: helper.BoolToPtr(true),
Expand Down
63 changes: 61 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
}

// Validate validates a job
func (j *Job) Validate(args *structs.JobValidateRequest,
reply *structs.JobValidateResponse) error {
func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValidateResponse) error {
defer metrics.MeasureSince([]string{"nomad", "job", "validate"}, time.Now())

if err := validateJob(args.Job); err != nil {
if merr, ok := err.(*multierror.Error); ok {
Expand All @@ -300,10 +300,69 @@ func (j *Job) Validate(args *structs.JobValidateRequest,
reply.Error = err.Error()
}
}

reply.DriverConfigValidated = true
return nil
}

// Revert is used to revert the job to a prior version
func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Revert", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "revert"}, time.Now())

// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}

// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()
cur, err := snap.JobByID(ws, args.JobID)
if err != nil {
return err
}
if cur == nil {
return fmt.Errorf("job %q not found", args.JobID)
}
if args.JobVersion == cur.Version {
return fmt.Errorf("can't revert to current version")
}

jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion)
if err != nil {
return err
}
if jobV == nil {
return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion)
}

// Build the register request
reg := &structs.JobRegisterRequest{
Job: jobV.Copy(),
WriteRequest: args.WriteRequest,
}

// If the request is enforcing the existing version do a check.
if args.EnforcePriorVersion != nil {
if cur.Version != *args.EnforcePriorVersion {
return fmt.Errorf("Current job has version %d; enforcing version %d", cur.Version, *args.EnforcePriorVersion)
}

reg.EnforceIndex = true
reg.JobModifyIndex = cur.JobModifyIndex
}

// Register the version.
return j.Register(reg, reply)
}

// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
Expand Down
Loading

0 comments on commit 404626c

Please sign in to comment.