diff --git a/controller/client/client.go b/controller/client/client.go index 821a5b2f..895f799d 100644 --- a/controller/client/client.go +++ b/controller/client/client.go @@ -171,6 +171,19 @@ func (c *Client) Drain(ctx context.Context, worker string) error { return nil } +func (c *Client) ResetWorker(ctx context.Context, worker string) error { + resp, err := c.request(ctx, "POST", "/reset-worker/"+worker, nil) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return ErrRequestFailed{resp.StatusCode} + } + return nil +} + func (c *Client) Complete(ctx context.Context, worker string) error { resp, err := c.request(ctx, "POST", "/complete/"+worker, nil) if err != nil { diff --git a/controller/controller.go b/controller/controller.go index e8ce224b..4db3b69a 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -140,6 +140,7 @@ func NewWithDependencies(listener, graphqlListener net.Listener, gqlToken string return nil, err } r.HandleFunc("/drain/{workedby}", srv.drainHandler).Methods("POST") + r.HandleFunc("/reset-worker/{workedby}", srv.resetWorkerHandler).Methods("POST") r.HandleFunc("/complete/{workedby}", srv.completeHandler).Methods("POST") r.HandleFunc("/pop-task", srv.popTaskHandler).Methods("POST") r.HandleFunc("/tasks", srv.getTasksHandler).Methods("GET") diff --git a/controller/http_test.go b/controller/http_test.go index 763558b8..208514e3 100644 --- a/controller/http_test.go +++ b/controller/http_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "net" "net/http" @@ -32,6 +33,8 @@ func TestControllerHTTPInterface(t *testing.T) { ctx := context.Background() testCases := map[string]func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder){ "list and update tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) { + require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient)) + currentTasks, err := apiClient.ListTasks(ctx) require.NoError(t, err) require.Len(t, currentTasks, 4) @@ -83,6 +86,8 @@ func TestControllerHTTPInterface(t *testing.T) { recorder.AssertExactObservedStatuses(t, mustString(currentTasks[1].UUID.AsString()), tasks.Successful) }, "pop a task": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) { + require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient)) + updatedTask, err := apiClient.PopTask(ctx, tasks.Type.PopTask.Of("dealbot 1", tasks.InProgress)) require.NoError(t, err) require.Equal(t, *tasks.InProgress, updatedTask.Status) @@ -101,6 +106,8 @@ func TestControllerHTTPInterface(t *testing.T) { } }, "creating tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, _ *testrecorder.TestMetricsRecorder) { + require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient)) + newStorageTask := tasks.Type.StorageTask.Of("t01000", 100000000000000000, // 0.10 FIL 2048, // 1kb @@ -135,6 +142,8 @@ func TestControllerHTTPInterface(t *testing.T) { require.Len(t, currentTasks, 6) }, "export finished tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) { + require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient)) + // dealbot1 takes a task. task, err := apiClient.PopTask(ctx, tasks.Type.PopTask.Of("dealbot1", tasks.InProgress)) require.NoError(t, err) @@ -157,6 +166,118 @@ func TestControllerHTTPInterface(t *testing.T) { _, err = carContents.Next() require.NoError(t, err) }, + "reset worker tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) { + + var resetWorkerTasks = ` + [{"Miner":"t01000","PayloadCID":"bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36","CARExport":false}, + {"Miner":"t01000","PayloadCID":"bafk2bzacecettil4umy443e4ferok7jbxiqqseef7soa3ntelflf3zkvvndbg","CARExport":false}, + {"Miner":"f0127896","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, + {"Miner":"f0127897","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, + {"Miner":"f0127898","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, + {"Miner":"f0127899","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, + {"Miner":"f0127900","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}] + ` + require.NoError(t, populateTestTasks(ctx, bytes.NewReader([]byte(resetWorkerTasks)), apiClient)) + + worker := fmt.Sprintf("tester") + otherWorker := fmt.Sprintf("other-worker") + + // pop two tasks and leave them in progress + req := tasks.Type.PopTask.Of(worker, tasks.InProgress) + inProgressTask1, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + inProgressTask2, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + + // add some stage logs to the second task + stage1 := tasks.Type.StageDetails.Of("Doing Stuff", "A good long while").WithLog("stuff happened") + stage2 := tasks.Type.StageDetails.Of("Doing More Stuff", "A good long while").WithLog("more stuff happened") + + _, err = apiClient.UpdateTask(ctx, inProgressTask2.GetUUID(), + tasks.Type.UpdateTask.OfStage(inProgressTask2.WorkedBy.Must().String(), tasks.InProgress, "", "Stage1", stage1, 1)) + require.NoError(t, err) + _, err = apiClient.UpdateTask(ctx, inProgressTask2.GetUUID(), + tasks.Type.UpdateTask.OfStage(inProgressTask2.WorkedBy.Must().String(), tasks.InProgress, "", "Stage2", stage2, 1)) + require.NoError(t, err) + + // pop a task and set it failed + req = tasks.Type.PopTask.Of(worker, tasks.Failed) + failedTask, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + + // pop a task and set it successful + req = tasks.Type.PopTask.Of(worker, tasks.Successful) + successfulTask, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + + // pop two tasks to the other worker and leave them in progress + req = tasks.Type.PopTask.Of(otherWorker, tasks.InProgress) + otherWorkerTask1, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + otherWorkerTask2, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + + allTasks, err := apiClient.ListTasks(ctx) + require.NoError(t, err) + + // find the remaining unassigned task + var unassignedTask tasks.Task + for _, task := range allTasks { + if task.Status == *tasks.Available { + unassignedTask = task + break + } + } + require.NotNil(t, unassignedTask) + + apiClient.ResetWorker(ctx, worker) + + // in progress task should now be available and unassigned + inProgressTask1, err = apiClient.GetTask(ctx, inProgressTask1.GetUUID()) + require.Equal(t, *tasks.Available, inProgressTask1.Status) + require.Equal(t, "", inProgressTask1.WorkedBy.Must().String()) + + // in progress task should now be available and unassigned, + // and stage logs should be wiped + inProgressTask2, err = apiClient.GetTask(ctx, inProgressTask2.GetUUID()) + require.Equal(t, *tasks.Available, inProgressTask2.Status) + require.Equal(t, "", inProgressTask2.WorkedBy.Must().String()) + require.Equal(t, "", inProgressTask2.Stage.String()) + require.False(t, inProgressTask2.CurrentStageDetails.Exists()) + require.False(t, inProgressTask2.PastStageDetails.Exists()) + + // successful and failed records should not change + successfulTask, err = apiClient.GetTask(ctx, successfulTask.GetUUID()) + require.Equal(t, *tasks.Successful, successfulTask.Status) + require.Equal(t, worker, successfulTask.WorkedBy.Must().String()) + failedTask, err = apiClient.GetTask(ctx, failedTask.GetUUID()) + require.Equal(t, *tasks.Failed, failedTask.Status) + require.Equal(t, worker, failedTask.WorkedBy.Must().String()) + + // tasks for other worker should not change + otherWorkerTask1, err = apiClient.GetTask(ctx, otherWorkerTask1.GetUUID()) + require.Equal(t, *tasks.InProgress, otherWorkerTask1.Status) + require.Equal(t, otherWorker, otherWorkerTask1.WorkedBy.Must().String()) + otherWorkerTask2, err = apiClient.GetTask(ctx, otherWorkerTask2.GetUUID()) + require.Equal(t, *tasks.InProgress, otherWorkerTask2.Status) + require.Equal(t, otherWorker, otherWorkerTask2.WorkedBy.Must().String()) + + // unassigned task should not chang + unassignedTask, err = apiClient.GetTask(ctx, unassignedTask.GetUUID()) + require.Equal(t, *tasks.Available, unassignedTask.Status) + require.Equal(t, "", unassignedTask.WorkedBy.Must().String()) + + // try assigning a task -- should reassign first newly available task + req = tasks.Type.PopTask.Of(otherWorker, tasks.InProgress) + newInProgressTask1, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + require.Equal(t, inProgressTask1.GetUUID(), newInProgressTask1.GetUUID()) + + req = tasks.Type.PopTask.Of(worker, tasks.InProgress) + newInProgressTask2, err := apiClient.PopTask(ctx, req) + require.NoError(t, err) + require.Equal(t, inProgressTask2.GetUUID(), newInProgressTask2.GetUUID()) + }, } for testCase, run := range testCases { @@ -206,18 +327,20 @@ func newHarness(ctx context.Context, t *testing.T) *harness { } }() - // populate test tasks - require.NoError(t, populateTestTasks(ctx, jsonTestDeals, h.apiClient)) - return h } -func populateTestTasks(ctx context.Context, jsonTests string, apiClient *client.Client) error { +func populateTestTasksFromFile(ctx context.Context, jsonTests string, apiClient *client.Client) error { sampleTaskFile, err := os.Open(jsonTestDeals) if err != nil { return err } defer sampleTaskFile.Close() + return populateTestTasks(ctx, sampleTaskFile, apiClient) +} + +func populateTestTasks(ctx context.Context, sampleTaskFile io.Reader, apiClient *client.Client) error { + sampleTasks, err := ioutil.ReadAll(sampleTaskFile) if err != nil { return err diff --git a/controller/state/interface.go b/controller/state/interface.go index b1699690..881886db 100644 --- a/controller/state/interface.go +++ b/controller/state/interface.go @@ -19,6 +19,7 @@ type State interface { NewRetrievalTask(ctx context.Context, retrievalTask tasks.RetrievalTask) (tasks.Task, error) DrainWorker(ctx context.Context, worker string) error PublishRecordsFrom(ctx context.Context, worker string) error + ResetWorkerTasks(ctx context.Context, worker string) error Store(ctx context.Context) Store } diff --git a/controller/state/statedb.go b/controller/state/statedb.go index 460bbfca..07bfa948 100644 --- a/controller/state/statedb.go +++ b/controller/state/statedb.go @@ -803,3 +803,63 @@ func (s *stateDB) DrainWorker(ctx context.Context, worker string) error { } return nil } + +// ResetWorkerTasks finds all in progress tasks for a worker and resets them to as if they had never been run +func (s *stateDB) ResetWorkerTasks(ctx context.Context, worker string) error { + var resetTasks []tasks.Task + err := s.transact(ctx, func(tx *sql.Tx) error { + inProgressWorkerTasks, err := tx.QueryContext(ctx, workerTasksByStatusSQL, worker, tasks.InProgress.Int()) + if err != nil { + return err + } + + for inProgressWorkerTasks.Next() { + var uuid, serialized string + err := inProgressWorkerTasks.Scan(&uuid, &serialized) + if err != nil { + return err + } + + tp := tasks.Type.Task.NewBuilder() + if err = dagjson.Decoder(tp, bytes.NewBufferString(serialized)); err != nil { + return err + } + task := tp.Build().(tasks.Task) + + updatedTask := task.Reset() + + lnk, data, err := serializeToJSON(ctx, updatedTask.Representation()) + if err != nil { + return err + } + + // save the update back to DB + _, err = tx.ExecContext(ctx, unassignTaskSQL, uuid, data, lnk.String()) + if err != nil { + return err + } + + // reset the task in the task status ledger + _, err = tx.ExecContext(ctx, upsertTaskStatusSQL, uuid, updatedTask.Status.Int(), updatedTask.Stage.String(), 0, time.Now()) + if err != nil { + return err + } + resetTasks = append(resetTasks, updatedTask) + } + + return nil + }) + + if err != nil { + return err + } + + if s.recorder != nil && len(resetTasks) > 0 { + for _, resetTask := range resetTasks { + if err = s.recorder.ObserveTask(resetTask); err != nil { + return err + } + } + } + return nil +} diff --git a/controller/state/statedb_dml.go b/controller/state/statedb_dml.go index 53337291..a84c6dd6 100644 --- a/controller/state/statedb_dml.go +++ b/controller/state/statedb_dml.go @@ -97,4 +97,15 @@ const ( queryHeadSQL = ` SELECT cid FROM record_updates WHERE status = $1 AND worked_by = $2 ` + + workerTasksByStatusSQL = ` + SELECT tasks.uuid, tasks.data FROM tasks + INNER JOIN task_status_ledger ON tasks.uuid=task_status_ledger.uuid + WHERE tasks.worked_by = $1 AND task_status_ledger.status = $2 +` + + unassignTaskSQL = ` + UPDATE tasks SET data = $2, worked_by = NULL, cid = $3 + WHERE uuid = $1 + ` ) diff --git a/controller/state/statedb_test.go b/controller/state/statedb_test.go index b2aafa7a..7532ea4c 100644 --- a/controller/state/statedb_test.go +++ b/controller/state/statedb_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -68,7 +69,7 @@ func TestAssignTask(t *testing.T) { state, ok := stateInterface.(*stateDB) require.True(t, ok, "returned wrong type") - err = populateTestTasks(ctx, jsonTestDeals, stateInterface) + err = populateTestTasksFromFile(ctx, jsonTestDeals, stateInterface) require.NoError(t, err) rt := tasks.Type.RetrievalTask.Of("t01000", "bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36", false, "") @@ -131,7 +132,7 @@ func TestAssignConcurrentTask(t *testing.T) { state, ok := stateInterface.(*stateDB) require.True(t, ok, "returned wrong type") - err = populateTestTasks(ctx, jsonTestDeals, stateInterface) + err = populateTestTasksFromFile(ctx, jsonTestDeals, stateInterface) require.NoError(t, err) taskCount, err := state.countTasks(ctx) @@ -258,7 +259,7 @@ func TestUpdateTasks(t *testing.T) { state, ok := stateInterface.(*stateDB) require.True(t, ok, "returned wrong type") - err = populateTestTasks(ctx, jsonTestDeals, stateInterface) + err = populateTestTasksFromFile(ctx, jsonTestDeals, stateInterface) require.NoError(t, err) taskCount, err := state.countTasks(ctx) @@ -421,13 +422,148 @@ func TestUpdateTasks(t *testing.T) { } } -func populateTestTasks(ctx context.Context, jsonTests string, state State) error { +func TestResetWorkerTasks(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpDir, err := ioutil.TempDir("", "testdealbot") + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + key, err := makeKey() + require.NoError(t, err) + + stateInterface, err := NewStateDB(ctx, "sqlite", filepath.Join(tmpDir, "teststate.db"), key, nil) + require.NoError(t, err) + state, ok := stateInterface.(*stateDB) + require.True(t, ok, "returned wrong type") + + var resetWorkerTasks = ` +[{"Miner":"t01000","PayloadCID":"bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36","CARExport":false}, +{"Miner":"t01000","PayloadCID":"bafk2bzacecettil4umy443e4ferok7jbxiqqseef7soa3ntelflf3zkvvndbg","CARExport":false}, +{"Miner":"f0127896","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, +{"Miner":"f0127897","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, +{"Miner":"f0127898","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, +{"Miner":"f0127899","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}, +{"Miner":"f0127900","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}] +` + err = populateTestTasks(ctx, bytes.NewReader([]byte(resetWorkerTasks)), stateInterface) + require.NoError(t, err) + + worker := fmt.Sprintf("tester") + otherWorker := fmt.Sprintf("other-worker") + + // pop two tasks and leave them in progress + req := tasks.Type.PopTask.Of(worker, tasks.InProgress) + inProgressTask1, err := state.AssignTask(ctx, req) + require.NoError(t, err) + inProgressTask2, err := state.AssignTask(ctx, req) + require.NoError(t, err) + + // add some stage logs to the second task + stage1 := tasks.Type.StageDetails.Of("Doing Stuff", "A good long while").WithLog("stuff happened") + stage2 := tasks.Type.StageDetails.Of("Doing More Stuff", "A good long while").WithLog("more stuff happened") + + _, err = state.Update(ctx, inProgressTask2.GetUUID(), + tasks.Type.UpdateTask.OfStage(inProgressTask2.WorkedBy.Must().String(), tasks.InProgress, "", "Stage1", stage1, 1)) + require.NoError(t, err) + _, err = state.Update(ctx, inProgressTask2.GetUUID(), + tasks.Type.UpdateTask.OfStage(inProgressTask2.WorkedBy.Must().String(), tasks.InProgress, "", "Stage2", stage2, 1)) + require.NoError(t, err) + + // pop a task and set it failed + req = tasks.Type.PopTask.Of(worker, tasks.Failed) + failedTask, err := state.AssignTask(ctx, req) + require.NoError(t, err) + + // pop a task and set it successful + req = tasks.Type.PopTask.Of(worker, tasks.Successful) + successfulTask, err := state.AssignTask(ctx, req) + require.NoError(t, err) + + // pop two tasks to the other worker and leave them in progress + req = tasks.Type.PopTask.Of(otherWorker, tasks.InProgress) + otherWorkerTask1, err := state.AssignTask(ctx, req) + require.NoError(t, err) + otherWorkerTask2, err := state.AssignTask(ctx, req) + require.NoError(t, err) + + allTasks, err := state.GetAll(ctx) + require.NoError(t, err) + + // find the remaining unassigned task + var unassignedTask tasks.Task + for _, task := range allTasks { + if task.Status == *tasks.Available { + unassignedTask = task + break + } + } + require.NotNil(t, unassignedTask) + + state.ResetWorkerTasks(ctx, worker) + + // in progress task should now be available and unassigned + inProgressTask1, err = state.Get(ctx, inProgressTask1.GetUUID()) + require.Equal(t, *tasks.Available, inProgressTask1.Status) + require.Equal(t, "", inProgressTask1.WorkedBy.Must().String()) + + // in progress task should now be available and unassigned, + // and stage logs should be wiped + inProgressTask2, err = state.Get(ctx, inProgressTask2.GetUUID()) + require.Equal(t, *tasks.Available, inProgressTask2.Status) + require.Equal(t, "", inProgressTask2.WorkedBy.Must().String()) + require.Equal(t, "", inProgressTask2.Stage.String()) + require.False(t, inProgressTask2.CurrentStageDetails.Exists()) + require.False(t, inProgressTask2.PastStageDetails.Exists()) + + // successful and failed records should not change + successfulTask, err = state.Get(ctx, successfulTask.GetUUID()) + require.Equal(t, *tasks.Successful, successfulTask.Status) + require.Equal(t, worker, successfulTask.WorkedBy.Must().String()) + failedTask, err = state.Get(ctx, failedTask.GetUUID()) + require.Equal(t, *tasks.Failed, failedTask.Status) + require.Equal(t, worker, failedTask.WorkedBy.Must().String()) + + // tasks for other worker should not change + otherWorkerTask1, err = state.Get(ctx, otherWorkerTask1.GetUUID()) + require.Equal(t, *tasks.InProgress, otherWorkerTask1.Status) + require.Equal(t, otherWorker, otherWorkerTask1.WorkedBy.Must().String()) + otherWorkerTask2, err = state.Get(ctx, otherWorkerTask2.GetUUID()) + require.Equal(t, *tasks.InProgress, otherWorkerTask2.Status) + require.Equal(t, otherWorker, otherWorkerTask2.WorkedBy.Must().String()) + + // unassigned task should not chang + unassignedTask, err = state.Get(ctx, unassignedTask.GetUUID()) + require.Equal(t, *tasks.Available, unassignedTask.Status) + require.Equal(t, "", unassignedTask.WorkedBy.Must().String()) + + // try assigning a task -- should reassign first newly available task + req = tasks.Type.PopTask.Of(otherWorker, tasks.InProgress) + newInProgressTask1, err := state.AssignTask(ctx, req) + require.NoError(t, err) + require.Equal(t, inProgressTask1.GetUUID(), newInProgressTask1.GetUUID()) + + req = tasks.Type.PopTask.Of(worker, tasks.InProgress) + newInProgressTask2, err := state.AssignTask(ctx, req) + require.NoError(t, err) + require.Equal(t, inProgressTask2.GetUUID(), newInProgressTask2.GetUUID()) + +} + +func populateTestTasksFromFile(ctx context.Context, jsonTests string, state State) error { sampleTaskFile, err := os.Open(jsonTestDeals) if err != nil { return err } defer sampleTaskFile.Close() - sampleTasks, err := ioutil.ReadAll(sampleTaskFile) + return populateTestTasks(ctx, sampleTaskFile, state) +} + +func populateTestTasks(ctx context.Context, stream io.Reader, state State) error { + sampleTasks, err := ioutil.ReadAll(stream) if err != nil { return err } diff --git a/controller/tasks.go b/controller/tasks.go index 08bb7837..be50891f 100644 --- a/controller/tasks.go +++ b/controller/tasks.go @@ -105,6 +105,27 @@ func (c *Controller) drainHandler(w http.ResponseWriter, r *http.Request) { w.Write([]byte("\"OK\"")) } +func (c *Controller) resetWorkerHandler(w http.ResponseWriter, r *http.Request) { + logger := log.With("req_id", r.Header.Get("X-Request-ID")) + + logger.Debugw("handle request", "command", "resetWorker") + defer logger.Debugw("request handled", "command", "resetWorker") + + enableCors(&w, r) + vars := mux.Vars(r) + workedBy := vars["workedby"] + + err := c.db.ResetWorkerTasks(r.Context(), workedBy) + if err != nil { + log.Errorw("reset worker DB error", "err", err.Error()) + w.WriteHeader(http.StatusBadRequest) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("\"OK\"")) +} + func (c *Controller) completeHandler(w http.ResponseWriter, r *http.Request) { logger := log.With("req_id", r.Header.Get("X-Request-ID")) diff --git a/engine/engine.go b/engine/engine.go index 791af38b..965e42c5 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -71,6 +71,15 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) { log.Infof("remote version: %s", v.Version) + // before we do anything, reset all this workers tasks + err = client.ResetWorker(cliCtx.Context, host_id) + if err != nil { + // for now, just log an error if this happens... seems like there are scenarios + // where we want to get the dealbot up and running even though reset worker failed for + // whatever eason + log.Errorf("error resetting tasks for worker: %s", err) + } + e := &Engine{ client: client, nodeConfig: nodeConfig, diff --git a/tasks/model.go b/tasks/model.go index 917052de..852c25e7 100644 --- a/tasks/model.go +++ b/tasks/model.go @@ -238,6 +238,20 @@ func (tp *_Task__Prototype) New(r RetrievalTask, s StorageTask) Task { return &t } +func (t *_Task) Reset() Task { + newTask := _Task{ + UUID: _String{t.UUID.x}, + Status: *Available, + WorkedBy: _String__Maybe{m: schema.Maybe_Value, v: &_String{""}}, + Stage: _String{""}, + CurrentStageDetails: _StageDetails__Maybe{m: schema.Maybe_Absent}, + StartedAt: _Time__Maybe{m: schema.Maybe_Absent}, + RetrievalTask: t.RetrievalTask, + StorageTask: t.StorageTask, + } + return &newTask +} + func (t *_Task) Assign(worker string, status Status) Task { newTask := _Task{ UUID: t.UUID,