Skip to content

Commit

Permalink
fix(state): fix several PG errors (#238)
Browse files Browse the repository at this point in the history
Add test harness for PSQL and verify all database operations work with PSQL
  • Loading branch information
hannahhoward authored Jun 22, 2021
1 parent d28614d commit fa4ac18
Show file tree
Hide file tree
Showing 6 changed files with 476 additions and 451 deletions.
4 changes: 3 additions & 1 deletion controller/state/postgresdb/postgresdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package postgresdb
import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -114,7 +115,8 @@ func (db *PostgresDB) RetryableError(err error) bool {
errCannotConnectNow = "57P03"
)

if e, ok := err.(*pq.Error); ok {
var e *pq.Error
if errors.As(err, &e) {
switch string(e.Code) {
case errSerializationFailure, errDeadlockDetected, errCannotConnectNow, errInsufficientResources:
time.Sleep(time.Duration(rand.Int63n(int64(serializationErrorRetryDelay + 1))))
Expand Down
9 changes: 9 additions & 0 deletions controller/state/setup_pg_cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

CLUSTER_DIR=$1
CLUSTER_PORT=$2

"$POSTGRES_BIN"/initdb -U postgres "$CLUSTER_DIR"
mv "$CLUSTER_DIR/postgresql.conf" "$CLUSTER_DIR/postgresql.conf.orig"
sed "s/#port = 5432/port = $CLUSTER_PORT/g" "$CLUSTER_DIR/postgresql.conf.orig" > "$CLUSTER_DIR/postgresql.conf"
"$POSTGRES_BIN"/pg_ctl -D "$CLUSTER_DIR" start
36 changes: 22 additions & 14 deletions controller/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ func (s *stateDB) transact(ctx context.Context, f func(*sql.Tx) error) error {
s.txlock.Unlock()
}
if err != nil {
if s.dbconn.RetryableError(err) && time.Since(start) < maxRetryTime {
if s.dbconn.RetryableError(err) && (start.IsZero() || time.Since(start) < maxRetryTime) {
if start.IsZero() {
start = time.Now()
}
Expand All @@ -611,7 +611,9 @@ func (s *stateDB) transact(ctx context.Context, f func(*sql.Tx) error) error {

func withTransaction(ctx context.Context, db *sql.DB, f func(*sql.Tx) error) (err error) {
var tx *sql.Tx
tx, err = db.BeginTx(ctx, nil)
tx, err = db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable,
})
if err != nil {
return
}
Expand Down Expand Up @@ -724,6 +726,15 @@ func (s *stateDB) PublishRecordsFrom(ctx context.Context, worker string) error {
return err
}

var lnks []string
for itms.Next() {
var lnk string
if err := itms.Scan(&lnk); err != nil {
return err
}
lnks = append(lnks, lnk)
}

rcrds := []tasks.AuthenticatedRecord{}

updateHeadStmt, err := tx.PrepareContext(ctx, updateHeadSQL)
Expand All @@ -732,12 +743,7 @@ func (s *stateDB) PublishRecordsFrom(ctx context.Context, worker string) error {
}
defer updateHeadStmt.Close()

for itms.Next() {
var lnk string
if err := itms.Scan(&lnk); err != nil {
return err
}

for _, lnk := range lnks {
_, err = updateHeadStmt.ExecContext(ctx, ATTACHED_RECORD, lnk)
if err != nil {
return err
Expand Down Expand Up @@ -813,9 +819,10 @@ func (s *stateDB) ResetWorkerTasks(ctx context.Context, worker string) error {
return err
}

var queriedTasks []tasks.Task
for inProgressWorkerTasks.Next() {
var uuid, serialized string
err := inProgressWorkerTasks.Scan(&uuid, &serialized)
var serialized string
err := inProgressWorkerTasks.Scan(&serialized)
if err != nil {
return err
}
Expand All @@ -825,22 +832,23 @@ func (s *stateDB) ResetWorkerTasks(ctx context.Context, worker string) error {
return err
}
task := tp.Build().(tasks.Task)
queriedTasks = append(queriedTasks, task)
}

for _, task := range queriedTasks {
updatedTask := task.Reset()

lnk, data, err := serializeToJSON(ctx, updatedTask.Representation())
if err != nil {
return err
}

// save the update back to DB
_, err = tx.ExecContext(ctx, unassignTaskSQL, uuid, data, lnk.String())
_, err = tx.ExecContext(ctx, unassignTaskSQL, updatedTask.UUID.String(), data, lnk.String())
if err != nil {
return err
}

// reset the task in the task status ledger
_, err = tx.ExecContext(ctx, upsertTaskStatusSQL, uuid, updatedTask.Status.Int(), updatedTask.Stage.String(), 0, time.Now())
_, err = tx.ExecContext(ctx, upsertTaskStatusSQL, updatedTask.UUID.String(), updatedTask.Status.Int(), updatedTask.Stage.String(), 0, time.Now())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions controller/state/statedb_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (
oldestAvailableTaskWithTagsSQL = `
SELECT uuid, data FROM tasks
WHERE worked_by IS NULL
AND tag is NULL OR tag IN (?)
AND tag is NULL OR tag = ANY($1)
ORDER BY created
LIMIT 1
`
Expand Down Expand Up @@ -99,7 +99,7 @@ const (
`

workerTasksByStatusSQL = `
SELECT tasks.uuid, tasks.data FROM tasks
SELECT tasks.data FROM tasks
INNER JOIN task_status_ledger ON tasks.uuid=task_status_ledger.uuid
WHERE tasks.worked_by = $1 AND task_status_ledger.status = $2
`
Expand Down
Loading

0 comments on commit fa4ac18

Please sign in to comment.