Skip to content

Commit

Permalink
Reset dealbot tasks when dealbot comes on line (#231)
Browse files Browse the repository at this point in the history
* feat(state): reset worker tasks method

add method to reset all of a workers tasks to be unassigned (as in start over) in case the worker
was shut down without draining

* feat(controller): endpoint for reset workers

add controller endpoint for reset workers state operation

* feat(engine): reset worker tasks on startup

run ResetWorker when a worker starts up
  • Loading branch information
hannahhoward authored Jun 18, 2021
1 parent 4f6a5fa commit 637aeb2
Show file tree
Hide file tree
Showing 10 changed files with 398 additions and 9 deletions.
13 changes: 13 additions & 0 deletions controller/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,19 @@ func (c *Client) Drain(ctx context.Context, worker string) error {
return nil
}

func (c *Client) ResetWorker(ctx context.Context, worker string) error {
resp, err := c.request(ctx, "POST", "/reset-worker/"+worker, nil)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return ErrRequestFailed{resp.StatusCode}
}
return nil
}

func (c *Client) Complete(ctx context.Context, worker string) error {
resp, err := c.request(ctx, "POST", "/complete/"+worker, nil)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func NewWithDependencies(listener, graphqlListener net.Listener, gqlToken string
return nil, err
}
r.HandleFunc("/drain/{workedby}", srv.drainHandler).Methods("POST")
r.HandleFunc("/reset-worker/{workedby}", srv.resetWorkerHandler).Methods("POST")
r.HandleFunc("/complete/{workedby}", srv.completeHandler).Methods("POST")
r.HandleFunc("/pop-task", srv.popTaskHandler).Methods("POST")
r.HandleFunc("/tasks", srv.getTasksHandler).Methods("GET")
Expand Down
131 changes: 127 additions & 4 deletions controller/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -32,6 +33,8 @@ func TestControllerHTTPInterface(t *testing.T) {
ctx := context.Background()
testCases := map[string]func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder){
"list and update tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) {
require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient))

currentTasks, err := apiClient.ListTasks(ctx)
require.NoError(t, err)
require.Len(t, currentTasks, 4)
Expand Down Expand Up @@ -83,6 +86,8 @@ func TestControllerHTTPInterface(t *testing.T) {
recorder.AssertExactObservedStatuses(t, mustString(currentTasks[1].UUID.AsString()), tasks.Successful)
},
"pop a task": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) {
require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient))

updatedTask, err := apiClient.PopTask(ctx, tasks.Type.PopTask.Of("dealbot 1", tasks.InProgress))
require.NoError(t, err)
require.Equal(t, *tasks.InProgress, updatedTask.Status)
Expand All @@ -101,6 +106,8 @@ func TestControllerHTTPInterface(t *testing.T) {
}
},
"creating tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, _ *testrecorder.TestMetricsRecorder) {
require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient))

newStorageTask := tasks.Type.StorageTask.Of("t01000",
100000000000000000, // 0.10 FIL
2048, // 1kb
Expand Down Expand Up @@ -135,6 +142,8 @@ func TestControllerHTTPInterface(t *testing.T) {
require.Len(t, currentTasks, 6)
},
"export finished tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) {
require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient))

// dealbot1 takes a task.
task, err := apiClient.PopTask(ctx, tasks.Type.PopTask.Of("dealbot1", tasks.InProgress))
require.NoError(t, err)
Expand All @@ -157,6 +166,118 @@ func TestControllerHTTPInterface(t *testing.T) {
_, err = carContents.Next()
require.NoError(t, err)
},
"reset worker tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) {

var resetWorkerTasks = `
[{"Miner":"t01000","PayloadCID":"bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36","CARExport":false},
{"Miner":"t01000","PayloadCID":"bafk2bzacecettil4umy443e4ferok7jbxiqqseef7soa3ntelflf3zkvvndbg","CARExport":false},
{"Miner":"f0127896","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false},
{"Miner":"f0127897","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false},
{"Miner":"f0127898","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false},
{"Miner":"f0127899","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false},
{"Miner":"f0127900","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}]
`
require.NoError(t, populateTestTasks(ctx, bytes.NewReader([]byte(resetWorkerTasks)), apiClient))

worker := fmt.Sprintf("tester")
otherWorker := fmt.Sprintf("other-worker")

// pop two tasks and leave them in progress
req := tasks.Type.PopTask.Of(worker, tasks.InProgress)
inProgressTask1, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)
inProgressTask2, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)

// add some stage logs to the second task
stage1 := tasks.Type.StageDetails.Of("Doing Stuff", "A good long while").WithLog("stuff happened")
stage2 := tasks.Type.StageDetails.Of("Doing More Stuff", "A good long while").WithLog("more stuff happened")

_, err = apiClient.UpdateTask(ctx, inProgressTask2.GetUUID(),
tasks.Type.UpdateTask.OfStage(inProgressTask2.WorkedBy.Must().String(), tasks.InProgress, "", "Stage1", stage1, 1))
require.NoError(t, err)
_, err = apiClient.UpdateTask(ctx, inProgressTask2.GetUUID(),
tasks.Type.UpdateTask.OfStage(inProgressTask2.WorkedBy.Must().String(), tasks.InProgress, "", "Stage2", stage2, 1))
require.NoError(t, err)

// pop a task and set it failed
req = tasks.Type.PopTask.Of(worker, tasks.Failed)
failedTask, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)

// pop a task and set it successful
req = tasks.Type.PopTask.Of(worker, tasks.Successful)
successfulTask, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)

// pop two tasks to the other worker and leave them in progress
req = tasks.Type.PopTask.Of(otherWorker, tasks.InProgress)
otherWorkerTask1, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)
otherWorkerTask2, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)

allTasks, err := apiClient.ListTasks(ctx)
require.NoError(t, err)

// find the remaining unassigned task
var unassignedTask tasks.Task
for _, task := range allTasks {
if task.Status == *tasks.Available {
unassignedTask = task
break
}
}
require.NotNil(t, unassignedTask)

apiClient.ResetWorker(ctx, worker)

// in progress task should now be available and unassigned
inProgressTask1, err = apiClient.GetTask(ctx, inProgressTask1.GetUUID())
require.Equal(t, *tasks.Available, inProgressTask1.Status)
require.Equal(t, "", inProgressTask1.WorkedBy.Must().String())

// in progress task should now be available and unassigned,
// and stage logs should be wiped
inProgressTask2, err = apiClient.GetTask(ctx, inProgressTask2.GetUUID())
require.Equal(t, *tasks.Available, inProgressTask2.Status)
require.Equal(t, "", inProgressTask2.WorkedBy.Must().String())
require.Equal(t, "", inProgressTask2.Stage.String())
require.False(t, inProgressTask2.CurrentStageDetails.Exists())
require.False(t, inProgressTask2.PastStageDetails.Exists())

// successful and failed records should not change
successfulTask, err = apiClient.GetTask(ctx, successfulTask.GetUUID())
require.Equal(t, *tasks.Successful, successfulTask.Status)
require.Equal(t, worker, successfulTask.WorkedBy.Must().String())
failedTask, err = apiClient.GetTask(ctx, failedTask.GetUUID())
require.Equal(t, *tasks.Failed, failedTask.Status)
require.Equal(t, worker, failedTask.WorkedBy.Must().String())

// tasks for other worker should not change
otherWorkerTask1, err = apiClient.GetTask(ctx, otherWorkerTask1.GetUUID())
require.Equal(t, *tasks.InProgress, otherWorkerTask1.Status)
require.Equal(t, otherWorker, otherWorkerTask1.WorkedBy.Must().String())
otherWorkerTask2, err = apiClient.GetTask(ctx, otherWorkerTask2.GetUUID())
require.Equal(t, *tasks.InProgress, otherWorkerTask2.Status)
require.Equal(t, otherWorker, otherWorkerTask2.WorkedBy.Must().String())

// unassigned task should not chang
unassignedTask, err = apiClient.GetTask(ctx, unassignedTask.GetUUID())
require.Equal(t, *tasks.Available, unassignedTask.Status)
require.Equal(t, "", unassignedTask.WorkedBy.Must().String())

// try assigning a task -- should reassign first newly available task
req = tasks.Type.PopTask.Of(otherWorker, tasks.InProgress)
newInProgressTask1, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)
require.Equal(t, inProgressTask1.GetUUID(), newInProgressTask1.GetUUID())

req = tasks.Type.PopTask.Of(worker, tasks.InProgress)
newInProgressTask2, err := apiClient.PopTask(ctx, req)
require.NoError(t, err)
require.Equal(t, inProgressTask2.GetUUID(), newInProgressTask2.GetUUID())
},
}

for testCase, run := range testCases {
Expand Down Expand Up @@ -206,18 +327,20 @@ func newHarness(ctx context.Context, t *testing.T) *harness {
}
}()

// populate test tasks
require.NoError(t, populateTestTasks(ctx, jsonTestDeals, h.apiClient))

return h
}

func populateTestTasks(ctx context.Context, jsonTests string, apiClient *client.Client) error {
func populateTestTasksFromFile(ctx context.Context, jsonTests string, apiClient *client.Client) error {
sampleTaskFile, err := os.Open(jsonTestDeals)
if err != nil {
return err
}
defer sampleTaskFile.Close()
return populateTestTasks(ctx, sampleTaskFile, apiClient)
}

func populateTestTasks(ctx context.Context, sampleTaskFile io.Reader, apiClient *client.Client) error {

sampleTasks, err := ioutil.ReadAll(sampleTaskFile)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions controller/state/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type State interface {
NewRetrievalTask(ctx context.Context, retrievalTask tasks.RetrievalTask) (tasks.Task, error)
DrainWorker(ctx context.Context, worker string) error
PublishRecordsFrom(ctx context.Context, worker string) error
ResetWorkerTasks(ctx context.Context, worker string) error
Store(ctx context.Context) Store
}

Expand Down
60 changes: 60 additions & 0 deletions controller/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,3 +803,63 @@ func (s *stateDB) DrainWorker(ctx context.Context, worker string) error {
}
return nil
}

// ResetWorkerTasks finds all in progress tasks for a worker and resets them to as if they had never been run
func (s *stateDB) ResetWorkerTasks(ctx context.Context, worker string) error {
var resetTasks []tasks.Task
err := s.transact(ctx, func(tx *sql.Tx) error {
inProgressWorkerTasks, err := tx.QueryContext(ctx, workerTasksByStatusSQL, worker, tasks.InProgress.Int())
if err != nil {
return err
}

for inProgressWorkerTasks.Next() {
var uuid, serialized string
err := inProgressWorkerTasks.Scan(&uuid, &serialized)
if err != nil {
return err
}

tp := tasks.Type.Task.NewBuilder()
if err = dagjson.Decoder(tp, bytes.NewBufferString(serialized)); err != nil {
return err
}
task := tp.Build().(tasks.Task)

updatedTask := task.Reset()

lnk, data, err := serializeToJSON(ctx, updatedTask.Representation())
if err != nil {
return err
}

// save the update back to DB
_, err = tx.ExecContext(ctx, unassignTaskSQL, uuid, data, lnk.String())
if err != nil {
return err
}

// reset the task in the task status ledger
_, err = tx.ExecContext(ctx, upsertTaskStatusSQL, uuid, updatedTask.Status.Int(), updatedTask.Stage.String(), 0, time.Now())
if err != nil {
return err
}
resetTasks = append(resetTasks, updatedTask)
}

return nil
})

if err != nil {
return err
}

if s.recorder != nil && len(resetTasks) > 0 {
for _, resetTask := range resetTasks {
if err = s.recorder.ObserveTask(resetTask); err != nil {
return err
}
}
}
return nil
}
11 changes: 11 additions & 0 deletions controller/state/statedb_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,15 @@ const (
queryHeadSQL = `
SELECT cid FROM record_updates WHERE status = $1 AND worked_by = $2
`

workerTasksByStatusSQL = `
SELECT tasks.uuid, tasks.data FROM tasks
INNER JOIN task_status_ledger ON tasks.uuid=task_status_ledger.uuid
WHERE tasks.worked_by = $1 AND task_status_ledger.status = $2
`

unassignTaskSQL = `
UPDATE tasks SET data = $2, worked_by = NULL, cid = $3
WHERE uuid = $1
`
)
Loading

0 comments on commit 637aeb2

Please sign in to comment.