Release tasks early (#253)
* feat(engine): release tasks early

Set up the queue so tasks can be released by the worker before they are totally finished. Set up storage to
release when data transfer is complete.

* fix(integration_tests): fix controller test
hannahhoward authored Jun 26, 2021
1 parent 3a15ff0 commit d9f6c40
Showing 10 changed files with 639 additions and 127 deletions.
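The core idea of the change: instead of holding a worker slot until a deal is completely finished, each task is handed a release callback it can invoke once the expensive phase (the data transfer) is done, so the engine can start the next task early. The sketch below is only a hedged illustration of that pattern; `runReleasable` and `slots` are made-up names, not identifiers from this repository.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runReleasable acquires a concurrency slot, runs work in the background,
// and lets the work hand the slot back early via release(). The sync.Once
// guarantees the slot is returned exactly once, whether the work calls
// release() itself or only the deferred fallback fires.
func runReleasable(slots chan struct{}, work func(release func())) {
	slots <- struct{}{} // take a slot (blocks if all slots are busy)
	var once sync.Once
	release := func() {
		once.Do(func() { <-slots }) // give the slot back
	}
	go func() {
		defer release() // fallback: release when the task fully ends
		work(release)
	}()
}

func main() {
	slots := make(chan struct{}, 2) // at most 2 tasks hold a slot at once
	for i := 0; i < 4; i++ {
		i := i
		runReleasable(slots, func(release func()) {
			time.Sleep(10 * time.Millisecond) // "data transfer"
			release()                         // slot freed early; next task may start
			time.Sleep(50 * time.Millisecond) // rest of the deal finishes in background
			fmt.Println("task", i, "finished")
		})
	}
	time.Sleep(200 * time.Millisecond) // let background work print
}
```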
2 changes: 1 addition & 1 deletion commands/retrieval_deal.go
@@ -44,7 +44,7 @@ func makeRetrievalDeal(cctx *cli.Context) error {

task := tasks.Type.RetrievalTask.Of(minerParam, payloadCid, carExport, "")

err = tasks.MakeRetrievalDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts)
err = tasks.MakeRetrievalDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts, func() {})
if err != nil {
log.Fatal(err)
}
2 changes: 1 addition & 1 deletion commands/storage_deal.go
@@ -45,5 +45,5 @@ func makeStorageDeal(cctx *cli.Context) error {

task := tasks.Type.StorageTask.Of(miner, int64(maxPrice), int64(size.Bytes()), int64(startOffset), fastRetrieval, verified, "")

return tasks.MakeStorageDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts)
return tasks.MakeStorageDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts, func() {})
}
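Both CLI commands drive a single deal to completion rather than going through the engine's worker queue, so they satisfy the new release-callback parameter with a no-op `func() {}`.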
254 changes: 157 additions & 97 deletions engine/engine.go
@@ -2,10 +2,12 @@ package engine

import (
"context"
"math"
"os/exec"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/filecoin-project/dealbot/controller/client"
"github.com/filecoin-project/dealbot/lotus"
"github.com/filecoin-project/dealbot/tasks"
@@ -24,20 +26,44 @@ const (

var log = logging.Logger("engine")

type apiClient interface {
GetTask(ctx context.Context, uuid string) (tasks.Task, error)
UpdateTask(ctx context.Context, uuid string, r tasks.UpdateTask) (tasks.Task, error)
PopTask(ctx context.Context, r tasks.PopTask) (tasks.Task, error)
ResetWorker(ctx context.Context, worker string) error
}

type taskExecutor interface {
MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error
MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error
}

type defaultTaskExecutor struct{}

func (defaultTaskExecutor) MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
return tasks.MakeStorageDeal(ctx, config, node, task, updateStage, log, stageTimeouts, releaseWorker)
}

func (defaultTaskExecutor) MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
return tasks.MakeRetrievalDeal(ctx, config, node, task, updateStage, log, stageTimeouts, releaseWorker)
}

type Engine struct {
host string
client *client.Client

nodeConfig tasks.NodeConfig
node api.FullNode
closer lotus.NodeCloser
shutdown chan struct{}
stopped chan struct{}
tags []string
workerPing chan struct{}
cancelTasks context.CancelFunc
shutdown chan struct{}
stopped chan struct{}
queueIsEmpty chan struct{}

host string
tags []string
stageTimeouts map[string]time.Duration

// dependencies
node api.FullNode
nodeConfig tasks.NodeConfig
closer lotus.NodeCloser
client apiClient
clock clock.Clock
taskExecutor taskExecutor
}

func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
@@ -63,6 +89,34 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
return nil, err
}

clock := clock.New()

tags := cliCtx.StringSlice("tags")

engine, err := new(ctx, host_id, stageTimeouts, tags, node, nodeConfig, closer, client, clock, defaultTaskExecutor{}, nil)
if err != nil {
return nil, err
}

go engine.run(ctx, workers)
return engine, nil
}

// used for testing
func new(
ctx context.Context,
host string,
stageTimeouts map[string]time.Duration,
tags []string,
node api.FullNode,
nodeConfig tasks.NodeConfig,
closer lotus.NodeCloser,
client apiClient,
clock clock.Clock,
taskExecutor taskExecutor,
queueIsEmpty chan struct{},
) (*Engine, error) {

v, err := node.Version(ctx)
if err != nil {
return nil, err
@@ -71,94 +125,124 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
log.Infof("remote version: %s", v.Version)

// before we do anything, reset all this worker's tasks
err = client.ResetWorker(cliCtx.Context, host_id)
err = client.ResetWorker(ctx, host)
if err != nil {
// for now, just log an error if this happens... seems like there are scenarios
// where we want to get the dealbot up and running even though reset worker failed for
// whatever reason
log.Errorf("error resetting tasks for worker: %s", err)
}

e := &Engine{
return &Engine{
client: client,
nodeConfig: nodeConfig,
node: node,
closer: closer,
host: host_id,
host: host,
shutdown: make(chan struct{}),
stopped: make(chan struct{}),
tags: cliCtx.StringSlice("tags"),
workerPing: make(chan struct{}),
tags: tags,
stageTimeouts: stageTimeouts,
}

var tasksCtx context.Context
tasksCtx, e.cancelTasks = context.WithCancel(context.Background())

go e.run(tasksCtx, workers)
return e, nil
clock: clock,
taskExecutor: taskExecutor,
queueIsEmpty: queueIsEmpty,
}, nil
}

func (e *Engine) run(tasksCtx context.Context, workers int) {
func (e *Engine) run(ctx context.Context, workers int) {
defer close(e.stopped)
defer e.cancelTasks()

var wg sync.WaitGroup
runChan := make(chan tasks.Task)

// Start workers
wg.Add(workers)
for i := 0; i < workers; i++ {
go e.worker(tasksCtx, i, &wg, runChan)
ctx, cancel := context.WithCancel(ctx)

e.taskLoop(ctx, &wg, workers)
cancel()

// Stop workers and wait for all workers to exit
wg.Wait()
}

func (e *Engine) taskLoop(ctx context.Context, wg *sync.WaitGroup, workers int) {

// super annoying -- make a new timer that is already stopped
popTimer := e.clock.Timer(math.MinInt64)
if !popTimer.Stop() {
<-popTimer.C
}

popTimer := time.NewTimer(noTasksWait)
taskLoop:
ready := make(chan struct{}, 1)
// we start in a ready state
ready <- struct{}{}

released := make(chan struct{}, workers)
active := 0

for {
// ensure at most one operation runs before a quit
select {
case <-e.shutdown:
break taskLoop
return
default:
}

if e.pingWorker() && e.apiGood() {
// Check if there is a new task
task := e.popTask()
select {
case <-e.shutdown:
return
case <-ready:
// stop and drain timer if not already drained
if !popTimer.Stop() {
select {
case <-popTimer.C:
default:
}
}
task := e.tryPopTask()
if task != nil {
log.Infow("sending task to worker", "uuid", task.UUID.String())
runChan <- task
continue
active++
wg.Add(1)
go func() {
defer wg.Done()
e.runTask(ctx, task, 1, released)
}()
if active < workers {
ready <- struct{}{}
}
} else {
popTimer.Reset(noTasksWait)
// only used for testing
if e.queueIsEmpty != nil {
e.queueIsEmpty <- struct{}{}
}
}
}

// No tasks to run now, so wait for timer
select {
case <-popTimer.C:
// Time to check for more new tasks
popTimer.Reset(noTasksWait)
case <-e.shutdown:
// Engine shutdown
break taskLoop
// ready to queue next task if not otherwise queued
select {
case ready <- struct{}{}:
default:
}
case <-released:
active--
// ready to queue next task if not otherwise queued
select {
case ready <- struct{}{}:
default:
}
}
}

// Stop workers and wait for all workers to exit
close(runChan)
wg.Wait()
}

func (e *Engine) Close(ctx context.Context) {
close(e.shutdown) // signal to stop workers
select {
case <-e.stopped: // wait for workers to stop
case <-ctx.Done(): // if waiting too long
e.cancelTasks() // cancel any running tasks
<-e.stopped // wait for stop
}
<-e.stopped // wait for workers to stop
e.closer()
}

func (e *Engine) popTask() tasks.Task {
func (e *Engine) tryPopTask() tasks.Task {
if !e.apiGood() {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), popTaskTimeout)
defer cancel()

@@ -180,30 +264,16 @@ func (e *Engine) popTask() tasks.Task {
return task // found a runnable task
}

func (e *Engine) worker(ctx context.Context, n int, wg *sync.WaitGroup, runChan <-chan tasks.Task) {
log.Infow("engine worker started", "worker_id", n)
defer wg.Done()
for {
select {
case task, ok := <-runChan:
if !ok {
return
}
e.runTask(ctx, task, n)
case <-e.workerPing:
}
}
}

func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
// Create a context to manage the running time of the current task
ctx, cancel := context.WithTimeout(ctx, maxTaskRunTime)
defer cancel()

runCount64, _ := task.RunCount.AsInt()
runCount := int(runCount64) + 1
func (e *Engine) runTask(ctx context.Context, task tasks.Task, runCount int, released chan<- struct{}) {
var err error
log.Infow("worker running task", "uuid", task.UUID.String(), "run_count", runCount, "worker_id", worker)
log.Infow("worker running task", "uuid", task.UUID.String(), "run_count", runCount)

var releaseOnce sync.Once
releaseWorker := func() {
releaseOnce.Do(func() {
released <- struct{}{}
})
}

// Define function to update task stage. Use shutdown context, not task
updateStage := func(ctx context.Context, stage string, stageDetails tasks.StageDetails) error {
@@ -225,7 +295,7 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {

// Start deals
if task.RetrievalTask.Exists() {
err = tasks.MakeRetrievalDeal(ctx, e.nodeConfig, e.node, task.RetrievalTask.Must(), updateStage, log.Infow, e.stageTimeouts)
err = e.taskExecutor.MakeRetrievalDeal(ctx, e.nodeConfig, e.node, task.RetrievalTask.Must(), updateStage, log.Infow, e.stageTimeouts, releaseWorker)
if err != nil {
if err == context.Canceled {
// Engine closed, do not update final state
@@ -239,7 +309,7 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
tlog.Info("successfully retrieved data")
}
} else if task.StorageTask.Exists() {
err = tasks.MakeStorageDeal(ctx, e.nodeConfig, e.node, task.StorageTask.Must(), updateStage, log.Infow, e.stageTimeouts)
err = e.taskExecutor.MakeStorageDeal(ctx, e.nodeConfig, e.node, task.StorageTask.Must(), updateStage, log.Infow, e.stageTimeouts, releaseWorker)
if err != nil {
if err == context.Canceled {
// Engine closed, do not update final state
@@ -263,6 +333,9 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
tlog.Errorw("cannot get updated task to finalize", "err", err)
}

// if we haven't already released the queue to run more jobs, release it now
releaseWorker()

// Update task final status. Do not use task context.
var stageDetails tasks.StageDetails
if task.CurrentStageDetails.Exists() {
@@ -295,19 +368,6 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
}
}

// pingWorker returns true if a worker is available to read the workerPing
// channel. This does not guarantee that the worker will still be available
// after returning true if there are scheduled tasks that the scheduler may
// run.
func (e *Engine) pingWorker() bool {
select {
case e.workerPing <- struct{}{}:
return true
default:
return false
}
}

// apiGood returns true if the api can be reached and reports sufficient fil/cap to process tasks.
func (e *Engine) apiGood() bool {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
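The rewritten engine now receives its collaborators (`apiClient`, `clock.Clock`, `taskExecutor`, and the optional `queueIsEmpty` channel) through the unexported `new` constructor, which is what makes the task loop testable. As a hedged illustration only (not the repository's test code), the injected clock from `github.com/benbjohnson/clock` lets a test drive a wait such as `noTasksWait` deterministically with a mock:

```go
package engine_test

import (
	"testing"
	"time"

	"github.com/benbjohnson/clock"
)

// Illustrative only: shows the mock-clock idiom that injecting clock.Clock
// enables, without constructing a real Engine or Lotus node.
func TestMockClockTimer(t *testing.T) {
	clk := clock.NewMock()

	fired := make(chan struct{})
	timer := clk.Timer(5 * time.Second) // stands in for the no-tasks wait timer
	go func() {
		<-timer.C
		close(fired)
	}()

	// Nothing happens until the test advances the mock clock.
	select {
	case <-fired:
		t.Fatal("timer fired before the clock was advanced")
	case <-time.After(10 * time.Millisecond):
	}

	clk.Add(6 * time.Second) // advance past the wait
	select {
	case <-fired:
	case <-time.After(time.Second):
		t.Fatal("timer did not fire after advancing the mock clock")
	}
}
```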