Skip to content

Commit

Permalink
Move task scheduler into controller (#244)
Browse files Browse the repository at this point in the history
* Move task scheduler into controller

Task scheduling works by the controller reading scheduled tasks and periodically generating non-scheduled (runnable) tasks from them.
  • Loading branch information
gammazero authored Jun 24, 2021
1 parent 9e03e50 commit 11fff0f
Show file tree
Hide file tree
Showing 10 changed files with 604 additions and 691 deletions.
169 changes: 144 additions & 25 deletions controller/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
crypto "github.com/libp2p/go-libp2p-crypto"
"github.com/multiformats/go-multicodec"
"github.com/robfig/cron/v3"

// DB interfaces
"github.com/filecoin-project/dealbot/controller/state/postgresdb"
Expand Down Expand Up @@ -83,10 +84,12 @@ func serializeToJSON(ctx context.Context, n ipld.Node) (cid.Cid, []byte, error)

// stateDB is a persisted implementation of the State interface
type stateDB struct {
	// dbconn is the database connector selected for the configured driver.
	dbconn DBConnector
	// cronSched is the cron scheduler; it is created and started when the
	// stateDB is constructed.
	cronSched *cron.Cron
	// PrivKey is the identity handed to NewStateDB, embedded so its methods
	// are promoted onto stateDB.
	crypto.PrivKey
	// recorder is optional; it is nil-checked before use.
	recorder metrics.MetricsRecorder
	// txlock presumably serializes database transactions — confirm against
	// transact.
	txlock sync.Mutex
	// runNotice is nil when the stateDB is built through NewStateDB; it is
	// only set through newStateDBWithNotify, which exists for testing.
	runNotice chan string
}

func migratePostgres(db *sql.DB) error {
Expand Down Expand Up @@ -125,6 +128,11 @@ func migrateDatabase(dbName string, dbInstance database.Driver) error {

// NewStateDB builds a State backed by the given database driver and
// connection string, using identity as its key and recorder for metrics.
// It is the public entry point and supplies no run-notice channel.
func NewStateDB(ctx context.Context, driver, conn string, identity crypto.PrivKey, recorder metrics.MetricsRecorder) (State, error) {
	// A nil channel: run notices are only requested through
	// newStateDBWithNotify, which is intended for testing.
	var noNotice chan string
	return newStateDBWithNotify(ctx, driver, conn, identity, recorder, noNotice)
}

// newStateDBWithNotify is NewStateDB with additional parameters for testing
func newStateDBWithNotify(ctx context.Context, driver, conn string, identity crypto.PrivKey, recorder metrics.MetricsRecorder, runNotice chan string) (State, error) {
var dbConn DBConnector
var migrateFunc func(*sql.DB) error

Expand Down Expand Up @@ -154,10 +162,21 @@ func NewStateDB(ctx context.Context, driver, conn string, identity crypto.PrivKe
return nil, fmt.Errorf("%s database migration failed: %w", driver, err)
}

//cronSched := cron.New(cron.WithSeconds())
cronSched := cron.New()
cronSched.Start()

st := &stateDB{
dbconn: dbConn,
PrivKey: identity,
recorder: recorder,
dbconn: dbConn,
cronSched: cronSched,
PrivKey: identity,
recorder: recorder,
runNotice: runNotice,
}

err = st.recoverScheduledTasks(ctx)
if err != nil {
return nil, err
}

return st, nil
Expand Down Expand Up @@ -222,10 +241,17 @@ func (s *stateDB) db() *sql.DB {

// Get looks up the task with the given ID. It delegates to getWithTag and
// discards the tag that is returned alongside the task.
func (s *stateDB) Get(ctx context.Context, taskID string) (tasks.Task, error) {
	found, _, lookupErr := s.getWithTag(ctx, taskID)
	return found, lookupErr
}

func (s *stateDB) getWithTag(ctx context.Context, taskID string) (tasks.Task, string, error) {
var task tasks.Task
var tag string
err := s.transact(ctx, func(tx *sql.Tx) error {
var serialized string
err := tx.QueryRowContext(ctx, getTaskSQL, taskID).Scan(&serialized)
var tagString sql.NullString
err := tx.QueryRowContext(ctx, getTaskWithTagSQL, taskID).Scan(&serialized, &tagString)
if err != nil {
return err
}
Expand All @@ -235,12 +261,16 @@ func (s *stateDB) Get(ctx context.Context, taskID string) (tasks.Task, error) {
return err
}
task = tp.Build().(tasks.Task)

if tagString.Valid {
tag = tagString.String
}
return nil
})
if err != nil {
return nil, err
return nil, "", err
}
return task, nil
return task, tag, nil
}

// Get returns a specific task identified by CID
Expand Down Expand Up @@ -313,6 +343,15 @@ func (s *stateDB) AssignTask(ctx context.Context, req tasks.PopTask) (tasks.Task
return nil, fmt.Errorf("cannot assign %q status to task", req.Status.String())
}

// Check if this worker is being drained
draining, err := s.drainingWorker(ctx, req.WorkedBy.String())
if err != nil {
return nil, err
}
if draining {
return nil, nil
}

var tags []interface{}
if req.Tags.Exists() {
reqTags := req.Tags.Must()
Expand All @@ -328,20 +367,66 @@ func (s *stateDB) AssignTask(ctx context.Context, req tasks.PopTask) (tasks.Task
}
}

// Keep popping tasks until a runnable task is found or until there are
// no more tasks
var assigned tasks.Task
for {
task, scheduled, err := s.popTask(ctx, req.WorkedBy.String(), &req.Status, tags)
if err != nil {
return nil, err
}
if task == nil {
// No more tasks to pop
return nil, nil
}
if !scheduled {
// Found a task that is runnable, stop looking for tasks
assigned = task
break
}
// Got a scheduled task, so schedule it.
err = s.scheduleTask(task)
if err != nil {
log.Errorw("failed to schedule task", "taskID", task.UUID.String(), "err", err)
}
}

if s.recorder != nil {
if err = s.recorder.ObserveTask(assigned); err != nil {
return nil, err
}
}
return assigned, nil
}

// drainingWorker reports whether the named worker is currently being drained.
//
// It runs drainedQuerySQL for the worker inside a transaction, scans the
// single integer the query returns, and treats any value greater than zero as
// "draining". If the query or the transaction fails, that error is returned
// together with false.
func (s *stateDB) drainingWorker(ctx context.Context, worker string) (bool, error) {
	var draining bool
	err := s.transact(ctx, func(tx *sql.Tx) error {
		var cnt int
		if err := tx.QueryRowContext(ctx, drainedQuerySQL, worker).Scan(&cnt); err != nil {
			return err
		}
		// A non-zero count means the worker is being drained. The result is
		// recorded rather than returned early, so the caller can see it.
		draining = cnt > 0
		return nil
	})
	if err != nil {
		return false, err
	}
	return draining, nil
}

func (s *stateDB) popTask(ctx context.Context, workedBy string, status tasks.Status, tags []interface{}) (tasks.Task, bool, error) {
var assigned tasks.Task
var scheduled bool

err := s.transact(ctx, func(tx *sql.Tx) error {
var taskID, serialized string
var err error
if len(tags) == 0 {
err = tx.QueryRowContext(ctx, oldestAvailableTaskSQL).Scan(&taskID, &serialized)
} else {
Expand All @@ -367,37 +452,49 @@ func (s *stateDB) AssignTask(ctx context.Context, req tasks.PopTask) (tasks.Task
}
task := tp.Build().(tasks.Task)

assigned = task.Assign(req.WorkedBy.String(), &req.Status)
if task.HasSchedule() {
// This task has a schedule, but is not owned by the scheduler.
// Normally tasks are scheduled on ingestion, before they are
// written to the DB. However, this may be a previously scheduled
// task that was rescheduled, or one that was inserted into the DB
// outside of the normal ingestion process.
//
// NOTE(review): the Warnw call below passes taskID without a key,
// unlike the Errorw call in AssignTask ("taskID", ...) — confirm and
// add the key.
log.Warnw("found scheduled task with no owner, scheduling task", taskID)

// Assign task to scheduler in DB only
_, err = tx.ExecContext(ctx, updateTaskWorkedBySQL, taskID, schedulerOwner)
if err != nil {
return fmt.Errorf("could not assign task: %w", err)
}
assigned = task
scheduled = true
return nil
}

assigned = task.Assign(workedBy, status)

lnk, data, err := serializeToJSON(ctx, assigned.Representation())
if err != nil {
return err
}

// Assign task to worker
_, err = tx.ExecContext(ctx, assignTaskSQL, taskID, data, req.WorkedBy.String(), lnk.String())
_, err = tx.ExecContext(ctx, assignTaskSQL, taskID, data, workedBy, lnk.String())
if err != nil {
return fmt.Errorf("could not assign task: %w", err)
}

// Set new status for task
_, err = tx.ExecContext(ctx, setTaskStatusSQL, taskID, req.Status.Int(), time.Now())
_, err = tx.ExecContext(ctx, setTaskStatusSQL, taskID, status.Int(), time.Now())
if err != nil {
return fmt.Errorf("could not update status task: %w", err)
}

return nil
})
if err != nil {
return nil, err
}

if s.recorder != nil && assigned != nil {
if err = s.recorder.ObserveTask(assigned); err != nil {
return nil, err
}
return nil, false, err
}
return assigned, nil
return assigned, scheduled, nil
}

func (s *stateDB) Update(ctx context.Context, taskID string, req tasks.UpdateTask) (tasks.Task, error) {
Expand Down Expand Up @@ -647,17 +744,39 @@ func (s *stateDB) saveTask(ctx context.Context, task tasks.Task, tag string) err
}
}

return s.transact(ctx, func(tx *sql.Tx) error {
taskID := task.UUID.String()
scheduledTask := task.HasSchedule()

err = s.transact(ctx, func(tx *sql.Tx) error {
now := time.Now()
if _, err := tx.ExecContext(ctx, createTaskSQL, task.UUID.String(), data, now, lnk.String(), tagCol); err != nil {
if _, err := tx.ExecContext(ctx, createTaskSQL, taskID, data, now, lnk.String(), tagCol); err != nil {
return err
}

if _, err := tx.ExecContext(ctx, setTaskStatusSQL, task.UUID.String(), tasks.Available.Int(), now); err != nil {
if _, err := tx.ExecContext(ctx, setTaskStatusSQL, taskID, tasks.Available.Int(), now); err != nil {
return err
}

if scheduledTask {
// Assign task to scheduler in DB only
if _, err = tx.ExecContext(ctx, updateTaskWorkedBySQL, taskID, schedulerOwner); err != nil {
return fmt.Errorf("could not assign task to scheduler: %w", err)
}
}

return nil
})
if err != nil {
return err
}

if scheduledTask {
// Got a scheduled task, so schedule it.
if err = s.scheduleTask(task); err != nil {
return fmt.Errorf("failed to schedule task %q: %w", taskID, err)
}
}
return nil
}

// countTasks retrieves the total number of tasks
Expand Down
18 changes: 18 additions & 0 deletions controller/state/statedb_dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ const (
SELECT data FROM tasks
`

getAllTasksForOwnerSQL = `
SELECT data FROM tasks WHERE worked_by = $1
`

getTaskSQL = `
SELECT data FROM tasks WHERE uuid = $1
`

getTaskWithTagSQL = `
SELECT data, tag FROM tasks WHERE uuid = $1
`

getTaskByCidSQL = `
SELECT data FROM tasks WHERE cid = $1
`
Expand Down Expand Up @@ -104,8 +112,18 @@ const (
WHERE tasks.worked_by = $1 AND task_status_ledger.status = $2
`

unassignScheduledTaskSQL = `
UPDATE tasks SET worked_by = NULL
WHERE uuid = $1
`

unassignTaskSQL = `
UPDATE tasks SET data = $2, worked_by = NULL, cid = $3
WHERE uuid = $1
`

updateTaskWorkedBySQL = `
UPDATE tasks SET worked_by = $2
WHERE uuid = $1
`
)
Loading

0 comments on commit 11fff0f

Please sign in to comment.