Skip to content

Commit

Permalink
Support marking jobs as failed & unretryable
Browse files Browse the repository at this point in the history
Adds a new "retryable" argument to HandleStatusCallback to indicate that the
failure is not retryable, and the job shouldn't be re-enqueued. Adds a test at
the services and at the controller level to test this. Moves the record status
endpoint to its own file, and adds a basic test for it.

Fixes #1.
  • Loading branch information
Kevin Burke committed May 30, 2016
1 parent fb780e9 commit dba4e9a
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 87 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Version 0.34

- The 0.33 git tag doesn't compile due to the error fixed here:
https://github.com/Shyp/bump_version/commit/2dc60a73949ae5e42468d475a90e76619dbc67a6.
Adds regression tests to ensure this doesn't happen again.

## Version 0.33

- All uses of `Id` have been renamed to `ID`, per the Go Code Review Comments
Expand Down
91 changes: 91 additions & 0 deletions server/record_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package server

import (
"encoding/json"
"fmt"
"net/http"

"github.com/Shyp/rickover/Godeps/_workspace/src/github.com/Shyp/go-simple-metrics"
"github.com/Shyp/rickover/models"
"github.com/Shyp/rickover/models/queued_jobs"
"github.com/Shyp/rickover/rest"
"github.com/Shyp/rickover/services"
)

// jobStatusUpdater satisfies the Handler interface.
type jobStatusUpdater struct{}

// The body of a POST request to /v1/jobs/:job-name/:job-id, recording the
// status of a job.
type JobStatusRequest struct {
// Should be "succeeded" or "failed".
Status models.JobStatus `json:"status"`

// Attempt is sent to ensure we don't attempt a null write.
Attempt *uint8 `json:"attempt"` // pointer to distinguish between null/omitted value and 0.

// Retryable indicates whether a failure is retryable. The default is true.
// Set to false to avoid retrying a particular failure.
Retryable *bool `json:"retryable"` // pointer to distinguish between null value and false.
}

// POST /v1/jobs/:name/:id
//
// Update a job's status with success or failure
func (j *jobStatusUpdater) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
badRequest(w, r, createEmptyErr("status", r.URL.Path))
return
}
defer r.Body.Close()
var jsr JobStatusRequest
err := json.NewDecoder(r.Body).Decode(&jsr)
if err != nil {
badRequest(w, r, &rest.Error{
ID: "invalid_request",
Title: "Invalid request: bad JSON. Double check the types of the fields you sent",
})
return
}
if jsr.Status == "" {
badRequest(w, r, createEmptyErr("status", r.URL.Path))
return
}
if jsr.Attempt == nil {
badRequest(w, r, createEmptyErr("attempt", r.URL.Path))
return
}
if jsr.Status != models.StatusSucceeded && jsr.Status != models.StatusFailed {
badRequest(w, r, &rest.Error{
ID: "invalid_status",
Title: fmt.Sprintf("Invalid job status: %s", jsr.Status),
Instance: r.URL.Path,
})
return
}
name := jobIdRoute.FindStringSubmatch(r.URL.Path)[1]
idStr := jobIdRoute.FindStringSubmatch(r.URL.Path)[2]
id, wroteResponse := getId(w, r, idStr)
if wroteResponse == true {
return
}
if jsr.Retryable == nil {
// http://stackoverflow.com/q/30716354/329700
jsr.Retryable = func() *bool { b := true; return &b }()
}
err = services.HandleStatusCallback(id, name, jsr.Status, *jsr.Attempt, *jsr.Retryable)
if err == nil {
w.WriteHeader(http.StatusOK)
} else if err == queued_jobs.ErrNotFound {
badRequest(w, r, &rest.Error{
ID: "duplicate_status_request",
Title: "This job has already been archived, or was never queued",
Instance: r.URL.Path,
})
metrics.Increment("status_callback.duplicate")
return
} else {
writeServerError(w, r, err)
metrics.Increment("status_callback.error")
}
}
24 changes: 24 additions & 0 deletions server/record_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package server

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/Shyp/rickover/rest"
"github.com/Shyp/rickover/test"
)

func TestNoBody400(t *testing.T) {
t.Parallel()
req, _ := http.NewRequest("POST", "/v1/jobs/echo/job_123", nil)
req.SetBasicAuth("test", "password")
w := httptest.NewRecorder()
Get(u).ServeHTTP(w, req)
test.AssertEquals(t, w.Code, http.StatusBadRequest)
var err rest.Error
e := json.Unmarshal(w.Body.Bytes(), &err)
test.AssertNotError(t, e, "unmarshaling body")
test.AssertEquals(t, err.ID, "missing_parameter")
}
71 changes: 0 additions & 71 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/Shyp/rickover/models/jobs"
"github.com/Shyp/rickover/models/queued_jobs"
"github.com/Shyp/rickover/rest"
"github.com/Shyp/rickover/services"
)

// TODO(burke) use http.LimitedBytesReader.
Expand Down Expand Up @@ -391,76 +390,6 @@ func (j *jobStatusGetter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
go metrics.Increment("job.get.archived.success")
}

// jobStatusUpdater satisfies the Handler interface.
type jobStatusUpdater struct{}

// The body of a POST request to /v1/jobs/:job-name/:job-id, recording the
// status of a job.
type JobStatusRequest struct {
// Should be "succeeded" or "failed".
Status models.JobStatus `json:"status"`

// Attempt is sent to ensure we don't attempt a null write.
Attempt *uint8 `json:"attempt"` // pointer to distinguish between null/omitted value and 0.
}

// POST /v1/jobs/:name/:id
//
// Update a job's status with success or failure
func (j *jobStatusUpdater) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Body == nil {
badRequest(w, r, createEmptyErr("status", r.URL.Path))
return
}
defer r.Body.Close()
var jsr JobStatusRequest
err := json.NewDecoder(r.Body).Decode(&jsr)
if err != nil {
badRequest(w, r, &rest.Error{
ID: "invalid_request",
Title: "Invalid request: bad JSON. Double check the types of the fields you sent",
})
return
}
if jsr.Status == "" {
badRequest(w, r, createEmptyErr("status", r.URL.Path))
return
}
if jsr.Attempt == nil {
badRequest(w, r, createEmptyErr("attempt", r.URL.Path))
return
}
if jsr.Status != models.StatusSucceeded && jsr.Status != models.StatusFailed {
badRequest(w, r, &rest.Error{
ID: "invalid_status",
Title: fmt.Sprintf("Invalid job status: %s", jsr.Status),
Instance: r.URL.Path,
})
return
}
name := jobIdRoute.FindStringSubmatch(r.URL.Path)[1]
idStr := jobIdRoute.FindStringSubmatch(r.URL.Path)[2]
id, wroteResponse := getId(w, r, idStr)
if wroteResponse == true {
return
}
err = services.HandleStatusCallback(id, name, jsr.Status, *jsr.Attempt)
if err == nil {
w.WriteHeader(http.StatusOK)
} else if err == queued_jobs.ErrNotFound {
badRequest(w, r, &rest.Error{
ID: "duplicate_status_request",
Title: "This job has already been archived, or was never queued",
Instance: r.URL.Path,
})
metrics.Increment("status_callback.duplicate")
return
} else {
writeServerError(w, r, err)
metrics.Increment("status_callback.error")
}
}

// jobEnqueuer satisfies the Handler interface.
type jobEnqueuer struct{}

Expand Down
2 changes: 1 addition & 1 deletion services/fail_stuck_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func ArchiveStuckJobs(olderThan time.Duration) error {
return err
}
for _, qj := range jobs {
err = HandleStatusCallback(qj.ID, qj.Name, models.StatusFailed, qj.Attempts)
err = HandleStatusCallback(qj.ID, qj.Name, models.StatusFailed, qj.Attempts, true)
if err == nil {
log.Printf("Found stuck job %s and marked it as failed", qj.ID.String())
} else {
Expand Down
4 changes: 2 additions & 2 deletions services/process_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (jp *JobProcessor) DoWork(qj *models.QueuedJob) error {
// requests until the new server is ready, and we see a timeout.
return waitForJob(qj, jp.Timeout)
} else {
return HandleStatusCallback(qj.ID, qj.Name, models.StatusFailed, qj.Attempts)
return HandleStatusCallback(qj.ID, qj.Name, models.StatusFailed, qj.Attempts, true)
}
}
return waitForJob(qj, jp.Timeout)
Expand Down Expand Up @@ -157,7 +157,7 @@ func waitForJob(qj *models.QueuedJob, failTimeout time.Duration) error {
case <-timeoutChan:
go metrics.Increment(fmt.Sprintf("wait_for_job.%s.timeout", name))
log.Printf("5 minutes elapsed, marking %s (type %s) as failed", idStr, name)
err := HandleStatusCallback(qj.ID, name, models.StatusFailed, currentAttemptCount)
err := HandleStatusCallback(qj.ID, name, models.StatusFailed, currentAttemptCount, true)
go metrics.Increment(fmt.Sprintf("wait_for_job.%s.failed", name))
log.Printf("job %s (type %s) timed out after %v", idStr, name, time.Since(start))
if err == sql.ErrNoRows {
Expand Down
8 changes: 4 additions & 4 deletions services/status_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// already exists, the queued job no longer exists by the time you attempt to
// delete it, the number of attempts for the queued job don't match up with the
// passed in value (slow)
func HandleStatusCallback(id types.PrefixUUID, name string, status models.JobStatus, attempt uint8) error {
func HandleStatusCallback(id types.PrefixUUID, name string, status models.JobStatus, attempt uint8, retryable bool) error {
if status == models.StatusSucceeded {
err := createAndDelete(id, name, models.StatusSucceeded, attempt)
if err != nil {
Expand All @@ -40,7 +40,7 @@ func HandleStatusCallback(id types.PrefixUUID, name string, status models.JobSta
}
return err
} else if status == models.StatusFailed {
err := handleFailedCallback(id, name, attempt)
err := handleFailedCallback(id, name, attempt, retryable)
if err != nil {
go metrics.Increment("archived_job.create.failed.error")
} else {
Expand Down Expand Up @@ -88,9 +88,9 @@ func getRunAfter(totalAttempts, remainingAttempts uint8) time.Time {
return time.Now().UTC().Add(time.Duration(math.Pow(2, float64(backoff))) * time.Second)
}

func handleFailedCallback(id types.PrefixUUID, name string, attempt uint8) error {
func handleFailedCallback(id types.PrefixUUID, name string, attempt uint8, retryable bool) error {
remainingAttempts := attempt - 1
if remainingAttempts == 0 {
if retryable == false || remainingAttempts == 0 {
return createAndDelete(id, name, models.StatusFailed, remainingAttempts)
}
job, err := jobs.GetRetry(name, 3)
Expand Down
2 changes: 1 addition & 1 deletion test/queued_jobs/queued_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestEnqueueUnknownJobTypeErrNoRows(t *testing.T) {
func TestEnqueueWithExistingArchivedJobFails(t *testing.T) {
qj := factory.CreateQueuedJob(t, factory.EmptyData)
defer test.TearDown(t)
err := services.HandleStatusCallback(qj.ID, qj.Name, models.StatusSucceeded, qj.Attempts)
err := services.HandleStatusCallback(qj.ID, qj.Name, models.StatusSucceeded, qj.Attempts, true)
test.AssertNotError(t, err, "")
expiresAt := types.NullTime{Valid: false}
runAfter := time.Now().UTC()
Expand Down
28 changes: 28 additions & 0 deletions test/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (

"github.com/Shyp/rickover/Godeps/_workspace/src/github.com/Shyp/go-types"
"github.com/Shyp/rickover/models"
"github.com/Shyp/rickover/models/archived_jobs"
"github.com/Shyp/rickover/models/jobs"
"github.com/Shyp/rickover/models/queued_jobs"
"github.com/Shyp/rickover/server"
"github.com/Shyp/rickover/test"
"github.com/Shyp/rickover/test/factory"
Expand All @@ -38,6 +40,32 @@ func TestGoodRequestReturns200(t *testing.T) {
test.AssertEquals(t, w.Code, http.StatusOK)
}

func TestFailedUnretryableArchivesJob(t *testing.T) {
t.Parallel()
defer test.TearDown(t)
qj := factory.CreateQJ(t)
w := httptest.NewRecorder()
jsr := &server.JobStatusRequest{
Status: "failed",
Retryable: func() *bool { b := false; return &b }(),
Attempt: &qj.Attempts,
}
b := new(bytes.Buffer)
json.NewEncoder(b).Encode(jsr)
path := fmt.Sprintf("/v1/jobs/%s/%s", qj.Name, qj.ID.String())
req, _ := http.NewRequest("POST", path, b)
req.SetBasicAuth("foo", "bar")
server.Get(u).ServeHTTP(w, req)
test.AssertEquals(t, w.Code, 200)

_, err := queued_jobs.Get(qj.ID)
test.AssertEquals(t, err, queued_jobs.ErrNotFound)
aj, err := archived_jobs.Get(qj.ID)
test.AssertNotError(t, err, "finding archived job")
test.AssertEquals(t, aj.Status, models.StatusFailed)
test.AssertEquals(t, aj.Attempts, 22)
}

var validRequest = server.CreateJobRequest{
Name: "email-signup",
DeliveryStrategy: models.StrategyAtLeastOnce,
Expand Down
6 changes: 3 additions & 3 deletions test/services/process_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestWorkerRetriesJSON503(t *testing.T) {
test.AssertNotError(t, err, "")

// Cheating, hit the internal success callback.
callbackErr := services.HandleStatusCallback(qj.ID, "echo", models.StatusSucceeded, uint8(5))
callbackErr := services.HandleStatusCallback(qj.ID, "echo", models.StatusSucceeded, uint8(5), true)
test.AssertNotError(t, callbackErr, "")
}
}))
Expand All @@ -126,7 +126,7 @@ func TestWorkerWaitsConnectTimeout(t *testing.T) {

qj := factory.CreateQueuedJob(t, factory.EmptyData)
go func() {
err := services.HandleStatusCallback(qj.ID, qj.Name, models.StatusSucceeded, qj.Attempts)
err := services.HandleStatusCallback(qj.ID, qj.Name, models.StatusSucceeded, qj.Attempts, true)
test.AssertNotError(t, err, "")
}()

Expand Down Expand Up @@ -156,7 +156,7 @@ func TestWorkerWaitsRequestTimeout(t *testing.T) {

qj := factory.CreateQueuedJob(t, factory.EmptyData)
go func() {
err := services.HandleStatusCallback(qj.ID, qj.Name, models.StatusSucceeded, qj.Attempts)
err := services.HandleStatusCallback(qj.ID, qj.Name, models.StatusSucceeded, qj.Attempts, true)
test.AssertNotError(t, err, "")
}()

Expand Down
Loading

0 comments on commit dba4e9a

Please sign in to comment.