From ac7dc28cfb2c1439e40cae38e4ed5757e696b39a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 9 Jul 2020 12:58:52 +0200
Subject: [PATCH 01/14] sched: WIP Windows

---
 sched.go | 388 ++++++++++++++++++++++++++++---------------------------
 1 file changed, 198 insertions(+), 190 deletions(-)

diff --git a/sched.go b/sched.go
index 9fddd7b..af6981b 100644
--- a/sched.go
+++ b/sched.go
@@ -3,11 +3,11 @@ package sectorstorage
 import (
 	"container/heap"
 	"context"
+	"math/rand"
 	"sort"
 	"sync"
 	"time"
 
-	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/specs-actors/actors/abi"
@@ -20,6 +20,11 @@ type schedPrioCtxKey int
 var SchedPriorityKey schedPrioCtxKey
 var DefaultSchedPriority = 0
+var SelectorTimeout = 5 * time.Second
+
+var (
+	SchedWindows = 2
+)
 
 func getPriority(ctx context.Context) int {
 	sp := ctx.Value(SchedPriorityKey)
@@ -56,11 +61,63 @@ type scheduler struct {
 	watchClosing  chan WorkerID
 	workerClosing chan WorkerID
 
-	schedule   chan *workerRequest
-	workerFree chan WorkerID
-	closing    chan struct{}
+	schedule       chan *workerRequest
+	windowRequests chan *schedWindowRequest
+
+	// owned by the sh.runSched goroutine
+	schedQueue  *requestQueue
+	openWindows []*schedWindowRequest
+
+	closing chan struct{}
+}
+
+type workerHandle struct {
+	w Worker
+
+	info storiface.WorkerInfo
+
+	preparing *activeResources
+	active    *activeResources
+}
+
+type schedWindowRequest struct {
+	worker WorkerID
+
+	done chan *schedWindow
+}
+
+type schedWindow struct {
+	worker    WorkerID
+	allocated *activeResources
+	todo      []*workerRequest
+}
+
+type activeResources struct {
+	memUsedMin uint64
+	memUsedMax uint64
+	gpuUsed    bool
+	cpuUse     uint64
+
+	cond *sync.Cond
+}
+
+type workerRequest struct {
+	sector   abi.SectorID
+	taskType sealtasks.TaskType
+	priority int // larger values more important
+	sel      WorkerSelector
+
+	prepare WorkerAction
+	work    WorkerAction
+
+	index int // The index of the item in the heap.
+
+	ret chan<- workerResponse
+	ctx context.Context
+}
 
-	schedQueue *requestQueue
+type workerResponse struct {
+	err error
 }
 
 func newScheduler(spt abi.RegisteredSealProof) *scheduler {
@@ -75,9 +132,8 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
 		watchClosing:  make(chan WorkerID),
 		workerClosing: make(chan WorkerID),
 
-		schedule:   make(chan *workerRequest),
-		workerFree: make(chan WorkerID),
-		closing:    make(chan struct{}),
+		schedule: make(chan *workerRequest),
+		closing:  make(chan struct{}),
 
 		schedQueue: &requestQueue{},
 	}
@@ -115,25 +171,6 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType
 	}
 }
 
-type workerRequest struct {
-	sector   abi.SectorID
-	taskType sealtasks.TaskType
-	priority int // larger values more important
-	sel      WorkerSelector
-
-	prepare WorkerAction
-	work    WorkerAction
-
-	index int // The index of the item in the heap.
-
-	ret chan<- workerResponse
-	ctx context.Context
-}
-
-type workerResponse struct {
-	err error
-}
-
 func (r *workerRequest) respond(err error) {
 	select {
 	case r.ret <- workerResponse{err: err}:
@@ -142,46 +179,25 @@
 	}
 }
 
-type activeResources struct {
-	memUsedMin uint64
-	memUsedMax uint64
-	gpuUsed    bool
-	cpuUse     uint64
-
-	cond *sync.Cond
-}
-
-type workerHandle struct {
-	w Worker
-
-	info storiface.WorkerInfo
-
-	preparing *activeResources
-	active    *activeResources
-}
-
 func (sh *scheduler) runSched() {
 	go sh.runWorkerWatcher()
 
 	for {
 		select {
 		case w := <-sh.newWorkers:
-			sh.schedNewWorker(w)
+			sh.newWorker(w)
+
 		case wid := <-sh.workerClosing:
-			sh.schedDropWorker(wid)
-		case req := <-sh.schedule:
-			scheduled, err := sh.maybeSchedRequest(req)
-			if err != nil {
-				req.respond(err)
-				continue
-			}
-			if scheduled {
-				continue
-			}
+			sh.dropWorker(wid)
+		case req := <-sh.schedule:
 			heap.Push(sh.schedQueue, req)
-		case wid := <-sh.workerFree:
-			sh.onWorkerFreed(wid)
+			sh.trySched()
+
+		case req := <-sh.windowRequests:
+			sh.openWindows = append(sh.openWindows, req)
+			sh.trySched()
+
 		case <-sh.closing:
 			sh.schedClose()
 			return
@@ -189,169 +205,161 @@
 	}
 }
 
-func (sh *scheduler) onWorkerFreed(wid WorkerID) {
-	sh.workersLk.Lock()
-	w, ok := sh.workers[wid]
-	sh.workersLk.Unlock()
-	if !ok {
-		log.Warnf("onWorkerFreed on invalid worker %d", wid)
-		return
-	}
+func (sh *scheduler) trySched() {
+	/*
+		This assigns tasks to workers based on:
+		- Task priority (achieved by handling sh.schedQueue in order, since it's already sorted by priority)
+		- Worker resource availability
+		- Task-specified worker preference (acceptableWindows array below sorted by this preference)
+		- Window request age
 
-	for i := 0; i < sh.schedQueue.Len(); i++ {
-		req := (*sh.schedQueue)[i]
+		1. For each task in the schedQueue find windows which can handle them
+		1.1. Create list of windows capable of handling a task
+		1.2. Sort windows according to task selector preferences
+		2. Going through schedQueue again, assign task to first acceptable window
+		   with resources available
+		3.
Submit windows with scheduled tasks to workers - ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w) - if err != nil { - log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err) - continue - } + */ - if !ok { - continue - } - - scheduled, err := sh.maybeSchedRequest(req) - if err != nil { - req.respond(err) - continue - } + windows := make([]schedWindow, len(sh.openWindows)) + acceptableWindows := make([][]int, sh.schedQueue.Len()) - if scheduled { - heap.Remove(sh.schedQueue, i) - i-- - continue - } - } -} + // Step 1 + for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { + task := (*sh.schedQueue)[sqi] + needRes := ResourceTable[task.taskType][sh.spt] -var selectorTimeout = 5 * time.Second + for wnd, windowRequest := range sh.openWindows { + worker := sh.workers[windowRequest.worker] -func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) { - sh.workersLk.Lock() - defer sh.workersLk.Unlock() - - tried := 0 - var acceptable []WorkerID + // TODO: allow bigger windows + if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) { + continue + } - needRes := ResourceTable[req.taskType][sh.spt] + ok, err := task.sel.Ok(task.ctx, task.taskType, sh.spt, worker) + if err != nil { + log.Errorf("trySched(1) req.sel.Ok error: %+v", err) + continue + } - for wid, worker := range sh.workers { - rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout) - ok, err := req.sel.Ok(rpcCtx, req.taskType, sh.spt, worker) - cancel() + if !ok { + continue + } - if err != nil { - return false, err + acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd) } - if !ok { + if len(acceptableWindows[sqi]) == 0 { continue } - tried++ - if !canHandleRequest(needRes, wid, worker.info.Resources, worker.preparing) { - continue - } + // Pick best worker (shuffle in case some workers are equally as good) + rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { + acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] + }) + sort.SliceStable(acceptableWindows, func(i, j int) bool { + wii := sh.openWindows[acceptableWindows[sqi][i]].worker + wji := sh.openWindows[acceptableWindows[sqi][j]].worker + + if wii == wji { + // for the same worker prefer older windows + return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] + } - acceptable = append(acceptable, wid) + wi := sh.workers[wii] + wj := sh.workers[wji] + + rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout) + defer cancel() + + r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj) + if err != nil { + log.Error("selecting best worker: %s", err) + } + return r + }) } - if len(acceptable) > 0 { - { - var serr error + // Step 2 + scheduled := 0 - sort.SliceStable(acceptable, func(i, j int) bool { - rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout) - defer cancel() - r, err := req.sel.Cmp(rpcCtx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]]) + for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { + task := (*sh.schedQueue)[sqi] + needRes := ResourceTable[task.taskType][sh.spt] - if err != nil { - serr = multierror.Append(serr, err) - } - return r - }) + selectedWindow := -1 + for _, wnd := range acceptableWindows[sqi+scheduled] { + wid := sh.openWindows[wnd].worker + wr := sh.workers[wid].info.Resources - if serr != nil { - return false, xerrors.Errorf("error(s) selecting best worker: %w", serr) + // TODO: allow bigger windows + if windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { + continue } - } - 
return true, sh.assignWorker(acceptable[0], sh.workers[acceptable[0]], req) - } + windows[wnd].allocated.add(wr, needRes) - if tried == 0 { - return false, xerrors.New("maybeSchedRequest didn't find any good workers") - } + selectedWindow = wnd + break + } - return false, nil // put in waiting queue -} + windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) -func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequest) error { - needRes := ResourceTable[req.taskType][sh.spt] + heap.Remove(sh.schedQueue, sqi) + sqi-- + scheduled++ + } - w.preparing.add(w.info.Resources, needRes) + // Step 3 - go func() { - err := req.prepare(req.ctx, w.w) - sh.workersLk.Lock() + if scheduled == 0 { + return + } - if err != nil { - w.preparing.free(w.info.Resources, needRes) - sh.workersLk.Unlock() + scheduledWindows := map[int]struct{}{} + for wnd, window := range windows { + if len(window.todo) == 0 { + // Nothing scheduled here, keep the window open + continue + } - select { - case sh.workerFree <- wid: - case <-sh.closing: - log.Warnf("scheduler closed while sending response (prepare error: %+v)", err) - } + scheduledWindows[wnd] = struct{}{} - select { - case req.ret <- workerResponse{err: err}: - case <-req.ctx.Done(): - log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err) - case <-sh.closing: - log.Warnf("scheduler closed while sending response (prepare error: %+v)", err) - } - return + select { + case sh.openWindows[wnd].done <- &window: + default: + log.Error("expected sh.openWindows[wnd].done to be buffered") } + } - err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error { - w.preparing.free(w.info.Resources, needRes) - sh.workersLk.Unlock() - defer sh.workersLk.Lock() // we MUST return locked from this function - - select { - case sh.workerFree <- wid: - case <-sh.closing: - } + // Rewrite sh.openWindows array, removing scheduled windows + newOpenWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)-len(scheduledWindows)) + for wnd, window := range sh.openWindows { + if _, scheduled := scheduledWindows[wnd]; !scheduled { + // keep unscheduled windows open + continue + } - err = req.work(req.ctx, w.w) + newOpenWindows = append(newOpenWindows, window) + } - select { - case req.ret <- workerResponse{err: err}: - case <-req.ctx.Done(): - log.Warnf("request got cancelled before we could respond") - case <-sh.closing: - log.Warnf("scheduler closed while sending response") - } + sh.openWindows = newOpenWindows +} - return nil - }) +func (sh *scheduler) runWorker(wid WorkerID) { + w := sh.workers[wid] - sh.workersLk.Unlock() + go func() { + for { - // This error should always be nil, since nothing is setting it, but just to be safe: - if err != nil { - log.Errorf("error executing worker (withResources): %+v", err) } }() - - return nil } func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { - for !canHandleRequest(r, id, wr, a) { + for !a.canHandleRequest(r, id, wr) { if a.cond == nil { a.cond = sync.NewCond(locker) } @@ -396,16 +404,16 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { a.memUsedMax -= r.MaxMemory } -func canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources, active *activeResources) bool { +func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool { // TODO: dedupe 
needRes.BaseMinMemory per task type (don't add if that task is already running) - minNeedMem := res.MemReserved + active.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory + minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory if minNeedMem > res.MemPhysical { log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) return false } - maxNeedMem := res.MemReserved + active.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory + maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory if maxNeedMem > res.MemSwap+res.MemPhysical { log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) @@ -413,19 +421,19 @@ func canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResou } if needRes.MultiThread() { - if active.cpuUse > 0 { - log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, active.cpuUse, res.CPUs) + if a.cpuUse > 0 { + log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs) return false } } else { - if active.cpuUse+uint64(needRes.Threads) > res.CPUs { - log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs) + if a.cpuUse+uint64(needRes.Threads) > res.CPUs { + log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs) return false } } if len(res.GPUs) > 0 && needRes.CanGPU { - if active.gpuUsed { + if a.gpuUsed { log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) return false } @@ -453,7 +461,7 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { return max } -func (sh *scheduler) schedNewWorker(w *workerHandle) { +func (sh *scheduler) newWorker(w *workerHandle) { sh.workersLk.Lock() id := sh.nextWorker @@ -468,10 +476,10 @@ func (sh *scheduler) schedNewWorker(w *workerHandle) { return } - sh.onWorkerFreed(id) + sh.runWorker(id) } -func (sh *scheduler) schedDropWorker(wid WorkerID) { +func (sh *scheduler) dropWorker(wid WorkerID) { sh.workersLk.Lock() defer sh.workersLk.Unlock() From da96f06202c7dd6d4482396fcf9f2e0e22287a16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 9 Jul 2020 13:49:01 +0200 Subject: [PATCH 02/14] sched: implement runWorker --- sched.go | 206 ++++++++++++++++++++++++++------------------- sched_resources.go | 110 ++++++++++++++++++++++++ 2 files changed, 230 insertions(+), 86 deletions(-) create mode 100644 sched_resources.go diff --git a/sched.go b/sched.go index af6981b..966bf2c 100644 --- a/sched.go +++ b/sched.go @@ -349,116 +349,150 @@ func (sh *scheduler) trySched() { } func (sh *scheduler) runWorker(wid WorkerID) { - w := sh.workers[wid] - go func() { - for { + worker := sh.workers[wid] + scheduledWindows := make(chan *schedWindow, SchedWindows) + taskDone := make(chan struct{}, 1) + windowsRequested := 0 - } - }() -} + var activeWindows []*schedWindow -func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { - for !a.canHandleRequest(r, id, wr) { - if a.cond == nil { - a.cond = sync.NewCond(locker) - } - a.cond.Wait() 
- } + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() - a.add(wr, r) + workerClosing, err := worker.w.Closing(ctx) + if err != nil { + return + } - err := cb() + defer func() { + log.Warnw("Worker closing", "workerid", wid) - a.free(wr, r) - if a.cond != nil { - a.cond.Broadcast() - } + // TODO: close / return all queued tasks + }() - return err -} + for { + // ask for more windows if we need them + for ; windowsRequested < SchedWindows; windowsRequested++ { + select { + case sh.windowRequests <- &schedWindowRequest{ + worker: wid, + done: scheduledWindows, + }: + case <-sh.closing: + return + case <-workerClosing: + return + } + } -func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { - a.gpuUsed = r.CanGPU - if r.MultiThread() { - a.cpuUse += wr.CPUs - } else { - a.cpuUse += uint64(r.Threads) - } + select { + case w := <-scheduledWindows: + activeWindows = append(activeWindows, w) + case <-taskDone: + case <-sh.closing: + return + case <-workerClosing: + return + } - a.memUsedMin += r.MinMemory - a.memUsedMax += r.MaxMemory + assignLoop: + // process windows in order + for len(activeWindows) > 0 { + // process tasks within a window in order + for len(activeWindows[0].todo) > 0 { + todo := activeWindows[0].todo[0] + needRes := ResourceTable[todo.taskType][sh.spt] + + sh.workersLk.Lock() + ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) + if !ok { + sh.workersLk.Unlock() + break assignLoop + } + + err := sh.assignWorker(taskDone, wid, worker, todo) + sh.workersLk.Unlock() + + if err != nil { + log.Error("assignWorker error: %+v", err) + go todo.respond(xerrors.Errorf("assignWorker error: %w", err)) + } + + activeWindows[0].todo = activeWindows[0].todo[1:] + } + + copy(activeWindows, activeWindows[1:]) + activeWindows[len(activeWindows)-1] = nil + activeWindows = activeWindows[:len(activeWindows)-1] + + windowsRequested-- + } + } + }() } -func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { - if r.CanGPU { - a.gpuUsed = false - } - if r.MultiThread() { - a.cpuUse -= wr.CPUs - } else { - a.cpuUse -= uint64(r.Threads) - } +func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error { + needRes := ResourceTable[req.taskType][sh.spt] - a.memUsedMin -= r.MinMemory - a.memUsedMax -= r.MaxMemory -} + w.preparing.add(w.info.Resources, needRes) -func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool { - - // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) - minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory - if minNeedMem > res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) - return false - } + go func() { + err := req.prepare(req.ctx, w.w) + sh.workersLk.Lock() - maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory + if err != nil { + w.preparing.free(w.info.Resources, needRes) + sh.workersLk.Unlock() - if maxNeedMem > res.MemSwap+res.MemPhysical { - log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) - return false - } + select { + case taskDone <- struct{}{}: + case <-sh.closing: + log.Warnf("scheduler closed while sending response (prepare error: %+v)", err) + } - if 
needRes.MultiThread() { - if a.cpuUse > 0 { - log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs) - return false - } - } else { - if a.cpuUse+uint64(needRes.Threads) > res.CPUs { - log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs) - return false + select { + case req.ret <- workerResponse{err: err}: + case <-req.ctx.Done(): + log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err) + case <-sh.closing: + log.Warnf("scheduler closed while sending response (prepare error: %+v)", err) + } + return } - } - if len(res.GPUs) > 0 && needRes.CanGPU { - if a.gpuUsed { - log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) - return false - } - } + err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error { + w.preparing.free(w.info.Resources, needRes) + sh.workersLk.Unlock() + defer sh.workersLk.Lock() // we MUST return locked from this function - return true -} + select { + case taskDone <- struct{}{}: + case <-sh.closing: + } -func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { - var max float64 + err = req.work(req.ctx, w.w) - cpu := float64(a.cpuUse) / float64(wr.CPUs) - max = cpu + select { + case req.ret <- workerResponse{err: err}: + case <-req.ctx.Done(): + log.Warnf("request got cancelled before we could respond") + case <-sh.closing: + log.Warnf("scheduler closed while sending response") + } - memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical) - if memMin > max { - max = memMin - } + return nil + }) - memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap) - if memMax > max { - max = memMax - } + sh.workersLk.Unlock() - return max + // This error should always be nil, since nothing is setting it, but just to be safe: + if err != nil { + log.Errorf("error executing worker (withResources): %+v", err) + } + }() + + return nil } func (sh *scheduler) newWorker(w *workerHandle) { diff --git a/sched_resources.go b/sched_resources.go new file mode 100644 index 0000000..0ba9d1f --- /dev/null +++ b/sched_resources.go @@ -0,0 +1,110 @@ +package sectorstorage + +import ( + "sync" + + "github.com/filecoin-project/sector-storage/storiface" +) + +func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error { + for !a.canHandleRequest(r, id, wr) { + if a.cond == nil { + a.cond = sync.NewCond(locker) + } + a.cond.Wait() + } + + a.add(wr, r) + + err := cb() + + a.free(wr, r) + if a.cond != nil { + a.cond.Broadcast() + } + + return err +} + +func (a *activeResources) add(wr storiface.WorkerResources, r Resources) { + a.gpuUsed = r.CanGPU + if r.MultiThread() { + a.cpuUse += wr.CPUs + } else { + a.cpuUse += uint64(r.Threads) + } + + a.memUsedMin += r.MinMemory + a.memUsedMax += r.MaxMemory +} + +func (a *activeResources) free(wr storiface.WorkerResources, r Resources) { + if r.CanGPU { + a.gpuUsed = false + } + if r.MultiThread() { + a.cpuUse -= wr.CPUs + } else { + a.cpuUse -= uint64(r.Threads) + } + + a.memUsedMin -= r.MinMemory + a.memUsedMax -= r.MaxMemory +} + +func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool { + + // TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running) + minNeedMem := res.MemReserved + 
a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory + if minNeedMem > res.MemPhysical { + log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib) + return false + } + + maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory + + if maxNeedMem > res.MemSwap+res.MemPhysical { + log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib) + return false + } + + if needRes.MultiThread() { + if a.cpuUse > 0 { + log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs) + return false + } + } else { + if a.cpuUse+uint64(needRes.Threads) > res.CPUs { + log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs) + return false + } + } + + if len(res.GPUs) > 0 && needRes.CanGPU { + if a.gpuUsed { + log.Debugf("sched: not scheduling on worker %d; GPU in use", wid) + return false + } + } + + return true +} + +func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { + var max float64 + + cpu := float64(a.cpuUse) / float64(wr.CPUs) + max = cpu + + memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical) + if memMin > max { + max = memMin + } + + memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap) + if memMax > max { + max = memMax + } + + return max +} From 903731adaf924a89d3b0ae41fd64a0fcf030fee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 9 Jul 2020 14:40:53 +0200 Subject: [PATCH 03/14] sched: Fix tests --- manager_test.go | 4 ++++ sched.go | 17 +++++++++++++---- stores/index.go | 4 ++-- stores/local.go | 1 + 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/manager_test.go b/manager_test.go index ae318b4..19d9e38 100644 --- a/manager_test.go +++ b/manager_test.go @@ -22,6 +22,10 @@ import ( "github.com/filecoin-project/sector-storage/stores" ) +func init() { + logging.SetAllLoggers(logging.LevelDebug) +} + type testStorage stores.StorageConfig func newTestStorage(t *testing.T) *testStorage { diff --git a/sched.go b/sched.go index 966bf2c..d1ec338 100644 --- a/sched.go +++ b/sched.go @@ -88,7 +88,7 @@ type schedWindowRequest struct { type schedWindow struct { worker WorkerID - allocated *activeResources + allocated activeResources todo []*workerRequest } @@ -132,10 +132,12 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { watchClosing: make(chan WorkerID), workerClosing: make(chan WorkerID), - schedule: make(chan *workerRequest), - closing: make(chan struct{}), + schedule: make(chan *workerRequest), + windowRequests: make(chan *schedWindowRequest), schedQueue: &requestQueue{}, + + closing: make(chan struct{}), } } @@ -295,7 +297,7 @@ func (sh *scheduler) trySched() { wr := sh.workers[wid].info.Resources // TODO: allow bigger windows - if windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { + if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { continue } @@ -305,6 +307,11 @@ func (sh *scheduler) trySched() { break } + if selectedWindow < 0 { + // all windows full + continue + } + windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) heap.Remove(sh.schedQueue, sqi) @@ -327,6 +334,7 @@ func (sh *scheduler) trySched() { scheduledWindows[wnd] = struct{}{} + window := 
window // copy select { case sh.openWindows[wnd].done <- &window: default: @@ -390,6 +398,7 @@ func (sh *scheduler) runWorker(wid WorkerID) { case w := <-scheduledWindows: activeWindows = append(activeWindows, w) case <-taskDone: + log.Debugw("task done", "workerid", wid) case <-sh.closing: return case <-workerClosing: diff --git a/stores/index.go b/stores/index.go index 049e2dc..fda9731 100644 --- a/stores/index.go +++ b/stores/index.go @@ -384,8 +384,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s } sort.Slice(candidates, func(i, j int) bool { - iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Available)), big.NewInt(int64(candidates[i].info.Weight))) - jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Available)), big.NewInt(int64(candidates[j].info.Weight))) + iw := big.Mul(big.NewInt(candidates[i].fsi.Available), big.NewInt(int64(candidates[i].info.Weight))) + jw := big.Mul(big.NewInt(candidates[j].fsi.Available), big.NewInt(int64(candidates[j].info.Weight))) return iw.GreaterThan(jw) }) diff --git a/stores/local.go b/stores/local.go index 26b7ccb..92b7773 100644 --- a/stores/local.go +++ b/stores/local.go @@ -13,6 +13,7 @@ import ( "golang.org/x/xerrors" + "github.com/filecoin-project/sector-storage/fsutil" "github.com/filecoin-project/specs-actors/actors/abi" ) From 45c1b268f1294088d936e0a639511fc2b9cdc120 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 9 Jul 2020 15:09:38 +0200 Subject: [PATCH 04/14] sched: Remove unused worker field --- sched.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sched.go b/sched.go index d1ec338..b038eff 100644 --- a/sched.go +++ b/sched.go @@ -87,7 +87,6 @@ type schedWindowRequest struct { } type schedWindow struct { - worker WorkerID allocated activeResources todo []*workerRequest } From 5c5fe09990830f4619fefc414f14fe219b068f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 9 Jul 2020 15:18:45 +0200 Subject: [PATCH 05/14] post-rebase fixes --- stores/index.go | 4 ++-- stores/local.go | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/stores/index.go b/stores/index.go index fda9731..049e2dc 100644 --- a/stores/index.go +++ b/stores/index.go @@ -384,8 +384,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s } sort.Slice(candidates, func(i, j int) bool { - iw := big.Mul(big.NewInt(candidates[i].fsi.Available), big.NewInt(int64(candidates[i].info.Weight))) - jw := big.Mul(big.NewInt(candidates[j].fsi.Available), big.NewInt(int64(candidates[j].info.Weight))) + iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Available)), big.NewInt(int64(candidates[i].info.Weight))) + jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Available)), big.NewInt(int64(candidates[j].info.Weight))) return iw.GreaterThan(jw) }) diff --git a/stores/local.go b/stores/local.go index 92b7773..26b7ccb 100644 --- a/stores/local.go +++ b/stores/local.go @@ -13,7 +13,6 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/sector-storage/fsutil" "github.com/filecoin-project/specs-actors/actors/abi" ) From 7f115954fd7b977f59564cdbff7e2a61107d6de4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 9 Jul 2020 19:17:15 +0200 Subject: [PATCH 06/14] sched: More fixes --- sched.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sched.go b/sched.go index b038eff..d926e75 100644 --- a/sched.go +++ b/sched.go @@ -260,7 +260,7 @@ func (sh *scheduler) trySched() { 
rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) { acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] }) - sort.SliceStable(acceptableWindows, func(i, j int) bool { + sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool { wii := sh.openWindows[acceptableWindows[sqi][i]].worker wji := sh.openWindows[acceptableWindows[sqi][j]].worker @@ -344,7 +344,7 @@ func (sh *scheduler) trySched() { // Rewrite sh.openWindows array, removing scheduled windows newOpenWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)-len(scheduledWindows)) for wnd, window := range sh.openWindows { - if _, scheduled := scheduledWindows[wnd]; !scheduled { + if _, scheduled := scheduledWindows[wnd]; scheduled { // keep unscheduled windows open continue } From 045e5977875f4a7ffb571b42401543b9d78bac80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 11 Jul 2020 01:21:48 +0200 Subject: [PATCH 07/14] remove open windows when dropping workers --- sched.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sched.go b/sched.go index d926e75..241440b 100644 --- a/sched.go +++ b/sched.go @@ -528,6 +528,16 @@ func (sh *scheduler) dropWorker(wid WorkerID) { w := sh.workers[wid] delete(sh.workers, wid) + newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) + for _, window := range sh.openWindows { + if window.worker != wid { + newWindows = append(newWindows, window) + } + } + sh.openWindows = newWindows + + // TODO: sync close worker goroutine + go func() { if err := w.w.Close(); err != nil { log.Warnf("closing worker %d: %+v", err) From 0a6c939a7390e632f486e7e05738611dcd9a0dff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 16 Jul 2020 23:40:54 +0200 Subject: [PATCH 08/14] Drop unused SectorInfo fields --- stores/index.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/stores/index.go b/stores/index.go index c85dc12..acad2ab 100644 --- a/stores/index.go +++ b/stores/index.go @@ -29,9 +29,6 @@ type StorageInfo struct { CanSeal bool CanStore bool - - LastHeartbeat time.Time - HeartbeatErr error } type HealthReport struct { From be6b88f4064ac8df22e3efcfce6424008fa9dc10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 16 Jul 2020 23:41:04 +0200 Subject: [PATCH 09/14] Some sched tests --- manager.go | 15 +- sched_test.go | 408 ++++++++++++++++++++++++++++++++++++++++++++++ selector_alloc.go | 4 +- 3 files changed, 414 insertions(+), 13 deletions(-) diff --git a/manager.go b/manager.go index 0c18645..0cd081d 100644 --- a/manager.go +++ b/manager.go @@ -208,7 +208,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect var selector WorkerSelector if len(best) == 0 { // new - selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) + selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) } else { // append to existing selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) } @@ -269,7 +269,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie var selector WorkerSelector var err error if len(existingPieces) == 0 { // new - selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) + selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) } else { // use existing selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) } @@ -300,10 
+300,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke // TODO: also consider where the unsealed data sits - selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) - if err != nil { - return nil, xerrors.Errorf("creating path selector: %w", err) - } + selector := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) @@ -417,11 +414,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU return err } - fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) - if err != nil { - return xerrors.Errorf("creating fetchSel: %w", err) - } - + fetchSel := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) moveUnsealed := unsealed { if len(keepUnsealed) == 0 { diff --git a/sched_test.go b/sched_test.go index d0d0e7c..e810b6a 100644 --- a/sched_test.go +++ b/sched_test.go @@ -2,9 +2,21 @@ package sectorstorage import ( "context" + "io" + "sync" "testing" + "time" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/sector-storage/fsutil" + "github.com/filecoin-project/sector-storage/sealtasks" + "github.com/filecoin-project/sector-storage/stores" + "github.com/filecoin-project/sector-storage/storiface" + "github.com/filecoin-project/specs-storage/storage" ) func TestWithPriority(t *testing.T) { @@ -16,3 +28,399 @@ func TestWithPriority(t *testing.T) { require.Equal(t, 2222, getPriority(ctx)) } + +type schedTestWorker struct { + name string + taskTypes map[sealtasks.TaskType]struct{} + paths []stores.StoragePath + + closed bool + closing chan struct{} +} + +func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { + panic("implement me") +} + +func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) { + panic("implement me") +} + +func (s *schedTestWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { + panic("implement me") +} + +func (s *schedTestWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) { + panic("implement me") +} + +func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { + panic("implement me") +} + +func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + panic("implement me") +} + +func (s *schedTestWorker) Remove(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (s *schedTestWorker) NewSector(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + 
panic("implement me") +} + +func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error { + panic("implement me") +} + +func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { + panic("implement me") +} + +func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { + panic("implement me") +} + +func (s *schedTestWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) { + return s.taskTypes, nil +} + +func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { + return s.paths, nil +} + +func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { + return storiface.WorkerInfo{ + Hostname: s.name, + Resources: storiface.WorkerResources{ + MemPhysical: 128 << 30, + MemSwap: 200 << 30, + MemReserved: 2 << 30, + CPUs: 32, + GPUs: []string{"a GPU"}, + }, + }, nil +} + +func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) { + return s.closing, nil +} + +func (s *schedTestWorker) Close() error { + if !s.closed { + s.closed = true + close(s.closing) + } + return nil +} + +var _ Worker = &schedTestWorker{} + +func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) { + w := &schedTestWorker{ + name: name, + taskTypes: taskTypes, + paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}}, + + closing: make(chan struct{}), + } + + for _, path := range w.paths { + err := index.StorageAttach(context.TODO(), stores.StorageInfo{ + ID: path.ID, + URLs: nil, + Weight: path.Weight, + CanSeal: path.CanSeal, + CanStore: path.CanStore, + }, fsutil.FsStat{ + Capacity: 1 << 40, + Available: 1 << 40, + Reserved: 3, + }) + require.NoError(t, err) + } + + info, err := w.Info(context.TODO()) + require.NoError(t, err) + + sched.newWorkers <- &workerHandle{ + w: w, + info: info, + preparing: &activeResources{}, + active: &activeResources{}, + } +} + +func TestSchedStartStop(t *testing.T) { + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + sched := newScheduler(spt) + go sched.runSched() + + addTestWorker(t, sched, stores.NewIndex(), "fred", nil) + + sched.schedClose() +} + +func TestSched(t *testing.T) { + ctx := context.Background() + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + + sectorAte := abi.SectorID{ + Miner: 8, + Number: 8, + } + + type workerSpec struct { + name string + taskTypes map[sealtasks.TaskType]struct{} + } + + noopPrepare := func(ctx context.Context, w Worker) error { + return nil + } + + type runMeta struct { + done map[string]chan struct{} + + wg sync.WaitGroup + } + + type task func(*testing.T, *scheduler, *stores.Index, *runMeta) + + sched := func(taskName, expectWorker string, taskType sealtasks.TaskType) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + done := make(chan struct{}) + rm.done[taskName] = done + + sel := newAllocSelector(ctx, index, stores.FTCache, stores.PathSealing) + + rm.wg.Add(1) + go func() { + defer rm.wg.Done() + + err := sched.Schedule(ctx, sectorAte, taskType, sel, 
noopPrepare, func(ctx context.Context, w Worker) error { + wi, err := w.Info(ctx) + require.NoError(t, err) + + require.Equal(t, expectWorker, wi.Hostname) + + log.Info("IN ", taskName) + + for { + _, ok := <-done + if !ok { + break + } + } + + log.Info("OUT ", taskName) + + return nil + }) + require.NoError(t, err) + }() + } + } + + taskStarted := func(name string) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + rm.done[name] <- struct{}{} + } + } + + taskDone := func(name string) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + rm.done[name] <- struct{}{} + close(rm.done[name]) + } + } + + taskNotScheduled := func(name string) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + select { + case rm.done[name] <- struct{}{}: + t.Fatal("not expected") + case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy + } + } + } + + testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) { + return func(t *testing.T) { + index := stores.NewIndex() + + sched := newScheduler(spt) + go sched.runSched() + + for _, worker := range workers { + addTestWorker(t, sched, index, worker.name, worker.taskTypes) + } + + rm := runMeta{ + done: map[string]chan struct{}{}, + } + + for _, task := range tasks { + task(t, sched, index, &rm) + } + + log.Info("wait for async stuff") + rm.wg.Wait() + + sched.schedClose() + } + } + + multTask := func(tasks ...task) task { + return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + for _, tsk := range tasks { + tsk(t, s, index, meta) + } + } + } + + t.Run("one-pc1", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + }, []task{ + sched("pc1-1", "fred", sealtasks.TTPreCommit1), + taskDone("pc1-1"), + })) + + t.Run("pc1-2workers-1", testFunc([]workerSpec{ + {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2:{}}}, + {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + }, []task{ + sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + taskDone("pc1-1"), + })) + + t.Run("pc1-2workers-2", testFunc([]workerSpec{ + {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2:{}}}, + }, []task{ + sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + taskDone("pc1-1"), + })) + + t.Run("pc1-block-pc2", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2:{}}}, + }, []task{ + sched("pc1", "fred", sealtasks.TTPreCommit1), + taskStarted("pc1"), + + sched("pc2", "fred", sealtasks.TTPreCommit2), + taskNotScheduled("pc2"), + + taskDone("pc1"), + taskDone("pc2"), + })) + + t.Run("pc2-block-pc1", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2:{}}}, + }, []task{ + sched("pc2", "fred", sealtasks.TTPreCommit2), + taskStarted("pc2"), + + sched("pc1", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("pc1"), + + taskDone("pc2"), + taskDone("pc1"), + })) + + t.Run("pc1-batching", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + }, []task{ + sched("t1", "fred", sealtasks.TTPreCommit1), + taskStarted("t1"), + + sched("t2", "fred", sealtasks.TTPreCommit1), + 
taskStarted("t2"), + + // with worker settings, we can only run 2 parallel PC1s + + // start 2 more to fill fetch buffer + + sched("t3", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t3"), + + sched("t4", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t4"), + + taskDone("t1"), + taskDone("t2"), + + taskStarted("t3"), + taskStarted("t4"), + + taskDone("t3"), + taskDone("t4"), + })) + + twoPC1 := func(prefix string, schedAssert func(name string) task) task { + return multTask( + sched(prefix + "-a", "fred", sealtasks.TTPreCommit1), + schedAssert(prefix + "-a"), + + sched(prefix + "-b", "fred", sealtasks.TTPreCommit1), + schedAssert(prefix + "-b"), + ) + } + + twoPC1Done := func(prefix string) task { + return multTask( + taskDone(prefix + "-1"), + taskDone(prefix + "-b"), + ) + } + + t.Run("pc1-pc2-prio", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2: {}}}, + }, []task{ + // fill exec/fetch buffers + twoPC1("w0", taskStarted), + twoPC1("w1", taskNotScheduled), + + // fill worker windows + twoPC1("w2", taskNotScheduled), + twoPC1("w3", taskNotScheduled), + + // windowed + + sched("t1", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t1"), + + sched("t2", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t2"), + + sched("t3", "fred", sealtasks.TTPreCommit2), + taskNotScheduled("t3"), + + twoPC1Done("w0"), + twoPC1Done("w1"), + twoPC1Done("w2"), + twoPC1Done("w3"), + + taskStarted("t1"), + taskNotScheduled("t2"), + taskNotScheduled("t3"), + + taskDone("t1"), + + taskStarted("t2"), + taskStarted("t3"), + + taskDone("t2"), + taskDone("t3"), + })) +} + diff --git a/selector_alloc.go b/selector_alloc.go index 874bf7b..53e1217 100644 --- a/selector_alloc.go +++ b/selector_alloc.go @@ -17,12 +17,12 @@ type allocSelector struct { ptype stores.PathType } -func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector, error) { +func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector) { return &allocSelector{ index: index, alloc: alloc, ptype: ptype, - }, nil + } } func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { From 2e557573f4864fdb431f205b59b4cb7574dc19db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 16 Jul 2020 23:41:15 +0200 Subject: [PATCH 10/14] gofmt --- sched_test.go | 59 +++++++++++++++++++++++------------------------ selector_alloc.go | 2 +- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/sched_test.go b/sched_test.go index e810b6a..1c7b889 100644 --- a/sched_test.go +++ b/sched_test.go @@ -30,11 +30,11 @@ func TestWithPriority(t *testing.T) { } type schedTestWorker struct { - name string + name string taskTypes map[sealtasks.TaskType]struct{} - paths []stores.StoragePath + paths []stores.StoragePath - closed bool + closed bool closing chan struct{} } @@ -100,7 +100,7 @@ func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, erro func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { return storiface.WorkerInfo{ - Hostname: s.name, + Hostname: s.name, Resources: storiface.WorkerResources{ MemPhysical: 128 << 30, MemSwap: 200 << 30, @@ -127,20 +127,20 @@ var _ Worker = &schedTestWorker{} func addTestWorker(t *testing.T, sched *scheduler, index 
*stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) { w := &schedTestWorker{ - name: name, + name: name, taskTypes: taskTypes, - paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}}, + paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}}, closing: make(chan struct{}), } for _, path := range w.paths { err := index.StorageAttach(context.TODO(), stores.StorageInfo{ - ID: path.ID, - URLs: nil, - Weight: path.Weight, - CanSeal: path.CanSeal, - CanStore: path.CanStore, + ID: path.ID, + URLs: nil, + Weight: path.Weight, + CanSeal: path.CanSeal, + CanStore: path.CanStore, }, fsutil.FsStat{ Capacity: 1 << 40, Available: 1 << 40, @@ -180,7 +180,7 @@ func TestSched(t *testing.T) { } type workerSpec struct { - name string + name string taskTypes map[sealtasks.TaskType]struct{} } @@ -289,30 +289,30 @@ func TestSched(t *testing.T) { } t.Run("one-pc1", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ sched("pc1-1", "fred", sealtasks.TTPreCommit1), taskDone("pc1-1"), })) t.Run("pc1-2workers-1", testFunc([]workerSpec{ - {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2:{}}}, - {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, + {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ sched("pc1-1", "fred1", sealtasks.TTPreCommit1), taskDone("pc1-1"), })) t.Run("pc1-2workers-2", testFunc([]workerSpec{ - {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, - {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2:{}}}, + {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, + {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, }, []task{ sched("pc1-1", "fred1", sealtasks.TTPreCommit1), taskDone("pc1-1"), })) t.Run("pc1-block-pc2", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2:{}}}, + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ sched("pc1", "fred", sealtasks.TTPreCommit1), taskStarted("pc1"), @@ -325,7 +325,7 @@ func TestSched(t *testing.T) { })) t.Run("pc2-block-pc1", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2:{}}}, + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ sched("pc2", "fred", sealtasks.TTPreCommit2), taskStarted("pc2"), @@ -338,7 +338,7 @@ func TestSched(t *testing.T) { })) t.Run("pc1-batching", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ sched("t1", "fred", sealtasks.TTPreCommit1), taskStarted("t1"), @@ -368,23 +368,23 @@ func TestSched(t *testing.T) { twoPC1 := func(prefix string, schedAssert func(name string) task) task { return multTask( - sched(prefix + "-a", "fred", 
sealtasks.TTPreCommit1), - schedAssert(prefix + "-a"), + sched(prefix+"-a", "fred", sealtasks.TTPreCommit1), + schedAssert(prefix+"-a"), - sched(prefix + "-b", "fred", sealtasks.TTPreCommit1), - schedAssert(prefix + "-b"), - ) + sched(prefix+"-b", "fred", sealtasks.TTPreCommit1), + schedAssert(prefix+"-b"), + ) } twoPC1Done := func(prefix string) task { return multTask( - taskDone(prefix + "-1"), - taskDone(prefix + "-b"), - ) + taskDone(prefix+"-1"), + taskDone(prefix+"-b"), + ) } t.Run("pc1-pc2-prio", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2: {}}}, + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ // fill exec/fetch buffers twoPC1("w0", taskStarted), @@ -423,4 +423,3 @@ func TestSched(t *testing.T) { taskDone("t3"), })) } - diff --git a/selector_alloc.go b/selector_alloc.go index 53e1217..3522192 100644 --- a/selector_alloc.go +++ b/selector_alloc.go @@ -17,7 +17,7 @@ type allocSelector struct { ptype stores.PathType } -func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector) { +func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector { return &allocSelector{ index: index, alloc: alloc, From cab0c74e08b35b3687c60f24a7a8e2724e5f4379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 17 Jul 2020 01:26:55 +0200 Subject: [PATCH 11/14] more sched test debugging --- sched.go | 11 ++++ sched_test.go | 152 +++++++++++++++++++++++++++++++------------------- 2 files changed, 105 insertions(+), 58 deletions(-) diff --git a/sched.go b/sched.go index 241440b..d89dad3 100644 --- a/sched.go +++ b/sched.go @@ -69,6 +69,7 @@ type scheduler struct { openWindows []*schedWindowRequest closing chan struct{} + testSync chan struct{} // used for testing } type workerHandle struct { @@ -195,6 +196,9 @@ func (sh *scheduler) runSched() { heap.Push(sh.schedQueue, req) sh.trySched() + if sh.testSync != nil { + sh.testSync <- struct{}{} + } case req := <-sh.windowRequests: sh.openWindows = append(sh.openWindows, req) sh.trySched() @@ -226,6 +230,8 @@ func (sh *scheduler) trySched() { windows := make([]schedWindow, len(sh.openWindows)) acceptableWindows := make([][]int, sh.schedQueue.Len()) + log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) + // Step 1 for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { task := (*sh.schedQueue)[sqi] @@ -295,11 +301,15 @@ func (sh *scheduler) trySched() { wid := sh.openWindows[wnd].worker wr := sh.workers[wid].info.Resources + log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + // TODO: allow bigger windows if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) { continue } + log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd) + windows[wnd].allocated.add(wr, needRes) selectedWindow = wnd @@ -419,6 +429,7 @@ func (sh *scheduler) runWorker(wid WorkerID) { break assignLoop } + log.Debugf("assign worker sector %d", todo.sector.Number) err := sh.assignWorker(taskDone, wid, worker, todo) sh.workersLk.Unlock() diff --git a/sched_test.go b/sched_test.go index 1c7b889..26961a4 100644 --- a/sched_test.go +++ b/sched_test.go @@ -2,7 +2,9 @@ package sectorstorage import ( "context" + "fmt" "io" + "runtime" "sync" "testing" 
"time" @@ -171,13 +173,10 @@ func TestSchedStartStop(t *testing.T) { } func TestSched(t *testing.T) { - ctx := context.Background() - spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + ctx, done := context.WithTimeout(context.Background(), 20 * time.Second) + defer done() - sectorAte := abi.SectorID{ - Miner: 8, - Number: 8, - } + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 type workerSpec struct { name string @@ -196,7 +195,10 @@ func TestSched(t *testing.T) { type task func(*testing.T, *scheduler, *stores.Index, *runMeta) - sched := func(taskName, expectWorker string, taskType sealtasks.TaskType) task { + sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { done := make(chan struct{}) rm.done[taskName] = done @@ -207,7 +209,12 @@ func TestSched(t *testing.T) { go func() { defer rm.wg.Done() - err := sched.Schedule(ctx, sectorAte, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error { + sectorNum := abi.SectorID{ + Miner: 8, + Number: sid, + } + + err := sched.Schedule(ctx, sectorNum, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error { wi, err := w.Info(ctx) require.NoError(t, err) @@ -226,29 +233,45 @@ func TestSched(t *testing.T) { return nil }) - require.NoError(t, err) + require.NoError(t, err, fmt.Sprint(l, l2)) }() + + <-sched.testSync } } taskStarted := func(name string) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { - rm.done[name] <- struct{}{} + select { + case rm.done[name] <- struct{}{}: + case <-ctx.Done(): + t.Fatal("ctx error", ctx.Err(), l, l2) + } } } taskDone := func(name string) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { - rm.done[name] <- struct{}{} + select { + case rm.done[name] <- struct{}{}: + case <-ctx.Done(): + t.Fatal("ctx error", ctx.Err(), l, l2) + } close(rm.done[name]) } } taskNotScheduled := func(name string) task { + _, _, l, _ := runtime.Caller(1) + _, _, l2, _ := runtime.Caller(2) return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { select { case rm.done[name] <- struct{}{}: - t.Fatal("not expected") + t.Fatal("not expected", l, l2) case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy } } @@ -259,6 +282,8 @@ func TestSched(t *testing.T) { index := stores.NewIndex() sched := newScheduler(spt) + sched.testSync = make(chan struct{}) + go sched.runSched() for _, worker := range workers { @@ -291,7 +316,7 @@ func TestSched(t *testing.T) { t.Run("one-pc1", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ - sched("pc1-1", "fred", sealtasks.TTPreCommit1), + sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) @@ -299,7 +324,7 @@ func TestSched(t *testing.T) { {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ - sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) @@ -307,17 +332,17 @@ func TestSched(t *testing.T) { {name: "fred1", taskTypes: 
map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}}, }, []task{ - sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1), taskDone("pc1-1"), })) t.Run("pc1-block-pc2", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ - sched("pc1", "fred", sealtasks.TTPreCommit1), + sched("pc1", "fred", 8, sealtasks.TTPreCommit1), taskStarted("pc1"), - sched("pc2", "fred", sealtasks.TTPreCommit2), + sched("pc2", "fred", 8, sealtasks.TTPreCommit2), taskNotScheduled("pc2"), taskDone("pc1"), @@ -327,10 +352,10 @@ func TestSched(t *testing.T) { t.Run("pc2-block-pc1", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ - sched("pc2", "fred", sealtasks.TTPreCommit2), + sched("pc2", "fred", 8, sealtasks.TTPreCommit2), taskStarted("pc2"), - sched("pc1", "fred", sealtasks.TTPreCommit1), + sched("pc1", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("pc1"), taskDone("pc2"), @@ -340,20 +365,20 @@ func TestSched(t *testing.T) { t.Run("pc1-batching", testFunc([]workerSpec{ {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}}, }, []task{ - sched("t1", "fred", sealtasks.TTPreCommit1), + sched("t1", "fred", 8, sealtasks.TTPreCommit1), taskStarted("t1"), - sched("t2", "fred", sealtasks.TTPreCommit1), + sched("t2", "fred", 8, sealtasks.TTPreCommit1), taskStarted("t2"), // with worker settings, we can only run 2 parallel PC1s // start 2 more to fill fetch buffer - sched("t3", "fred", sealtasks.TTPreCommit1), + sched("t3", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("t3"), - sched("t4", "fred", sealtasks.TTPreCommit1), + sched("t4", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("t4"), taskDone("t1"), @@ -366,60 +391,71 @@ func TestSched(t *testing.T) { taskDone("t4"), })) - twoPC1 := func(prefix string, schedAssert func(name string) task) task { + twoPC1 := func(prefix string, sid abi.SectorNumber, schedAssert func(name string) task) task { return multTask( - sched(prefix+"-a", "fred", sealtasks.TTPreCommit1), + sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1), schedAssert(prefix+"-a"), - sched(prefix+"-b", "fred", sealtasks.TTPreCommit1), + sched(prefix+"-b", "fred", sid + 1, sealtasks.TTPreCommit1), schedAssert(prefix+"-b"), ) } - twoPC1Done := func(prefix string) task { + twoPC1Act := func(prefix string, schedAssert func(name string) task) task { return multTask( - taskDone(prefix+"-1"), - taskDone(prefix+"-b"), + schedAssert(prefix+"-a"), + schedAssert(prefix+"-b"), ) } - t.Run("pc1-pc2-prio", testFunc([]workerSpec{ - {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, - }, []task{ - // fill exec/fetch buffers - twoPC1("w0", taskStarted), - twoPC1("w1", taskNotScheduled), + for i := 0; i < 100; i++ { + t.Run("pc1-pc2-prio", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, + }, []task{ + // fill exec/fetch buffers + twoPC1("w0", 0, taskStarted), + twoPC1("w1", 2, taskNotScheduled), - // fill worker windows - twoPC1("w2", taskNotScheduled), - twoPC1("w3", taskNotScheduled), + // fill worker windows + twoPC1("w2", 4, taskNotScheduled), + //twoPC1("w3", taskNotScheduled), - 
// windowed + // windowed - sched("t1", "fred", sealtasks.TTPreCommit1), - taskNotScheduled("t1"), + sched("t1", "fred", 6, sealtasks.TTPreCommit1), + taskNotScheduled("t1"), - sched("t2", "fred", sealtasks.TTPreCommit1), - taskNotScheduled("t2"), + sched("t2", "fred", 7, sealtasks.TTPreCommit1), + taskNotScheduled("t2"), - sched("t3", "fred", sealtasks.TTPreCommit2), - taskNotScheduled("t3"), + sched("t3", "fred", 8, sealtasks.TTPreCommit2), + taskNotScheduled("t3"), - twoPC1Done("w0"), - twoPC1Done("w1"), - twoPC1Done("w2"), - twoPC1Done("w3"), + twoPC1Act("w0", taskDone), + twoPC1Act("w1", taskStarted), + twoPC1Act("w2", taskNotScheduled), + //twoPC1Act("w3", taskNotScheduled), - taskStarted("t1"), - taskNotScheduled("t2"), - taskNotScheduled("t3"), + twoPC1Act("w1", taskDone), + twoPC1Act("w2", taskStarted), + //twoPC1Act("w3", taskNotScheduled), - taskDone("t1"), + twoPC1Act("w2", taskDone), + //twoPC1Act("w3", taskStarted), - taskStarted("t2"), - taskStarted("t3"), + //twoPC1Act("w3", taskDone), - taskDone("t2"), - taskDone("t3"), - })) + taskStarted("t3"), + taskNotScheduled("t1"), + taskNotScheduled("t2"), + + taskDone("t3"), + + taskStarted("t1"), + taskStarted("t2"), + + taskDone("t1"), + taskDone("t2"), + })) + } } From bf315e63d77f7eaa5d87058d32ca507bef904ca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 17 Jul 2020 01:32:49 +0200 Subject: [PATCH 12/14] sched: working tests --- sched.go | 2 +- sched_test.go | 34 +++++++++++----------------------- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/sched.go b/sched.go index d89dad3..44e62f6 100644 --- a/sched.go +++ b/sched.go @@ -68,7 +68,7 @@ type scheduler struct { schedQueue *requestQueue openWindows []*schedWindowRequest - closing chan struct{} + closing chan struct{} testSync chan struct{} // used for testing } diff --git a/sched_test.go b/sched_test.go index 26961a4..e6bd8d2 100644 --- a/sched_test.go +++ b/sched_test.go @@ -173,7 +173,7 @@ func TestSchedStartStop(t *testing.T) { } func TestSched(t *testing.T) { - ctx, done := context.WithTimeout(context.Background(), 20 * time.Second) + ctx, done := context.WithTimeout(context.Background(), 20*time.Second) defer done() spt := abi.RegisteredSealProof_StackedDrg32GiBV1 @@ -183,7 +183,7 @@ func TestSched(t *testing.T) { taskTypes map[sealtasks.TaskType]struct{} } - noopPrepare := func(ctx context.Context, w Worker) error { + noopAction := func(ctx context.Context, w Worker) error { return nil } @@ -214,7 +214,7 @@ func TestSched(t *testing.T) { Number: sid, } - err := sched.Schedule(ctx, sectorNum, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error { + err := sched.Schedule(ctx, sectorNum, taskType, sel, func(ctx context.Context, w Worker) error { wi, err := w.Info(ctx) require.NoError(t, err) @@ -232,7 +232,7 @@ func TestSched(t *testing.T) { log.Info("OUT ", taskName) return nil - }) + }, noopAction) require.NoError(t, err, fmt.Sprint(l, l2)) }() @@ -396,7 +396,7 @@ func TestSched(t *testing.T) { sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1), schedAssert(prefix+"-a"), - sched(prefix+"-b", "fred", sid + 1, sealtasks.TTPreCommit1), + sched(prefix+"-b", "fred", sid+1, sealtasks.TTPreCommit1), schedAssert(prefix+"-b"), ) } @@ -408,42 +408,30 @@ func TestSched(t *testing.T) { ) } - for i := 0; i < 100; i++ { + // run this one a bunch of times, it had a very annoying tendency to fail randomly + for i := 0; i < 40; i++ { t.Run("pc1-pc2-prio", testFunc([]workerSpec{ {name: "fred", taskTypes: 
map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}}, }, []task{ - // fill exec/fetch buffers + // fill queues twoPC1("w0", 0, taskStarted), twoPC1("w1", 2, taskNotScheduled), - // fill worker windows - twoPC1("w2", 4, taskNotScheduled), - //twoPC1("w3", taskNotScheduled), - // windowed - sched("t1", "fred", 6, sealtasks.TTPreCommit1), + sched("t1", "fred", 8, sealtasks.TTPreCommit1), taskNotScheduled("t1"), - sched("t2", "fred", 7, sealtasks.TTPreCommit1), + sched("t2", "fred", 9, sealtasks.TTPreCommit1), taskNotScheduled("t2"), - sched("t3", "fred", 8, sealtasks.TTPreCommit2), + sched("t3", "fred", 10, sealtasks.TTPreCommit2), taskNotScheduled("t3"), twoPC1Act("w0", taskDone), twoPC1Act("w1", taskStarted), - twoPC1Act("w2", taskNotScheduled), - //twoPC1Act("w3", taskNotScheduled), twoPC1Act("w1", taskDone), - twoPC1Act("w2", taskStarted), - //twoPC1Act("w3", taskNotScheduled), - - twoPC1Act("w2", taskDone), - //twoPC1Act("w3", taskStarted), - - //twoPC1Act("w3", taskDone), taskStarted("t3"), taskNotScheduled("t1"), From 908d47305bc8aa7fb63725c3991dafe57e1da23d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 17 Jul 2020 01:46:59 +0200 Subject: [PATCH 13/14] fix race in runWorker --- sched.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sched.go b/sched.go index 44e62f6..caf67c6 100644 --- a/sched.go +++ b/sched.go @@ -367,7 +367,10 @@ func (sh *scheduler) trySched() { func (sh *scheduler) runWorker(wid WorkerID) { go func() { + sh.workersLk.Lock() worker := sh.workers[wid] + sh.workersLk.Unlock() + scheduledWindows := make(chan *schedWindow, SchedWindows) taskDone := make(chan struct{}, 1) windowsRequested := 0 From f1b38371866bec524ea8c5603b9e38d1a3391161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 17 Jul 2020 12:59:12 +0200 Subject: [PATCH 14/14] fix worker setup/cleanup raciness --- manager.go | 4 +-- sched.go | 87 +++++++++++++++++++++++++++++++++++++++----------- sched_test.go | 7 ++-- sched_watch.go | 6 +++- 4 files changed, 80 insertions(+), 24 deletions(-) diff --git a/manager.go b/manager.go index 0cd081d..fc3be18 100644 --- a/manager.go +++ b/manager.go @@ -489,8 +489,8 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro return m.storage.FsStat(ctx, id) } -func (m *Manager) Close() error { - return m.sched.Close() +func (m *Manager) Close(ctx context.Context) error { + return m.sched.Close(ctx) } var _ SectorManager = &Manager{} diff --git a/sched.go b/sched.go index caf67c6..bec5ee0 100644 --- a/sched.go +++ b/sched.go @@ -3,6 +3,7 @@ package sectorstorage import ( "container/heap" "context" + "fmt" "math/rand" "sort" "sync" @@ -69,6 +70,7 @@ type scheduler struct { openWindows []*schedWindowRequest closing chan struct{} + closed chan struct{} testSync chan struct{} // used for testing } @@ -79,6 +81,11 @@ type workerHandle struct { preparing *activeResources active *activeResources + + // for sync manager goroutine closing + cleanupStarted bool + closedMgr chan struct{} + closingMgr chan struct{} } type schedWindowRequest struct { @@ -138,6 +145,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler { schedQueue: &requestQueue{}, closing: make(chan struct{}), + closed: make(chan struct{}), } } @@ -182,6 +190,8 @@ func (r *workerRequest) respond(err error) { } func (sh *scheduler) runSched() { + defer close(sh.closed) + go sh.runWorkerWatcher() for { @@ -366,11 +376,23 @@ func (sh *scheduler) trySched() { } func (sh *scheduler) 
runWorker(wid WorkerID) { + var ready sync.WaitGroup + ready.Add(1) + defer ready.Wait() + go func() { sh.workersLk.Lock() - worker := sh.workers[wid] + worker, found := sh.workers[wid] sh.workersLk.Unlock() + ready.Done() + + if !found { + panic(fmt.Sprintf("worker %d not found", wid)) + } + + defer close(worker.closedMgr) + scheduledWindows := make(chan *schedWindow, SchedWindows) taskDone := make(chan struct{}, 1) windowsRequested := 0 @@ -403,6 +425,8 @@ func (sh *scheduler) runWorker(wid WorkerID) { return case <-workerClosing: return + case <-worker.closingMgr: + return } } @@ -415,6 +439,8 @@ func (sh *scheduler) runWorker(wid WorkerID) { return case <-workerClosing: return + case <-worker.closingMgr: + return } assignLoop: @@ -518,6 +544,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke } func (sh *scheduler) newWorker(w *workerHandle) { + w.closedMgr = make(chan struct{}) + w.closingMgr = make(chan struct{}) + sh.workersLk.Lock() id := sh.nextWorker @@ -526,13 +555,13 @@ func (sh *scheduler) newWorker(w *workerHandle) { sh.workersLk.Unlock() + sh.runWorker(id) + select { case sh.watchClosing <- id: case <-sh.closing: return } - - sh.runWorker(id) } func (sh *scheduler) dropWorker(wid WorkerID) { @@ -540,37 +569,59 @@ func (sh *scheduler) dropWorker(wid WorkerID) { defer sh.workersLk.Unlock() w := sh.workers[wid] + + sh.workerCleanup(wid, w) + delete(sh.workers, wid) +} - newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) - for _, window := range sh.openWindows { - if window.worker != wid { - newWindows = append(newWindows, window) - } +func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) { + if !w.cleanupStarted { + close(w.closingMgr) + } + select { + case <-w.closedMgr: + case <-time.After(time.Second): + log.Errorf("timeout closing worker manager goroutine %d", wid) } - sh.openWindows = newWindows - // TODO: sync close worker goroutine + if !w.cleanupStarted { + w.cleanupStarted = true - go func() { - if err := w.w.Close(); err != nil { - log.Warnf("closing worker %d: %+v", err) + newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)) + for _, window := range sh.openWindows { + if window.worker != wid { + newWindows = append(newWindows, window) + } } - }() + sh.openWindows = newWindows + + log.Debugf("dropWorker %d", wid) + + go func() { + if err := w.w.Close(); err != nil { + log.Warnf("closing worker %d: %+v", err) + } + }() + } } func (sh *scheduler) schedClose() { sh.workersLk.Lock() defer sh.workersLk.Unlock() + log.Debugf("closing scheduler") for i, w := range sh.workers { - if err := w.w.Close(); err != nil { - log.Errorf("closing worker %d: %+v", i, err) - } + sh.workerCleanup(i, w) } } -func (sh *scheduler) Close() error { +func (sh *scheduler) Close(ctx context.Context) error { close(sh.closing) + select { + case <-sh.closed: + case <-ctx.Done(): + return ctx.Err() + } return nil } diff --git a/sched_test.go b/sched_test.go index e6bd8d2..67a5eee 100644 --- a/sched_test.go +++ b/sched_test.go @@ -119,6 +119,7 @@ func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) func (s *schedTestWorker) Close() error { if !s.closed { + log.Info("close schedTestWorker") s.closed = true close(s.closing) } @@ -169,11 +170,11 @@ func TestSchedStartStop(t *testing.T) { addTestWorker(t, sched, stores.NewIndex(), "fred", nil) - sched.schedClose() + require.NoError(t, sched.Close(context.TODO())) } func TestSched(t *testing.T) { - ctx, done := context.WithTimeout(context.Background(), 
20*time.Second) + ctx, done := context.WithTimeout(context.Background(), 30*time.Second) defer done() spt := abi.RegisteredSealProof_StackedDrg32GiBV1 @@ -301,7 +302,7 @@ func TestSched(t *testing.T) { log.Info("wait for async stuff") rm.wg.Wait() - sched.schedClose() + require.NoError(t, sched.Close(context.TODO())) } } diff --git a/sched_watch.go b/sched_watch.go index 2144890..d93cf1a 100644 --- a/sched_watch.go +++ b/sched_watch.go @@ -74,7 +74,11 @@ func (sh *scheduler) runWorkerWatcher() { caseToWorker[toSet] = wid default: - wid := caseToWorker[n] + wid, found := caseToWorker[n] + if !found { + log.Errorf("worker ID not found for case %d", n) + continue + } delete(caseToWorker, n) cases[n] = reflect.SelectCase{
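
Usage sketch (not part of the patch series): PATCH 14/14 makes shutdown context-aware, so a caller can bound how long teardown may take: Manager.Close and scheduler.Close now accept a ctx, and runSched signals sh.closed when it exits. The helper below only illustrates that call pattern; the shutdownManager name and the one-minute timeout are assumptions for illustration, not code from these patches.

package sectorstorage

import (
	"context"
	"time"

	"golang.org/x/xerrors"
)

// shutdownManager sketches how a caller might drive the context-aware Close
// added in PATCH 14/14. It assumes an already-constructed *Manager m.
func shutdownManager(m *Manager) error {
	// Bound how long scheduler teardown (including the per-worker
	// closingMgr/closedMgr handshake) is allowed to take.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	if err := m.Close(ctx); err != nil {
		// scheduler.Close returns ctx.Err() when the runSched loop has not
		// closed sh.closed before the deadline.
		return xerrors.Errorf("closing sector manager: %w", err)
	}
	return nil
}

With this shape the caller gets a bounded wait instead of the earlier fire-and-forget close: Close blocks until runSched's deferred close(sh.closed) fires (schedClose runs workerCleanup for each worker, each bounded by its one-second wait on closedMgr), or it returns the context error if teardown overruns the caller's deadline.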