Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job revert #2575

Merged
merged 8 commits into from
Apr 27, 2017
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *Write
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse
var resp JobRegisterResponse

req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
Expand All @@ -65,7 +65,7 @@ func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {

var resp registerJobResponse
var resp JobRegisterResponse

req := &RegisterJobRequest{
Job: job,
Expand Down Expand Up @@ -153,7 +153,7 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query
// is deregistered and purged from the system versus still being queryable and
// eventually GC'ed from the system. Most callers should not specify purge.
func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) {
var resp deregisterJobResponse
var resp JobDeregisterResponse
wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", jobID, purge), &resp, q)
if err != nil {
return "", nil, err
Expand All @@ -163,7 +163,7 @@ func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *W

// ForceEvaluate is used to force-evaluate an existing job.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
var resp JobRegisterResponse
wm, err := j.client.write("/v1/job/"+jobID+"/evaluate", nil, &resp, q)
if err != nil {
return "", nil, err
Expand Down Expand Up @@ -223,6 +223,25 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}

// Revert asks the server to revert the job identified by jobID to the given
// version. If enforcePriorVersion is non-nil it is sent along with the request
// so that the job is only reverted when its current version matches that
// value.
//
// The request is written to /v1/job/<jobID>/revert. On error both the
// response and the write metadata are nil.
func (j *Jobs) Revert(jobID string, version uint64, enforcePriorVersion *uint64,
	q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {

	revertReq := &JobRevertRequest{
		JobID:               jobID,
		JobVersion:          version,
		EnforcePriorVersion: enforcePriorVersion,
	}

	out := new(JobRegisterResponse)
	meta, err := j.client.write("/v1/job/"+jobID+"/revert", revertReq, out, q)
	if err != nil {
		return nil, nil, err
	}
	return out, meta, nil
}

// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
Expand Down Expand Up @@ -539,6 +558,21 @@ type JobValidateResponse struct {
Error string
}

// JobRevertRequest is used to revert a job to a prior version. It is the
// request body built by Jobs.Revert.
type JobRevertRequest struct {
// JobID is the ID of the job being reverted.
JobID string

// JobVersion is the version to revert to.
JobVersion uint64

// EnforcePriorVersion, if set, will enforce that the job is at the given
// version before reverting.
EnforcePriorVersion *uint64

// WriteRequest is embedded, so its fields are promoted onto this request.
WriteRequest
}

// JobRegisterRequest is used to update a job
type JobRegisterRequest struct {
Job *Job
Expand All @@ -558,14 +592,20 @@ type RegisterJobRequest struct {
JobModifyIndex uint64 `json:",omitempty"`
}

// registerJobResponse is used to deserialize a job response
type registerJobResponse struct {
EvalID string
// JobRegisterResponse is used to respond to a job registration
type JobRegisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}

// deregisterJobResponse is used to decode a deregister response
type deregisterJobResponse struct {
EvalID string
// JobDeregisterResponse is used to respond to a job deregistration
type JobDeregisterResponse struct {
EvalID string
EvalCreateIndex uint64
JobModifyIndex uint64
QueryMeta
}

type JobPlanRequest struct {
Expand Down
48 changes: 48 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,54 @@ func TestJobs_EnforceRegister(t *testing.T) {
assertWriteMeta(t, wm)
}

// TestJobs_Revert registers the same job twice and then exercises Jobs.Revert:
// first with a prior-version enforcement that must be rejected, then with one
// that must succeed and return a populated response.
func TestJobs_Revert(t *testing.T) {
	c, s := makeClient(t, nil, nil)
	defer s.Stop()
	jobs := c.Jobs()

	// Register the job two times in a row.
	job := testJob()
	for i := 0; i < 2; i++ {
		evalID, meta, err := jobs.Register(job, nil)
		if err != nil {
			t.Fatalf("err: %s", err)
		}
		if evalID == "" {
			t.Fatalf("missing eval id")
		}
		assertWriteMeta(t, meta)
	}

	// A revert that enforces the wrong prior version must fail.
	_, _, err := jobs.Revert(*job.ID, 0, helper.Uint64ToPtr(10), nil)
	if !(err != nil && strings.Contains(err.Error(), "enforcing version")) {
		t.Fatalf("expected enforcement error: %v", err)
	}

	// A revert that enforces the correct prior version must succeed.
	revertResp, wm, err := jobs.Revert(*job.ID, 0, helper.Uint64ToPtr(1), nil)
	if err != nil {
		t.Fatalf("err: %s", err)
	}
	switch {
	case revertResp.EvalID == "":
		t.Fatalf("missing eval id")
	case revertResp.EvalCreateIndex == 0:
		t.Fatalf("bad eval create index")
	case revertResp.JobModifyIndex == 0:
		t.Fatalf("bad job modify index")
	}
	assertWriteMeta(t, wm)
}

func TestJobs_Info(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
Expand Down
32 changes: 32 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/versions"):
jobName := strings.TrimSuffix(path, "/versions")
return s.jobVersions(resp, req, jobName)
case strings.HasSuffix(path, "/revert"):
jobName := strings.TrimSuffix(path, "/revert")
return s.jobRevert(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
Expand Down Expand Up @@ -360,6 +363,35 @@ func (s *HTTPServer) jobVersions(resp http.ResponseWriter, req *http.Request,
return out.Versions, nil
}

// jobRevert handles a job revert request for the job named in the URL path.
// Only PUT and POST are accepted; anything else yields a 405. The body is
// decoded into a structs.JobRevertRequest whose JobID must be non-empty and
// equal to jobName, otherwise a 400 is returned. The request is then passed to
// the Job.Revert RPC and the RPC response is returned after its query metadata
// has been applied to resp via setMeta.
func (s *HTTPServer) jobRevert(resp http.ResponseWriter, req *http.Request,
	jobName string) (interface{}, error) {

	switch req.Method {
	case "PUT", "POST":
		// Allowed methods; continue below.
	default:
		return nil, CodedError(405, ErrInvalidMethod)
	}

	var args structs.JobRevertRequest
	if err := decodeBody(req, &args); err != nil {
		return nil, CodedError(400, err.Error())
	}

	// The job ID in the body has to be present and agree with the URL.
	if id := args.JobID; id == "" {
		return nil, CodedError(400, "JobID must be specified")
	} else if id != jobName {
		return nil, CodedError(400, "Job ID does not match")
	}

	s.parseRegion(req, &args.Region)

	var reply structs.JobRegisterResponse
	err := s.agent.RPC("Job.Revert", &args, &reply)
	if err != nil {
		return nil, err
	}

	setMeta(resp, &reply.QueryMeta)
	return reply, nil
}

func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
args := structs.JobSummaryRequest{
JobID: name,
Expand Down
50 changes: 50 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,56 @@ func TestHTTP_JobDispatch(t *testing.T) {
})
}

// TestHTTP_JobRevert registers a mock job twice over RPC, issues a PUT to the
// job's /revert endpoint asking for version 0, and checks that the handler
// returns a register response with an eval ID and sets the X-Nomad-Index
// header.
func TestHTTP_JobRevert(t *testing.T) {
	httpTest(t, nil, func(s *TestServer) {
		// Register the same job two times.
		job := mock.Job()
		register := structs.JobRegisterRequest{
			Job:          job,
			WriteRequest: structs.WriteRequest{Region: "global"},
		}
		var registered structs.JobRegisterResponse
		for i := 0; i < 2; i++ {
			if err := s.Agent.RPC("Job.Register", &register, &registered); err != nil {
				t.Fatalf("err: %v", err)
			}
		}

		// Encode a request to revert to version 0.
		body := encodeReq(structs.JobRevertRequest{
			JobID:        job.ID,
			JobVersion:   0,
			WriteRequest: structs.WriteRequest{Region: "global"},
		})

		httpReq, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/revert", body)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
		recorder := httptest.NewRecorder()

		// Dispatch through the job-specific router.
		obj, err := s.Server.JobSpecificRequest(recorder, httpReq)
		if err != nil {
			t.Fatalf("err: %v", err)
		}

		// The handler returns the register response by value.
		revertResp := obj.(structs.JobRegisterResponse)
		if revertResp.EvalID == "" {
			t.Fatalf("bad: %v", revertResp)
		}

		// The index header must have been set on the response.
		if recorder.HeaderMap.Get("X-Nomad-Index") == "" {
			t.Fatalf("missing index")
		}
	})
}

func TestJobs_ApiJobToStructsJob(t *testing.T) {
apiJob := &api.Job{
Stop: helper.BoolToPtr(true),
Expand Down
64 changes: 62 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ func (j *Job) Summary(args *structs.JobSummaryRequest,
}

// Validate validates a job
func (j *Job) Validate(args *structs.JobValidateRequest,
reply *structs.JobValidateResponse) error {
func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValidateResponse) error {
defer metrics.MeasureSince([]string{"nomad", "job", "validate"}, time.Now())

if err := validateJob(args.Job); err != nil {
if merr, ok := err.(*multierror.Error); ok {
Expand All @@ -300,10 +300,70 @@ func (j *Job) Validate(args *structs.JobValidateRequest,
reply.Error = err.Error()
}
}

reply.DriverConfigValidated = true
return nil
}

// Revert is used to revert the job to a prior version
func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Revert", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "revert"}, time.Now())

// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}

// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

ws := memdb.NewWatchSet()
jobV, err := snap.JobByIDAndVersion(ws, args.JobID, args.JobVersion)
if err != nil {
return err
}
if jobV == nil {
return fmt.Errorf("job %q at version %d not found", args.JobID, args.JobVersion)
}

cur, err := snap.JobByID(ws, args.JobID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would check this first, since you expect to get the "job not found" error before the "job at version" not found error.

if err != nil {
return err
}
if cur == nil {
return fmt.Errorf("job %q not found", args.JobID)
}

if args.JobVersion == cur.Version {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure JobByID's docs explain it returns the "latest" or "current" version of a Job ID now that an ID doesn't uniquely identify exactly one *structs.Job.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! Done.

return fmt.Errorf("can't revert to current version")
}

// Build the register request
reg := &structs.JobRegisterRequest{
Job: jobV.Copy(),
WriteRequest: args.WriteRequest,
}

// If the request is enforcing the existing version do a check.
if args.EnforcePriorVersion != nil {
if cur.Version != *args.EnforcePriorVersion {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racy, and probably should be evaluated inside the FSM. Effectively this is a check-and-set, but multiple concurrent calls to this API will be allowed to make progress depending on the timing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I realized that when implementing this. The register endpoint should push the logic from the RPC layer down into the FSM. This will fix this issue for both this and the register. Just out of scope for this PR. I will put a comment on that part of the code though

return fmt.Errorf("Current job has version %d; enforcing version %d", cur.Version, *args.EnforcePriorVersion)
}

reg.EnforceIndex = true
reg.JobModifyIndex = cur.JobModifyIndex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The modify index will get overwritten at Upsert time right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those two are the way to do the CAS operation on the register

}

// Register the version.
return j.Register(reg, reply)
}

// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
Expand Down
Loading