feat(engine): release tasks early
Set up the queue so that tasks can be released by a worker before they are totally finished, and set up storage deals to release the worker once the data transfer is complete.
hannahhoward committed Jun 25, 2021
1 parent 214b064 commit 238817f
Showing 9 changed files with 598 additions and 124 deletions.
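
The new trailing argument to tasks.MakeRetrievalDeal and tasks.MakeStorageDeal is a releaseWorker callback: a task invokes it once its data transfer completes, handing its queue slot back before the deal fully finishes. The one-shot CLI commands below pass a no-op func() {} since they have no queue. A minimal sketch of the intended call pattern, assuming hypothetical transferData and waitForSealing stages (the real staging logic lives in the tasks package):

package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical stand-ins for the two phases of a deal; the real staging
// logic lives in the tasks package.
func transferData(ctx context.Context) error   { time.Sleep(10 * time.Millisecond); return nil }
func waitForSealing(ctx context.Context) error { time.Sleep(50 * time.Millisecond); return nil }

// makeDeal mirrors the shape of tasks.MakeStorageDeal after this commit:
// the releaseWorker callback is invoked as soon as the expensive part of
// the work (the data transfer) is done, so the engine can admit the next
// task while this one waits out the slow tail.
func makeDeal(ctx context.Context, releaseWorker func()) error {
	if err := transferData(ctx); err != nil {
		return err
	}
	releaseWorker() // hand the queue slot back early
	return waitForSealing(ctx)
}

func main() {
	err := makeDeal(context.Background(), func() { fmt.Println("worker slot released") })
	fmt.Println("deal finished, err =", err)
}
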
2 changes: 1 addition & 1 deletion commands/retrieval_deal.go
@@ -44,7 +44,7 @@ func makeRetrievalDeal(cctx *cli.Context) error {

task := tasks.Type.RetrievalTask.Of(minerParam, payloadCid, carExport, "")

err = tasks.MakeRetrievalDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts)
err = tasks.MakeRetrievalDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts, func() {})
if err != nil {
log.Fatal(err)
}
2 changes: 1 addition & 1 deletion commands/storage_deal.go
@@ -45,5 +45,5 @@ func makeStorageDeal(cctx *cli.Context) error {

task := tasks.Type.StorageTask.Of(miner, int64(maxPrice), int64(size.Bytes()), int64(startOffset), fastRetrieval, verified, "")

return tasks.MakeStorageDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts)
return tasks.MakeStorageDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts, func() {})
}
254 changes: 157 additions & 97 deletions engine/engine.go
@@ -2,10 +2,12 @@ package engine

import (
"context"
"math"
"os/exec"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/filecoin-project/dealbot/controller/client"
"github.com/filecoin-project/dealbot/lotus"
"github.com/filecoin-project/dealbot/tasks"
@@ -24,20 +26,44 @@ const (

var log = logging.Logger("engine")

type apiClient interface {
GetTask(ctx context.Context, uuid string) (tasks.Task, error)
UpdateTask(ctx context.Context, uuid string, r tasks.UpdateTask) (tasks.Task, error)
PopTask(ctx context.Context, r tasks.PopTask) (tasks.Task, error)
ResetWorker(ctx context.Context, worker string) error
}

type taskExecutor interface {
MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error
MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error
}

type defaultTaskExecutor struct{}

func (defaultTaskExecutor) MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
return tasks.MakeStorageDeal(ctx, config, node, task, updateStage, log, stageTimeouts, releaseWorker)
}

func (defaultTaskExecutor) MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
return tasks.MakeRetrievalDeal(ctx, config, node, task, updateStage, log, stageTimeouts, releaseWorker)
}

type Engine struct {
host string
client *client.Client

nodeConfig tasks.NodeConfig
node api.FullNode
closer lotus.NodeCloser
shutdown chan struct{}
stopped chan struct{}
tags []string
workerPing chan struct{}
cancelTasks context.CancelFunc
shutdown chan struct{}
stopped chan struct{}
queueIsEmpty chan struct{}

host string
tags []string
stageTimeouts map[string]time.Duration

// dependencies
node api.FullNode
nodeConfig tasks.NodeConfig
closer lotus.NodeCloser
client apiClient
clock clock.Clock
taskExecutor taskExecutor
}

func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
@@ -63,6 +89,34 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
return nil, err
}

clock := clock.New()

tags := cliCtx.StringSlice("tags")

engine, err := new(ctx, host_id, stageTimeouts, tags, node, nodeConfig, closer, client, clock, defaultTaskExecutor{}, nil)
if err != nil {
return nil, err
}

go engine.run(ctx, workers)
return engine, nil
}

// new is the internal constructor; tests call it directly to inject dependencies
func new(
ctx context.Context,
host string,
stageTimeouts map[string]time.Duration,
tags []string,
node api.FullNode,
nodeConfig tasks.NodeConfig,
closer lotus.NodeCloser,
client apiClient,
clock clock.Clock,
taskExecutor taskExecutor,
queueIsEmpty chan struct{},
) (*Engine, error) {

v, err := node.Version(ctx)
if err != nil {
return nil, err
@@ -71,94 +125,124 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
log.Infof("remote version: %s", v.Version)

// before we do anything, reset all of this worker's tasks
err = client.ResetWorker(cliCtx.Context, host_id)
err = client.ResetWorker(ctx, host)
if err != nil {
// for now, just log an error if this happens... seems like there are scenarios
// where we want to get the dealbot up and running even though reset worker failed for
// whatever reason
log.Errorf("error resetting tasks for worker: %s", err)
}

e := &Engine{
return &Engine{
client: client,
nodeConfig: nodeConfig,
node: node,
closer: closer,
host: host_id,
host: host,
shutdown: make(chan struct{}),
stopped: make(chan struct{}),
tags: cliCtx.StringSlice("tags"),
workerPing: make(chan struct{}),
tags: tags,
stageTimeouts: stageTimeouts,
}

var tasksCtx context.Context
tasksCtx, e.cancelTasks = context.WithCancel(context.Background())

go e.run(tasksCtx, workers)
return e, nil
clock: clock,
taskExecutor: taskExecutor,
queueIsEmpty: queueIsEmpty,
}, nil
}
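
Alongside the apiClient and taskExecutor interfaces, the constructor now takes a clock.Clock and an optional queueIsEmpty channel; these are injection seams so tests can drive the scheduler without a live controller or real timers. A standalone sketch of the mock-clock half of that, using the actual github.com/benbjohnson/clock API (the goroutine around it is illustrative only):

package main

import (
	"fmt"
	"time"

	"github.com/benbjohnson/clock"
)

func main() {
	mock := clock.NewMock()          // satisfies clock.Clock, like clock.New()
	timer := mock.Timer(time.Minute) // stands in for the engine's popTimer

	done := make(chan struct{})
	go func() {
		<-timer.C // would block for a real minute against a wall clock
		close(done)
	}()

	mock.Add(time.Minute) // advance fake time; the timer fires immediately
	<-done
	fmt.Println("noTasksWait elapsed without any real waiting")
}
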

func (e *Engine) run(tasksCtx context.Context, workers int) {
func (e *Engine) run(ctx context.Context, workers int) {
defer close(e.stopped)
defer e.cancelTasks()

var wg sync.WaitGroup
runChan := make(chan tasks.Task)

// Start workers
wg.Add(workers)
for i := 0; i < workers; i++ {
go e.worker(tasksCtx, i, &wg, runChan)
ctx, cancel := context.WithCancel(ctx)

e.taskLoop(ctx, &wg, workers)
cancel()

// Stop workers and wait for all workers to exit
wg.Wait()
}

func (e *Engine) taskLoop(ctx context.Context, wg *sync.WaitGroup, workers int) {

// super annoying -- make a new timer that is already stopped
popTimer := e.clock.Timer(math.MinInt64)
if !popTimer.Stop() {
<-popTimer.C
}

popTimer := time.NewTimer(noTasksWait)
taskLoop:
ready := make(chan struct{}, 1)
// we start in a ready state
ready <- struct{}{}

released := make(chan struct{}, workers)
active := 0

for {
// ensure at most one operation runs before a quit
select {
case <-e.shutdown:
break taskLoop
return
default:
}

if e.pingWorker() && e.apiGood() {
// Check if there is a new task
task := e.popTask()
select {
case <-e.shutdown:
return
case <-ready:
// stop and drain timer if not already drained
if !popTimer.Stop() {
select {
case <-popTimer.C:
default:
}
}
task := e.tryPopTask()
if task != nil {
log.Infow("sending task to worker", "uuid", task.UUID.String())
runChan <- task
continue
active++
wg.Add(1)
go func() {
defer wg.Done()
e.runTask(ctx, task, 1, released)
}()
if active < workers {
ready <- struct{}{}
}
} else {
popTimer.Reset(noTasksWait)
// only used for testing
if e.queueIsEmpty != nil {
e.queueIsEmpty <- struct{}{}
}
}
}

// No tasks to run now, so wait for timer
select {
case <-popTimer.C:
// Time to check for more new tasks
popTimer.Reset(noTasksWait)
case <-e.shutdown:
// Engine shutdown
break taskLoop
// ready to queue next task if not otherwise queued
select {
case ready <- struct{}{}:
default:
}
case <-released:
active--
// ready to queue next task if not otherwise queued
select {
case ready <- struct{}{}:
default:
}
}
}

// Stop workers and wait for all workers to exit
close(runChan)
wg.Wait()
}
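
The scheduling core above is the ready/released channel pair: ready (capacity 1) gates popping the next task, released (capacity workers) is how a running task hands its slot back, and active counts slots in use. The same pattern in a stripped-down, runnable form, with sleeps as hypothetical stand-ins for the transfer and sealing phases:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const workers = 2
	taskNames := []string{"a", "b", "c", "d"}

	ready := make(chan struct{}, 1)
	ready <- struct{}{} // start in a ready state
	released := make(chan struct{}, workers)
	active := 0

	var wg sync.WaitGroup
	next := 0
	for next < len(taskNames) {
		select {
		case <-ready:
			name := taskNames[next]
			next++
			active++
			wg.Add(1)
			go func() {
				defer wg.Done()
				var once sync.Once
				release := func() { once.Do(func() { released <- struct{}{} }) }
				fmt.Println(name, "transferring")
				time.Sleep(10 * time.Millisecond) // hypothetical data transfer
				release()                         // slot freed before the slow tail
				time.Sleep(30 * time.Millisecond) // hypothetical wait for sealing
				release()                         // no-op: the Once already fired
			}()
			if active < workers {
				ready <- struct{}{} // room for another task right away
			}
		case <-released:
			active--
			select { // re-arm ready unless already armed
			case ready <- struct{}{}:
			default:
			}
		}
	}
	wg.Wait()
	fmt.Println("all tasks finished")
}

The sync.Once guard mirrors runTask's releaseOnce: the engine also calls releaseWorker unconditionally after a task returns, so a task that never released early still frees its slot exactly once.
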

func (e *Engine) Close(ctx context.Context) {
close(e.shutdown) // signal to stop workers
select {
case <-e.stopped: // wait for workers to stop
case <-ctx.Done(): // if waiting too long
e.cancelTasks() // cancel any running tasks
<-e.stopped // wait for stop
}
<-e.stopped // wait for workers to stop
e.closer()
}

func (e *Engine) popTask() tasks.Task {
func (e *Engine) tryPopTask() tasks.Task {
if !e.apiGood() {
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), popTaskTimeout)
defer cancel()

@@ -180,30 +264,16 @@ func (e *Engine) popTask() tasks.Task {
return task // found a runnable task
}

func (e *Engine) worker(ctx context.Context, n int, wg *sync.WaitGroup, runChan <-chan tasks.Task) {
log.Infow("engine worker started", "worker_id", n)
defer wg.Done()
for {
select {
case task, ok := <-runChan:
if !ok {
return
}
e.runTask(ctx, task, n)
case <-e.workerPing:
}
}
}

func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
// Create a context to manage the running time of the current task
ctx, cancel := context.WithTimeout(ctx, maxTaskRunTime)
defer cancel()

runCount64, _ := task.RunCount.AsInt()
runCount := int(runCount64) + 1
func (e *Engine) runTask(ctx context.Context, task tasks.Task, runCount int, released chan<- struct{}) {
var err error
log.Infow("worker running task", "uuid", task.UUID.String(), "run_count", runCount, "worker_id", worker)
log.Infow("worker running task", "uuid", task.UUID.String(), "run_count", runCount)

var releaseOnce sync.Once
releaseWorker := func() {
releaseOnce.Do(func() {
released <- struct{}{}
})
}

// Define function to update task stage. Use shutdown context, not task
updateStage := func(ctx context.Context, stage string, stageDetails tasks.StageDetails) error {
@@ -225,7 +295,7 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {

// Start deals
if task.RetrievalTask.Exists() {
err = tasks.MakeRetrievalDeal(ctx, e.nodeConfig, e.node, task.RetrievalTask.Must(), updateStage, log.Infow, e.stageTimeouts)
err = e.taskExecutor.MakeRetrievalDeal(ctx, e.nodeConfig, e.node, task.RetrievalTask.Must(), updateStage, log.Infow, e.stageTimeouts, releaseWorker)
if err != nil {
if err == context.Canceled {
// Engine closed, do not update final state
@@ -239,7 +309,7 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
tlog.Info("successfully retrieved data")
}
} else if task.StorageTask.Exists() {
err = tasks.MakeStorageDeal(ctx, e.nodeConfig, e.node, task.StorageTask.Must(), updateStage, log.Infow, e.stageTimeouts)
err = e.taskExecutor.MakeStorageDeal(ctx, e.nodeConfig, e.node, task.StorageTask.Must(), updateStage, log.Infow, e.stageTimeouts, releaseWorker)
if err != nil {
if err == context.Canceled {
// Engine closed, do not update final state
@@ -263,6 +333,9 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
tlog.Errorw("cannot get updated task to finalize", "err", err)
}

// if we haven't already released the queue to run more jobs, release it now
releaseWorker()

// Update task final status. Do not use task context.
var stageDetails tasks.StageDetails
if task.CurrentStageDetails.Exists() {
@@ -295,19 +368,6 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
}
}

// pingWorker returns true if a worker is available to read the workerPing
// channel. This does not guarantee that the worker will still be available
// after returning true if there are scheduled tasks that the scheduler may
// run.
func (e *Engine) pingWorker() bool {
select {
case e.workerPing <- struct{}{}:
return true
default:
return false
}
}

// apiGood returns true if the api can be reached and reports sufficient fil/cap to process tasks.
func (e *Engine) apiGood() bool {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)