Skip to content

Commit

Permalink
Delete tasks endpoint (#260)
Browse files Browse the repository at this point in the history
add delete method

add delete method to state. Unfortunately limitations of SQLite forced a division of migrations to
alter a constraint.

Remove SQLite support
  • Loading branch information
hannahhoward authored Jul 1, 2021
1 parent b98feab commit 564a5fd
Show file tree
Hide file tree
Showing 22 changed files with 983 additions and 248 deletions.
20 changes: 18 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ orbs:
go: gotest/tools@0.0.13

executors:
golang:
golang-with-postgres:
docker:
- image: cimg/go:1.16
- image: circleci/postgres:9.6.2-alpine
resource_class: 2xlarge

commands:
Expand All @@ -31,8 +32,23 @@ commands:

jobs:
build-and-test:
executor: golang
executor: golang-with-postgres
environment:
PGHOST: 127.0.0.1
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: ""
PGDATABASE: circle_test
PGSSLMODE: disable
steps:
- run:
name: install dockerize
command: curl -LO https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && sudo tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
environment:
DOCKERIZE_VERSION: v0.3.0
- run:
name: wait for database
command: dockerize -wait tcp://127.0.0.1:5432 -timeout 1m
- checkout-dealbot
- install-lotus
- test-integration-dealbot
Expand Down
24 changes: 24 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ jobs:

build:
runs-on: ubuntu-latest
services:
# Label used to access the service container
postgres:
# Docker Hub image
image: postgres
# Provide the password for postgres
env:
POSTGRES_PASSWORD: postgres
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
# Maps tcp port 5432 on service container to the host
- 5432:5432
steps:
- uses: actions/checkout@v2

Expand All @@ -23,3 +40,10 @@ jobs:

- name: Test
run: go test -v ./...
env:
PGHOST: 127.0.0.1
PGPORT: 5432
PGUSER: postgres
PGPASSWORD: "postgres"
PGDATABASE: postgres
PGSSLMODE: disable
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ Dealbot Controller
| graphql | `DEALBOT_GRAPHQL_LISTEN` | exposed `host:port` for external public graphql queries |
| metrics | `DEALBOT_METRICS` | either `prometheus` to expose a `/metrics` api, or `log` to write metrics to stdout |
| identity | `DEALBOT_IDENTITY_KEYPAIR` | filepath of a libp2p identity to sign public records of dealbot activity |
| driver | `DEALBOT_PERSISTENCE_DRIVER` | `sqlite` or `postgres` |
| dbloc | `DEALBOT_PERSISTENCE_CONN` | the file (for sqlite) or db conn string from postgres |
| driver | `DEALBOT_PERSISTENCE_DRIVER` | `postgres` |
| dbloc | `DEALBOT_PERSISTENCE_CONN` | db conn string from postgres |
| gqlAccessToken | `DEALBOT_GRAPHQL_ACCESS_TOKEN` | a static key for querying non-public data from the graphql server |

Dealbot Daemon
Expand Down
2 changes: 1 addition & 1 deletion commands/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ var ControllerFlags = []cli.Flag{
Name: "driver",
Usage: "type of database backend to use",
EnvVars: []string{"DEALBOT_PERSISTENCE_DRIVER"},
Value: "sqlite",
Value: "postgres",
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "dbloc",
Expand Down
18 changes: 18 additions & 0 deletions controller/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,24 @@ func (c *Client) CreateRetrievalTask(ctx context.Context, retrievalTask tasks.Re
return tp.Build().(tasks.Task), nil
}

func (c *Client) DeleteTask(ctx context.Context, uuid string) error {
resp, err := c.request(ctx, "DELETE", "/tasks/"+uuid, nil)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusNoContent {
return nil
}

if resp.StatusCode != http.StatusOK {
return ErrRequestFailed{resp.StatusCode}
}

return nil
}

func (c *Client) request(ctx context.Context, method string, path string, body io.Reader, headers ...string) (*http.Response, error) {
if len(headers)%2 != 0 {
return nil, fmt.Errorf("headers must be tuples: key1, value1, key2, value2")
Expand Down
11 changes: 10 additions & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,15 @@ func New(ctx *cli.Context) (*Controller, error) {
}
}

backend, err := state.NewStateDB(ctx.Context, ctx.String("driver"), ctx.String("dbloc"), ctx.String("datapointlog"), key, recorder)
connector, err := state.NewDBConnector(ctx.String("driver"), ctx.String("dbloc"))
if err != nil {
return nil, err
}
migrator, err := state.NewMigrator(ctx.String("driver"))
if err != nil {
return nil, err
}
backend, err := state.NewStateDB(ctx.Context, connector, migrator, ctx.String("datapointlog"), key, recorder)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,6 +159,7 @@ func NewWithDependencies(ctx *cli.Context, listener, graphqlListener net.Listene
r.HandleFunc("/status", srv.reportStatusHandler).Methods("POST")
r.HandleFunc("/tasks/{uuid}", srv.updateTaskHandler).Methods("PATCH")
r.HandleFunc("/tasks/{uuid}", srv.getTaskHandler).Methods("GET")
r.HandleFunc("/tasks/{uuid}", srv.deleteTaskHandler).Methods("DELETE")
r.HandleFunc("/car", srv.carHandler).Methods("GET")
r.HandleFunc("/health", srv.healthHandler).Methods("GET")
r.HandleFunc("/cred.js", srv.authHandler).Methods("GET")
Expand Down
82 changes: 75 additions & 7 deletions controller/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/filecoin-project/dealbot/controller"
"github.com/filecoin-project/dealbot/controller/client"
"github.com/filecoin-project/dealbot/controller/state"
"github.com/filecoin-project/dealbot/controller/state/postgresdb"
"github.com/filecoin-project/dealbot/controller/state/postgresdb/temporary"
"github.com/filecoin-project/dealbot/metrics/testrecorder"
"github.com/filecoin-project/dealbot/tasks"
"github.com/ipld/go-ipld-prime/codec/dagjson"
Expand All @@ -33,6 +35,19 @@ func mustString(s string, _ error) string {

func TestControllerHTTPInterface(t *testing.T) {
ctx := context.Background()
migrator, err := state.NewMigrator("postgres")
require.NoError(t, err)
var dbConn state.DBConnector
existingPGconn := postgresdb.PostgresConfig{}.String()
dbConn = postgresdb.New(existingPGconn)
err = dbConn.Connect()
if err != nil {
tempPG, err := temporary.NewTemporaryPostgres(ctx, temporary.Params{HostPort: defaultPGPort})
require.NoError(t, err)
dbConn = tempPG
defer tempPG.Shutdown(ctx)
}

testCases := map[string]func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder){
"list and update tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) {
require.NoError(t, populateTestTasksFromFile(ctx, jsonTestDeals, apiClient))
Expand Down Expand Up @@ -280,13 +295,67 @@ func TestControllerHTTPInterface(t *testing.T) {
require.NoError(t, err)
require.Equal(t, inProgressTask2.GetUUID(), newInProgressTask2.GetUUID())
},

"delete tasks": func(ctx context.Context, t *testing.T, apiClient *client.Client, recorder *testrecorder.TestMetricsRecorder) {

var resetWorkerTasks = `
[{"Miner":"t01000","PayloadCID":"bafk2bzacecettil4umy443e4ferok7jbxiqqseef7soa3ntelflf3zkvvndbg","CARExport":false},
{"Miner":"t01000","PayloadCID":"bafk2bzacedli6qxp43sf54feczjd26jgeyfxv4ucwylujd3xo5s6cohcqbg36","CARExport":false,"Schedule":"0 0 * * *","ScheduleLimit":"168h"},
{"Miner":"f0127896","PayloadCID":"bafykbzacedikkmeotawrxqquthryw3cijaonobygdp7fb5bujhuos6wdkwomm","CARExport":false}]
`

err := populateTestTasks(ctx, bytes.NewReader([]byte(resetWorkerTasks)), apiClient)
require.NoError(t, err)

worker := "testworker"
// pop a task
req := tasks.Type.PopTask.Of(worker, tasks.InProgress)
inProgressTask1, err := apiClient.PopTask(ctx, req)

allTasks, err := apiClient.ListTasks(ctx)
require.NoError(t, err)

var unassignedTask tasks.Task
for _, task := range allTasks {
if task.Status == *tasks.Available && !task.HasSchedule() {
unassignedTask = task
break
}
}
require.NotNil(t, unassignedTask)

var scheduledTask tasks.Task
for _, task := range allTasks {
if task.HasSchedule() {
scheduledTask = task
break
}
}
require.NotNil(t, scheduledTask)

err = apiClient.DeleteTask(ctx, unassignedTask.GetUUID())
require.NoError(t, err)
task, err := apiClient.GetTask(ctx, unassignedTask.GetUUID())
require.EqualError(t, err, client.ErrRequestFailed{http.StatusNotFound}.Error())
require.Nil(t, task)
err = apiClient.DeleteTask(ctx, scheduledTask.GetUUID())
require.NoError(t, err)
task, err = apiClient.GetTask(ctx, scheduledTask.GetUUID())
require.EqualError(t, err, client.ErrRequestFailed{http.StatusNotFound}.Error())
require.Nil(t, task)
err = apiClient.DeleteTask(ctx, inProgressTask1.GetUUID())
require.EqualError(t, err, client.ErrRequestFailed{http.StatusBadRequest}.Error())
err = apiClient.DeleteTask(ctx, "apples and oranges")
require.EqualError(t, err, client.ErrRequestFailed{http.StatusNotFound}.Error())
},
}

for testCase, run := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
h := newHarness(ctx, t)
state.WipeAndReset(dbConn, migrator)
h := newHarness(ctx, t, dbConn, migrator)
run(ctx, t, h.apiClient, h.recorder)

h.Shutdown(t)
Expand All @@ -304,7 +373,9 @@ type harness struct {
serveErr chan error
}

func newHarness(ctx context.Context, t *testing.T) *harness {
const defaultPGPort = 5434

func newHarness(ctx context.Context, t *testing.T, connector state.DBConnector, migrator state.Migrator) *harness {
h := &harness{ctx: ctx}
h.recorder = testrecorder.NewTestMetricsRecorder()
listener, err := net.Listen("tcp", "localhost:0")
Expand All @@ -314,9 +385,9 @@ func newHarness(ctx context.Context, t *testing.T) *harness {
h.port = p
h.apiClient = client.NewFromEndpoint("http://localhost:" + p)
pr, _, _ := crypto.GenerateKeyPair(crypto.Ed25519, 0)
h.dbloc, err = ioutil.TempDir("", "dealbot_test_*")
require.NoError(t, err)
be, err := state.NewStateDB(ctx, "sqlite", h.dbloc+"/tmp.sqlite", "", pr, h.recorder)

be, err := state.NewStateDB(ctx, connector, migrator, "", pr, h.recorder)
require.NoError(t, err)
cc := cli.NewContext(cli.NewApp(), &flag.FlagSet{}, nil)
h.controller, err = controller.NewWithDependencies(cc, listener, nil, h.recorder, be)
Expand Down Expand Up @@ -382,7 +453,4 @@ func (h *harness) Shutdown(t *testing.T) {
case err = <-h.serveErr:
require.EqualError(t, err, http.ErrServerClosed.Error())
}
if _, err := os.Stat(h.dbloc); !os.IsNotExist(err) {
os.RemoveAll(h.dbloc)
}
}
54 changes: 53 additions & 1 deletion controller/state/dbconnector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package state

import "database/sql"
import (
"context"
"database/sql"
"fmt"
"time"

"github.com/filecoin-project/dealbot/controller/state/postgresdb"
)

// DBConnector provides an interface for working with the underlying DB implementations
type DBConnector interface {
Expand All @@ -9,3 +16,48 @@ type DBConnector interface {
RetryableError(error) bool
SqlDB() *sql.DB
}

func NewDBConnector(driver string, conn string) (DBConnector, error) {
switch driver {
case "postgres":
if conn == "" {
conn = postgresdb.PostgresConfig{}.String()
}
return postgresdb.New(conn), nil
default:
return nil, fmt.Errorf("database driver %q is not supported", driver)
}
}

func dropAllRecords(tx *sql.Tx) error {
_, err := tx.Exec(dropAllRecordsSQL)
return err
}

func WipeAndReset(dbConn DBConnector, migrator Migrator) error {
// Check connection and reconnect if down
err := dbConn.Connect()
if err != nil {
return err
}
var start time.Time

err = migrator(dbConn.SqlDB())
if err != nil {
return err
}
for {
err = withTransaction(context.Background(), dbConn.SqlDB(), dropAllRecords)
if err != nil {
if dbConn.RetryableError(err) && (start.IsZero() || time.Since(start) < maxRetryTime) {
if start.IsZero() {
start = time.Now()
}
log.Warnw("retrying transaction after error", "err", err)
continue
}
return err
}
return nil
}
}
1 change: 1 addition & 0 deletions controller/state/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type State interface {
DrainWorker(ctx context.Context, worker string) error
PublishRecordsFrom(ctx context.Context, worker string) error
ResetWorkerTasks(ctx context.Context, worker string) error
Delete(ctx context.Context, uuid string) error
Store(ctx context.Context) Store
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
BEGIN;

ALTER TABLE task_status_ledger DROP
CONSTRAINT fk_status_ledger_uuid;
ALTER TABLE task_status_ledger ADD
CONSTRAINT fk_status_ledger_uuid FOREIGN KEY (uuid) REFERENCES tasks(uuid);

COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
BEGIN;

ALTER TABLE task_status_ledger DROP
CONSTRAINT fk_status_ledger_uuid;
ALTER TABLE task_status_ledger ADD
CONSTRAINT fk_status_ledger_uuid FOREIGN KEY (uuid) REFERENCES tasks(uuid) ON DELETE CASCADE;

COMMIT;
Loading

0 comments on commit 564a5fd

Please sign in to comment.