diff --git a/commands/retrieval_deal.go b/commands/retrieval_deal.go
index cb87e653..5734a3e0 100644
--- a/commands/retrieval_deal.go
+++ b/commands/retrieval_deal.go
@@ -44,7 +44,7 @@ func makeRetrievalDeal(cctx *cli.Context) error {
 
 	task := tasks.Type.RetrievalTask.Of(minerParam, payloadCid, carExport, "")
 
-	err = tasks.MakeRetrievalDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts)
+	err = tasks.MakeRetrievalDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts, func() {})
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/commands/storage_deal.go b/commands/storage_deal.go
index e4357fbb..6fb12db1 100644
--- a/commands/storage_deal.go
+++ b/commands/storage_deal.go
@@ -45,5 +45,5 @@ func makeStorageDeal(cctx *cli.Context) error {
 
 	task := tasks.Type.StorageTask.Of(miner, int64(maxPrice), int64(size.Bytes()), int64(startOffset), fastRetrieval, verified, "")
 
-	return tasks.MakeStorageDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts)
+	return tasks.MakeStorageDeal(cctx.Context, nodeConfig, node, task, emptyUpdateStage, log.Infow, stageTimeouts, func() {})
 }
diff --git a/engine/engine.go b/engine/engine.go
index 74d06c3e..049ee6e7 100644
--- a/engine/engine.go
+++ b/engine/engine.go
@@ -2,10 +2,12 @@ package engine
 
 import (
 	"context"
+	"math"
 	"os/exec"
 	"sync"
 	"time"
 
+	"github.com/benbjohnson/clock"
 	"github.com/filecoin-project/dealbot/controller/client"
 	"github.com/filecoin-project/dealbot/lotus"
 	"github.com/filecoin-project/dealbot/tasks"
@@ -24,20 +26,44 @@ const (
 
 var log = logging.Logger("engine")
 
+type apiClient interface {
+	GetTask(ctx context.Context, uuid string) (tasks.Task, error)
+	UpdateTask(ctx context.Context, uuid string, r tasks.UpdateTask) (tasks.Task, error)
+	PopTask(ctx context.Context, r tasks.PopTask) (tasks.Task, error)
+	ResetWorker(ctx context.Context, worker string) error
+}
+
+type taskExecutor interface {
+	MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error
+	MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error
+}
+
+type defaultTaskExecutor struct{}
+
+func (defaultTaskExecutor) MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
+	return tasks.MakeStorageDeal(ctx, config, node, task, updateStage, log, stageTimeouts, releaseWorker)
+}
+
+func (defaultTaskExecutor) MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
+	return tasks.MakeRetrievalDeal(ctx, config, node, task, updateStage, log, stageTimeouts, releaseWorker)
+}
+
 type Engine struct {
-	host   string
-	client *client.Client
-
-	nodeConfig  tasks.NodeConfig
-	node        api.FullNode
-	closer      lotus.NodeCloser
-	shutdown    chan struct{}
-	stopped     chan struct{}
-	tags        []string
-	workerPing  chan struct{}
-	cancelTasks context.CancelFunc
+	shutdown      chan struct{}
+	stopped       chan struct{}
+	queueIsEmpty  chan struct{}
+	host          string
+	tags          []string
 	stageTimeouts map[string]time.Duration
+
+	// dependencies
+	node         api.FullNode
+	nodeConfig   tasks.NodeConfig
+	closer       lotus.NodeCloser
+	client       apiClient
+	clock        clock.Clock
+	taskExecutor taskExecutor
 }
 
 func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
@@ -63,6 +89,34 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
 		return nil, err
 	}
 
+	clock := clock.New()
+
+	tags := cliCtx.StringSlice("tags")
+
+	engine, err := new(ctx, host_id, stageTimeouts, tags, node, nodeConfig, closer, client, clock, defaultTaskExecutor{}, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	go engine.run(ctx, workers)
+	return engine, nil
+}
+
+// used for testing
+func new(
+	ctx context.Context,
+	host string,
+	stageTimeouts map[string]time.Duration,
+	tags []string,
+	node api.FullNode,
+	nodeConfig tasks.NodeConfig,
+	closer lotus.NodeCloser,
+	client apiClient,
+	clock clock.Clock,
+	taskExecutor taskExecutor,
+	queueIsEmpty chan struct{},
+) (*Engine, error) {
+
 	v, err := node.Version(ctx)
 	if err != nil {
 		return nil, err
@@ -71,7 +125,7 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
 	log.Infof("remote version: %s", v.Version)
 
 	// before we do anything, reset all this workers tasks
-	err = client.ResetWorker(cliCtx.Context, host_id)
+	err = client.ResetWorker(ctx, host)
 	if err != nil {
 		// for now, just log an error if this happens... seems like there are scenarios
 		// where we want to get the dealbot up and running even though reset worker failed for
@@ -79,86 +133,116 @@ func New(ctx context.Context, cliCtx *cli.Context) (*Engine, error) {
 		log.Errorf("error resetting tasks for worker: %s", err)
 	}
 
-	e := &Engine{
+	return &Engine{
 		client:        client,
 		nodeConfig:    nodeConfig,
 		node:          node,
 		closer:        closer,
-		host:          host_id,
+		host:          host,
 		shutdown:      make(chan struct{}),
 		stopped:       make(chan struct{}),
-		tags:          cliCtx.StringSlice("tags"),
-		workerPing:    make(chan struct{}),
+		tags:          tags,
 		stageTimeouts: stageTimeouts,
-	}
-
-	var tasksCtx context.Context
-	tasksCtx, e.cancelTasks = context.WithCancel(context.Background())
-
-	go e.run(tasksCtx, workers)
-	return e, nil
+		clock:         clock,
+		taskExecutor:  taskExecutor,
+		queueIsEmpty:  queueIsEmpty,
+	}, nil
 }
 
-func (e *Engine) run(tasksCtx context.Context, workers int) {
+func (e *Engine) run(ctx context.Context, workers int) {
 	defer close(e.stopped)
-	defer e.cancelTasks()
 
 	var wg sync.WaitGroup
-	runChan := make(chan tasks.Task)
-
-	// Start workers
-	wg.Add(workers)
-	for i := 0; i < workers; i++ {
-		go e.worker(tasksCtx, i, &wg, runChan)
+	ctx, cancel := context.WithCancel(ctx)
+
+	e.taskLoop(ctx, &wg, workers)
+	cancel()
+
+	// Stop workers and wait for all workers to exit
+	wg.Wait()
+}
+
+func (e *Engine) taskLoop(ctx context.Context, wg *sync.WaitGroup, workers int) {
+
+	// make a new timer that is already stopped
+	popTimer := e.clock.Timer(math.MinInt64)
+	if !popTimer.Stop() {
+		<-popTimer.C
 	}
 
-	popTimer := time.NewTimer(noTasksWait)
-taskLoop:
+	ready := make(chan struct{}, 1)
+	// we start in a ready state
+	ready <- struct{}{}
+
+	released := make(chan struct{}, workers)
+	active := 0
+
 	for {
+		// ensure shutdown is checked before each operation
 		select {
 		case <-e.shutdown:
-			break taskLoop
+			return
 		default:
 		}
 
-		if e.pingWorker() && e.apiGood() {
-			// Check if there is a new task
-			task := e.popTask()
+		select {
+		case <-e.shutdown:
+			return
+		case <-ready:
+			// stop and drain timer if not already drained
+			if !popTimer.Stop() {
+				select {
+				case <-popTimer.C:
+				default:
+				}
+			}
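+
+			// a worker slot is free and the pop timer is drained; try to pop a task now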
+			task := e.tryPopTask()
 			if task != nil {
-				log.Infow("sending task to worker", "uuid", task.UUID.String())
-				runChan <- task
-				continue
+				active++
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					e.runTask(ctx, task, 1, released)
+				}()
+				if active < workers {
+					ready <- struct{}{}
+				}
+			} else {
+				popTimer.Reset(noTasksWait)
+				// only used for testing
+				if e.queueIsEmpty != nil {
+					e.queueIsEmpty <- struct{}{}
+				}
 			}
-		}
-
-		// No tasks to run now, so wait for timer
-		select {
 		case <-popTimer.C:
-			// Time to check for more new tasks
-			popTimer.Reset(noTasksWait)
-		case <-e.shutdown:
-			// Engine shutdown
-			break taskLoop
+			// ready to queue next task if not otherwise queued
+			select {
+			case ready <- struct{}{}:
+			default:
+			}
+		case <-released:
+			active--
+			// ready to queue next task if not otherwise queued
+			select {
+			case ready <- struct{}{}:
+			default:
+			}
 		}
 	}
-
-	// Stop workers and wait for all workers to exit
-	close(runChan)
-	wg.Wait()
 }
 
 func (e *Engine) Close(ctx context.Context) {
 	close(e.shutdown) // signal to stop workers
-	select {
-	case <-e.stopped: // wait for workers to stop
-	case <-ctx.Done(): // if waiting too long
-		e.cancelTasks() // cancel any running tasks
-		<-e.stopped // wait for stop
-	}
+	<-e.stopped // wait for workers to stop
 	e.closer()
 }
 
-func (e *Engine) popTask() tasks.Task {
+func (e *Engine) tryPopTask() tasks.Task {
+	if !e.apiGood() {
+		return nil
+	}
+
 	ctx, cancel := context.WithTimeout(context.Background(), popTaskTimeout)
 	defer cancel()
 
@@ -180,30 +264,16 @@ func (e *Engine) popTask() tasks.Task {
 	return task // found a runable task
 }
 
-func (e *Engine) worker(ctx context.Context, n int, wg *sync.WaitGroup, runChan <-chan tasks.Task) {
-	log.Infow("engine worker started", "worker_id", n)
-	defer wg.Done()
-	for {
-		select {
-		case task, ok := <-runChan:
-			if !ok {
-				return
-			}
-			e.runTask(ctx, task, n)
-		case <-e.workerPing:
-		}
-	}
-}
-
-func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
-	// Create a context to manage the running time of the current task
-	ctx, cancel := context.WithTimeout(ctx, maxTaskRunTime)
-	defer cancel()
-
-	runCount64, _ := task.RunCount.AsInt()
-	runCount := int(runCount64) + 1
+func (e *Engine) runTask(ctx context.Context, task tasks.Task, runCount int, released chan<- struct{}) {
 	var err error
-	log.Infow("worker running task", "uuid", task.UUID.String(), "run_count", runCount, "worker_id", worker)
+	log.Infow("worker running task", "uuid", task.UUID.String(), "run_count", runCount)
+
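+	// releaseWorker hands this task's scheduler slot back exactly once, so a
+	// long-running deal can continue without blocking new tasks from starting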
+	var releaseOnce sync.Once
+	releaseWorker := func() {
+		releaseOnce.Do(func() {
+			released <- struct{}{}
+		})
+	}
 
 	// Define function to update task stage. Use shutdown context, not task
 	updateStage := func(ctx context.Context, stage string, stageDetails tasks.StageDetails) error {
@@ -225,7 +295,7 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
 
 	// Start deals
 	if task.RetrievalTask.Exists() {
-		err = tasks.MakeRetrievalDeal(ctx, e.nodeConfig, e.node, task.RetrievalTask.Must(), updateStage, log.Infow, e.stageTimeouts)
+		err = e.taskExecutor.MakeRetrievalDeal(ctx, e.nodeConfig, e.node, task.RetrievalTask.Must(), updateStage, log.Infow, e.stageTimeouts, releaseWorker)
 		if err != nil {
 			if err == context.Canceled {
 				// Engine closed, do not update final state
@@ -239,7 +309,7 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
 			tlog.Info("successfully retrieved data")
 		}
 	} else if task.StorageTask.Exists() {
-		err = tasks.MakeStorageDeal(ctx, e.nodeConfig, e.node, task.StorageTask.Must(), updateStage, log.Infow, e.stageTimeouts)
+		err = e.taskExecutor.MakeStorageDeal(ctx, e.nodeConfig, e.node, task.StorageTask.Must(), updateStage, log.Infow, e.stageTimeouts, releaseWorker)
 		if err != nil {
 			if err == context.Canceled {
 				// Engine closed, do not update final state
@@ -263,6 +333,9 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
 		tlog.Errorw("cannot get updated task to finalize", "err", err)
 	}
 
+	// if we haven't already released the queue to run more jobs, release it now
+	releaseWorker()
+
 	// Update task final status. Do not use task context.
 	var stageDetails tasks.StageDetails
 	if task.CurrentStageDetails.Exists() {
@@ -295,19 +368,6 @@ func (e *Engine) runTask(ctx context.Context, task tasks.Task, worker int) {
 	}
 }
 
-// pingWorker returns true if a worker is available to read the workerPing
-// channel. This does not guarantee that the worker will still be available
-// after returning true if there are scheduled tasks that the scheduler may
-// run.
-func (e *Engine) pingWorker() bool {
-	select {
-	case e.workerPing <- struct{}{}:
-		return true
-	default:
-		return false
-	}
-}
-
 // apiGood returns true if the api can be reached and reports sufficient fil/cap to process tasks.
 func (e *Engine) apiGood() bool {
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
diff --git a/engine/engine_test.go b/engine/engine_test.go
new file mode 100644
index 00000000..a9278835
--- /dev/null
+++ b/engine/engine_test.go
@@ -0,0 +1,382 @@
+package engine
+
+import (
+	"context"
+	"errors"
+	"reflect"
+	"sort"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/benbjohnson/clock"
+	"github.com/filecoin-project/dealbot/lotus"
+	"github.com/filecoin-project/dealbot/tasks"
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/big"
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/api/mocks"
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/require"
+)
+
+func TestEngineTiming(t *testing.T) {
+	ctx := context.Background()
+	testCases := map[string]struct {
+		testTasks testTasks
+		workers   int
+		emptyAt   []time.Duration
+	}{
+		"schedules tasks in parallel up to max": {
+			testTasks: testTasks{
+				{
+					availableAt:      0,
+					expectedPoppedAt: 0,
+					finishDuration:   100,
+				},
+				{
+					availableAt:      0,
+					expectedPoppedAt: 0,
+					finishDuration:   100,
+				},
+				{
+					availableAt:      0,
+					expectedPoppedAt: 0,
+					finishDuration:   100,
+				},
+				{
+					availableAt:      0,
+					expectedPoppedAt: 100,
+					finishDuration:   100,
+				},
+			},
+			workers: 3,
+		},
+		"schedules tasks in parallel up to max, released early": {
+			testTasks: testTasks{
+				{
+					availableAt:      0,
+					expectedPoppedAt: 0,
+					finishDuration:   100,
+				},
+				{
+					availableAt:      0,
+					expectedPoppedAt: 0,
+					finishDuration:   100,
+				},
+				{
+					availableAt:      0,
+					expectedPoppedAt: 0,
+					releaseDuration:  50,
+					finishDuration:   100,
+				},
+				{
+					availableAt:      0,
+					expectedPoppedAt: 50,
+					finishDuration:   100,
+				},
+			},
+			workers: 3,
+		},
+		"pop on timeout up to max, released": {
+			testTasks: testTasks{
+				{
+					availableAt:      100,
+					expectedPoppedAt: noTasksWait,
+					// finish total at noTasksWait*3+200
+					finishDuration: noTasksWait*2 + 200,
+				},
+				{
+					availableAt:      noTasksWait + 100,
+					expectedPoppedAt: noTasksWait * 2,
+					releaseDuration:  0,
+					finishDuration:   noTasksWait*2 + 200,
+				},
+				{
+					availableAt:      noTasksWait*2 + 100,
+					expectedPoppedAt: noTasksWait * 3,
+					finishDuration:   noTasksWait*2 + 200,
+				},
+				// parallel task should get picked up as soon as queue frees
+				// due to first task finishing
+				{
+					availableAt:      noTasksWait*3 + 100,
+					expectedPoppedAt: noTasksWait*3 + 200,
+					releaseDuration:  0,
+					finishDuration:   noTasksWait*2 + 200,
+				},
+			},
+			workers: 3,
+			emptyAt: []time.Duration{0, noTasksWait, noTasksWait * 2},
+		},
+	}
+
+	for testCase, data := range testCases {
+		t.Run(testCase, func(t *testing.T) {
+			//ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			//defer cancel()
+			ctrl := gomock.NewController(t)
+			defer ctrl.Finish()
+
+			host := "testhost"
+
+			sort.Sort(data.testTasks)
+			generatedTestTasks := make([]generatedTestTask, 0, len(data.testTasks))
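+			// each generated task's tag records its index in testTasks so the stub
+			// executor can look up the matching timing entry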
+			for i, tt := range data.testTasks {
+				// alternate storage and retrieval
+				var task tasks.Task
+				if i%2 == 0 {
+					storageTask := tasks.Type.StorageTask.Of("t1000", 0, 1000000000, 6152, true, true, strconv.Itoa(i))
+					task = tasks.Type.Task.New(nil, storageTask)
+				} else {
+					retrievalTask := tasks.Type.RetrievalTask.Of("t1000", "qmXXXXX", false, strconv.Itoa(i))
+					task = tasks.Type.Task.New(retrievalTask, nil)
+				}
+				task = task.Assign(host, tasks.InProgress)
+				generatedTestTasks = append(generatedTestTasks, generatedTestTask{task, tt})
+			}
+
+			tags := []string{}
+			stageTimeouts := map[string]time.Duration{}
+
+			// dependencies
+			walletAddress := address.TestAddress
+			head := &types.TipSet{}
+			node := mocks.NewMockFullNode(ctrl)
+			ctxType := reflect.TypeOf((*context.Context)(nil)).Elem()
+			amt := abi.NewTokenAmount(1000000000000)
+			node.EXPECT().Version(gomock.AssignableToTypeOf(ctxType)).Return(api.APIVersion{
+				Version: "1.2.3",
+			}, nil)
+			node.EXPECT().WalletBalance(gomock.AssignableToTypeOf(ctxType), gomock.Eq(walletAddress)).
+				Return(amt, nil).
+				AnyTimes()
+			node.EXPECT().ChainHead(gomock.AssignableToTypeOf(ctxType)).Return(head, nil).
+				AnyTimes()
+			node.EXPECT().StateVerifiedClientStatus(
+				gomock.AssignableToTypeOf(ctxType), gomock.Eq(walletAddress), gomock.Eq(head.Key())).
+				Return(&amt, nil).
+				AnyTimes()
+			nodeConfig := tasks.NodeConfig{
+				WalletAddress:    walletAddress,
+				MinWalletBalance: big.Zero(),
+				MinWalletCap:     big.Zero(),
+			}
+			var closer lotus.NodeCloser = func() {}
+			clock := clock.NewMock()
+			startTime := clock.Now()
+			client := &testAPIClient{
+				startTime: startTime,
+				clock:     clock,
+				tasks:     generatedTestTasks,
+				popped:    make(map[string]struct{}),
+			}
+			receivedTasks := make(chan int, 1)
+			taskExecutor := &testTaskExecutor{
+				receivedTasks: receivedTasks,
+				clock:         clock,
+				tasks:         data.testTasks,
+			}
+
+			queueIsEmpty := make(chan struct{}, 1)
+			e, err := new(ctx, host, stageTimeouts, tags, node, nodeConfig, closer, client, clock, taskExecutor, queueIsEmpty)
+			require.NoError(t, err)
+			done := make(chan struct{}, 1)
+			go func() {
+				e.run(ctx, data.workers)
+				done <- struct{}{}
+			}()
+
+			checkPointsByTime := make(map[time.Duration]*checkPoint)
+			for i, task := range data.testTasks {
+				_, ok := checkPointsByTime[task.availableAt]
+				if !ok {
+					checkPointsByTime[task.availableAt] = &checkPoint{time: task.availableAt, tasks: map[int]testTask{}}
+				}
+				cp, ok := checkPointsByTime[task.expectedPoppedAt]
+				if !ok {
+					cp = &checkPoint{time: task.expectedPoppedAt, tasks: map[int]testTask{}}
+					checkPointsByTime[task.expectedPoppedAt] = cp
+				}
+				cp.tasks[i] = task
+			}
+			for _, emptyAt := range data.emptyAt {
+				cp, ok := checkPointsByTime[emptyAt]
+				if !ok {
+					cp = &checkPoint{time: emptyAt, tasks: map[int]testTask{}}
+					checkPointsByTime[emptyAt] = cp
+				}
+				cp.emptyAt = true
+			}
+			checkPoints := make([]*checkPoint, 0, len(checkPointsByTime))
+			for _, cp := range checkPointsByTime {
+				checkPoints = append(checkPoints, cp)
+			}
+			sort.Slice(checkPoints, func(i, j int) bool { return checkPoints[i].time < checkPoints[j].time })
+			for _, cp := range checkPoints {
+				currentTime := clock.Since(startTime)
+				clock.Add(cp.time - currentTime)
+				seen := map[int]bool{}
+				// check popped tasks
+				for i := 0; i < len(cp.tasks); i++ {
+					select {
+					case poppedTask := <-receivedTasks:
+						_, expected := cp.tasks[poppedTask]
+						require.True(t, expected)
+						require.False(t, seen[poppedTask])
+						seen[poppedTask] = true
+					case <-ctx.Done():
+						t.Fatal("should have popped expected task at given time but didn't")
+					}
+				}
+				// check queue empty
+				if cp.emptyAt {
+					select {
+					case <-queueIsEmpty:
+					case <-ctx.Done():
+						t.Fatal("expected queue to be empty, but was not")
+					}
+				}
+			}
+			e.Close(ctx)
+			select {
+			case <-done:
+			case <-ctx.Done():
+				t.Fatal("did not finish")
+			}
+		})
+	}
+}
+
+type checkPoint struct {
+	time    time.Duration
+	tasks   map[int]testTask
+	emptyAt bool
+}
+
+type testTask struct {
+	availableAt      time.Duration
+	expectedPoppedAt time.Duration
+	releaseDuration  time.Duration
+	finishDuration   time.Duration
+}
+
+type generatedTestTask struct {
+	tasks.Task
+	testTask
+}
+
+type testAPIClient struct {
+	startTime time.Time
+	clock     clock.Clock
+	tasks     []generatedTestTask
+	popped    map[string]struct{}
+}
+
+func (tapi *testAPIClient) GetTask(ctx context.Context, uuid string) (tasks.Task, error) {
+	for _, task := range tapi.tasks {
+		if task.Task.UUID.String() == uuid {
+			return task.Task, nil
+		}
+	}
+	return nil, errors.New("not found")
+}
+
+func (tapi *testAPIClient) UpdateTask(ctx context.Context, uuid string, r tasks.UpdateTask) (tasks.Task, error) {
+	for _, task := range tapi.tasks {
+		if task.Task.UUID.String() == uuid {
+			return task.Task, nil
+		}
+	}
+	return nil, errors.New("not found")
+}
+
+func (tapi *testAPIClient) PopTask(ctx context.Context, r tasks.PopTask) (tasks.Task, error) {
+	for _, task := range tapi.tasks {
+		_, popped := tapi.popped[task.UUID.String()]
+		if !popped && task.availableAt <= tapi.clock.Since(tapi.startTime) {
+			tapi.popped[task.UUID.String()] = struct{}{}
+			return task.Task, nil
+		}
+	}
+	return nil, nil
+}
+
+func (tapi *testAPIClient) ResetWorker(ctx context.Context, worker string) error {
+	return nil
+}
+
+type testTaskExecutor struct {
+	receivedTasks chan<- int
+	clock         clock.Clock
+	tasks         testTasks
+}
+
+func (tte *testTaskExecutor) runTask(ctx context.Context, index int, releaseWorker func()) error {
+	task := tte.tasks[index]
+	var timer *clock.Timer
+	if task.releaseDuration != time.Duration(0) {
+		timer = tte.clock.Timer(task.releaseDuration)
+		tte.receivedTasks <- index
+		// wait for release
+		select {
+		case <-timer.C:
+			releaseWorker()
+		case <-ctx.Done():
+			return nil
+		}
+		// reset to remaining time left on task
+		timer.Reset(task.finishDuration - task.releaseDuration)
+	} else {
+		timer = tte.clock.Timer(task.finishDuration)
+		tte.receivedTasks <- index
+	}
+
+	// wait for final duration
+	select {
+	case <-timer.C:
+	case <-ctx.Done():
+	}
+	return nil
+}
+
+func (tte *testTaskExecutor) MakeStorageDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.StorageTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
+	tag := task.Tag.Must().String()
+	index, err := strconv.Atoi(tag)
+	if err != nil {
+		return err
+	}
+	if index >= len(tte.tasks) {
+		return errors.New("index out of bounds")
+	}
+	return tte.runTask(ctx, index, releaseWorker)
+}
+
+func (tte *testTaskExecutor) MakeRetrievalDeal(ctx context.Context, config tasks.NodeConfig, node api.FullNode, task tasks.RetrievalTask, updateStage tasks.UpdateStage, log tasks.LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
+	tag := task.Tag.Must().String()
+	index, err := strconv.Atoi(tag)
+	if err != nil {
+		return err
+	}
+	if index >= len(tte.tasks) {
+		return errors.New("index out of bounds")
+	}
+	return tte.runTask(ctx, index, releaseWorker)
+}
+
+type testTasks []testTask
+
+func (t testTasks) Len() int {
+	return len(t)
+}
+func (t testTasks) Less(i int, j int) bool {
+	if t[i].availableAt == t[j].availableAt {
+		return t[i].expectedPoppedAt < t[j].expectedPoppedAt
+	}
+	return t[i].availableAt < t[j].availableAt
+}
+func (t testTasks) Swap(i int, j int) {
+	t[i], t[j] = t[j], t[i]
+}
diff --git a/go.mod b/go.mod
index 9283b157..f691ea8a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/filecoin-project/dealbot
 go 1.16
 
 require (
+	github.com/benbjohnson/clock v1.0.3
 	github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
 	github.com/evanw/esbuild v0.12.9
 	github.com/filecoin-project/go-address v0.0.5
diff --git a/tasks/model.go b/tasks/model.go
index 1a946fd7..0b30492f 100644
--- a/tasks/model.go
+++ b/tasks/model.go
@@ -326,6 +326,20 @@ func (t *_Task) Schedule() (string, string) {
 	return schedule, limit
 }
 
+func (t *_Task) Tag() string {
+	if rt := t.RetrievalTask; rt.Exists() {
+		if tag := rt.Must().Tag; tag.Exists() {
+			return tag.Must().String()
+		}
+	}
+	if st := t.StorageTask; st.Exists() {
+		if tag := st.Must().Tag; tag.Exists() {
+			return tag.Must().String()
+		}
+	}
+	return ""
+}
+
 func (t *_Task) Assign(worker string, status Status) Task {
 	newTask := _Task{
 		UUID: t.UUID,
diff --git a/tasks/retrieval_deal.go b/tasks/retrieval_deal.go
index 3bf4eb87..090084cb 100644
--- a/tasks/retrieval_deal.go
+++ b/tasks/retrieval_deal.go
@@ -34,15 +34,16 @@ var RetrievalDealStages = []RetrievalStage{
 	RetrievalStageDealComplete,
 }
 
-func MakeRetrievalDeal(ctx context.Context, config NodeConfig, node api.FullNode, task RetrievalTask, updateStage UpdateStage, log LogStatus, stageTimeouts map[string]time.Duration) error {
+func MakeRetrievalDeal(ctx context.Context, config NodeConfig, node api.FullNode, task RetrievalTask, updateStage UpdateStage, log LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
 	de := &retrievalDealExecutor{
 		dealExecutor: dealExecutor{
-			ctx:      ctx,
-			config:   config,
-			node:     node,
-			miner:    task.Miner.x,
-			log:      log,
-			makeHost: libp2p.New,
+			ctx:           ctx,
+			config:        config,
+			node:          node,
+			miner:         task.Miner.x,
+			log:           log,
+			makeHost:      libp2p.New,
+			releaseWorker: releaseWorker,
 		},
 		task: task,
 	}
diff --git a/tasks/storage_deal.go b/tasks/storage_deal.go
index 1fd978f0..287cd583 100644
--- a/tasks/storage_deal.go
+++ b/tasks/storage_deal.go
@@ -41,15 +41,16 @@ const (
 	startOffsetDefault = 30760
 )
 
-func MakeStorageDeal(ctx context.Context, config NodeConfig, node api.FullNode, task StorageTask, updateStage UpdateStage, log LogStatus, stageTimeouts map[string]time.Duration) error {
+func MakeStorageDeal(ctx context.Context, config NodeConfig, node api.FullNode, task StorageTask, updateStage UpdateStage, log LogStatus, stageTimeouts map[string]time.Duration, releaseWorker func()) error {
 	de := &storageDealExecutor{
 		dealExecutor: dealExecutor{
-			ctx:      ctx,
-			config:   config,
-			node:     node,
-			miner:    task.Miner.x,
-			log:      log,
-			makeHost: libp2p.New,
+			ctx:           ctx,
+			config:        config,
+			node:          node,
+			miner:         task.Miner.x,
+			log:           log,
+			makeHost:      libp2p.New,
+			releaseWorker: releaseWorker,
 		},
 		task: task,
 	}
@@ -112,16 +113,17 @@ func MakeStorageDeal(ctx context.Context, config NodeConfig, node api.FullNode,
 }
 
 type dealExecutor struct {
-	ctx          context.Context
-	config       NodeConfig
-	node         api.FullNode
-	miner        string
-	log          LogStatus
-	tipSet       *types.TipSet
-	minerAddress address.Address
-	minerInfo    miner.MinerInfo
-	pi           peer.AddrInfo
-	makeHost     func(ctx context.Context, opts ...config.Option) (host.Host, error)
+	ctx           context.Context
+	config        NodeConfig
+	node          api.FullNode
+	miner         string
+	log           LogStatus
+	tipSet        *types.TipSet
+	minerAddress  address.Address
+	minerInfo     miner.MinerInfo
+	pi            peer.AddrInfo
+	releaseWorker func()
+	makeHost      func(ctx context.Context, opts ...config.Option) (host.Host, error)
 }
 
 type storageDealExecutor struct {
@@ -357,6 +359,9 @@ func (de *storageDealExecutor) executeAndMonitorDeal(ctx context.Context, update
 				"provider", info.Provider,
 			)
 			lastState = info.State
+			if info.State == storagemarket.StorageDealCheckForAcceptance {
+				de.releaseWorker()
+			}
 		}
 
 		switch info.State {
diff --git a/tasks/storage_deal_test.go b/tasks/storage_deal_test.go
index 3ee70d58..a3930dd4 100644
--- a/tasks/storage_deal_test.go
+++ b/tasks/storage_deal_test.go
@@ -75,19 +75,28 @@ func TestExecuteDeal(t *testing.T) {
 	root := generateRandomCID(t)
 	proposalCid := generateRandomCID(t)
 	basePrice := abi.NewTokenAmount(1000000000000)
-	dealInfo := make(chan api.DealInfo, 1)
+	dealInfo := make(chan api.DealInfo, 2)
+	dealInfo <- api.DealInfo{
+		ProposalCid: proposalCid,
+		State:       storagemarket.StorageDealCheckForAcceptance,
+		DealStages:  &storagemarket.DealStages{},
+	}
 	dealInfo <- api.DealInfo{
 		ProposalCid: proposalCid,
 		State:       storagemarket.StorageDealActive,
 		DealStages:  &storagemarket.DealStages{},
 	}
+	released := false
 	de := &storageDealExecutor{
 		dealExecutor: dealExecutor{
 			ctx:    ctx,
 			node:   node,
 			log:    func(msg string, keysAndValues ...interface{}) {},
 			tipSet: &types.TipSet{},
+			releaseWorker: func() {
+				released = true
+			},
 		},
 		task: Type.StorageTask.Of("f1000", 1000000000000, 21474836480, 6152, true, true, "verified"),
 		importRes: &api.ImportRes{
@@ -118,6 +127,8 @@ func TestExecuteDeal(t *testing.T) {
 	err := de.executeAndMonitorDeal(ctx, func(ctx context.Context, stage string, stageDetails StageDetails) error {
 		return nil
 	}, map[string]time.Duration{})
 	require.NoError(t, err)
+
+	require.True(t, released)
 }
 
 func assertHasLogWithPrefix(t *testing.T, logs List_Logs, prefix string) (postfix string) {