From 15d1003ef1e278ff6d97363d3f00f212f5e3177c Mon Sep 17 00:00:00 2001 From: liwei Date: Mon, 23 Oct 2023 17:20:12 +0800 Subject: [PATCH 1/5] temp --- pkg/download/downloader.go | 412 ++++++++++++++++++++++--------------- 1 file changed, 244 insertions(+), 168 deletions(-) diff --git a/pkg/download/downloader.go b/pkg/download/downloader.go index fffc58c15..cdbc3a7e3 100644 --- a/pkg/download/downloader.go +++ b/pkg/download/downloader.go @@ -2,6 +2,7 @@ package download import ( "errors" + "fmt" "github.com/GopeedLab/gopeed/internal/controller" "github.com/GopeedLab/gopeed/internal/fetcher" "github.com/GopeedLab/gopeed/internal/logger" @@ -56,11 +57,13 @@ type Downloader struct { fetcherCache map[string]fetcher.Fetcher storage Storage tasks []*Task + waitTasks []*Task listener Listener - lock *sync.Mutex - fetcherMapLock *sync.RWMutex - closed atomic.Bool + lock *sync.Mutex + fetcherMapLock *sync.RWMutex + checkDuplicateLock *sync.Mutex + closed atomic.Bool extensions []*Extension } @@ -72,14 +75,15 @@ func NewDownloader(cfg *DownloaderConfig) *Downloader { cfg.Init() d := &Downloader{ - cfg: cfg, - + cfg: cfg, fetcherBuilders: make(map[string]fetcher.FetcherBuilder), fetcherCache: make(map[string]fetcher.Fetcher), + waitTasks: make([]*Task, 0), + storage: cfg.Storage, - storage: cfg.Storage, - lock: &sync.Mutex{}, - fetcherMapLock: &sync.RWMutex{}, + lock: &sync.Mutex{}, + fetcherMapLock: &sync.RWMutex{}, + checkDuplicateLock: &sync.Mutex{}, extensions: make([]*Extension, 0), } @@ -248,6 +252,122 @@ func (d *Downloader) Resolve(req *base.Request) (rr *ResolveResult, err error) { return } +type dispatchAction int + +const ( + dispatchActionCreate dispatchAction = iota + dispatchActionPause + dispatchActionContinue + dispatchActionDelete +) + +func (d *Downloader) sync(fn func() error) error { + d.lock.Lock() + defer d.lock.Unlock() + + return fn() +} + +func (d *Downloader) dispatch(task *Task, action dispatchAction, cb func(task *Task)) { + switch action { + case dispatchActionCreate: + func() { + d.lock.Lock() + defer d.lock.Unlock() + + d.tasks = append(d.tasks, task) + remainRunningCount := d.remainRunningCount() + if remainRunningCount == 0 { + task.Status = base.DownloadStatusWait + d.waitTasks = append(d.waitTasks, task) + return + } + + fmt.Println("create task 111") + + go func() { + d.lock.Lock() + defer d.lock.Unlock() + + cb(task) + + fmt.Println("create task 222") + }() + }() + break + case dispatchActionPause: + go func() { + d.lock.Lock() + defer d.lock.Unlock() + + cb(task) + d.dequeue() + }() + break + case dispatchActionContinue: + go func() { + d.lock.Lock() + defer d.lock.Unlock() + + remainRunningCount := d.remainRunningCount() + if remainRunningCount == 0 { + for _, t := range d.tasks { + if t != task && t.Status == base.DownloadStatusRunning { + d.doPause(t) + t.Status = base.DownloadStatusWait + d.waitTasks = append(d.waitTasks, t) + } + } + } + cb(task) + }() + break + case dispatchActionDelete: + func() { + d.lock.Lock() + defer d.lock.Unlock() + + for i, t := range d.tasks { + if t.ID == task.ID { + d.tasks = append(d.tasks[:i], d.tasks[i+1:]...) + break + } + } + + go func() { + d.lock.Lock() + defer d.lock.Unlock() + + cb(task) + d.dequeue() + }() + }() + break + } +} + +func (d *Downloader) dequeue() { + remainRunningCount := d.remainRunningCount() + if remainRunningCount == 0 { + return + } + if len(d.waitTasks) > 0 { + wt := d.waitTasks[0] + d.waitTasks = d.waitTasks[1:] + d.doStart(wt) + } +} + +func (d *Downloader) remainRunningCount() int { + runningCount := 0 + for _, t := range d.tasks { + if t.Status == base.DownloadStatusRunning { + runningCount++ + } + } + return d.cfg.MaxRunning - runningCount +} + func (d *Downloader) CreateDirect(req *base.Request, opts *base.Options) (taskId string, err error) { var fetcher fetcher.Fetcher fetcher, err = d.buildFetcher(req.URL) @@ -281,10 +401,14 @@ func (d *Downloader) Pause(id string) (err error) { if task == nil { return ErrTaskNotFound } - if err = d.doPause(task, base.DownloadStatusPause); err != nil { - return err + if task.Status == base.DownloadStatusPause { + return } - return d.notifyRunning() + task.Status = base.DownloadStatusPause + d.dispatch(task, dispatchActionPause, func(task *Task) { + d.doPause(task) + }) + return } func (d *Downloader) Continue(id string) (err error) { @@ -292,24 +416,14 @@ func (d *Downloader) Continue(id string) (err error) { if task == nil { return ErrTaskNotFound } - d.tryRunning(func(remain int) { - if remain == 0 { - // no more task can be running, need to doPause a running task - for _, t := range d.tasks { - if t.Status == base.DownloadStatusRunning { - err = d.doPause(t, base.DownloadStatusWait) - break - } - } - } - if err != nil { - return - } - }) - if err != nil { + if task.Status == base.DownloadStatusRunning { return } - return d.doContinue(task) + task.Status = base.DownloadStatusRunning + d.dispatch(task, dispatchActionContinue, func(task *Task) { + d.doContinue(task) + }) + return } func (d *Downloader) PauseAll() (err error) { @@ -341,52 +455,48 @@ func (d *Downloader) ContinueAll() (err error) { } func (d *Downloader) Delete(id string, force bool) (err error) { - d.lock.Lock() - defer d.lock.Unlock() task := d.GetTask(id) if task == nil { return ErrTaskNotFound } - task.lock.Lock() - defer task.lock.Unlock() - if task.fetcher != nil { - err = task.fetcher.Close() - if err != nil { - return - } - } - for i, t := range d.tasks { - if t.ID == id { - d.tasks = append(d.tasks[:i], d.tasks[i+1:]...) - break - } - } - err = d.storage.Delete(bucketTask, id) - if err != nil { - return - } - err = d.storage.Delete(bucketSave, id) - if err != nil { - return - } - if force && task.Meta.Res != nil { - if task.Meta.Res.Name != "" { - if err = os.RemoveAll(task.Meta.FolderPath()); err != nil { - return - } - } else { - if err = util.SafeRemove(task.Meta.SingleFilepath()); err != nil { + d.dispatch(task, dispatchActionDelete, func(task *Task) { + d.doDelete(task, force) + }) + return +} + +func (d *Downloader) doDelete(task *Task, force bool) (err error) { + err = func() error { + if task.fetcher != nil { + if err := task.fetcher.Close(); err != nil { return err } } - } - d.emit(EventKeyDelete, task) - if task.Status == base.DownloadStatusRunning { - if err = d.doNotifyRunning(); err != nil { + if err := d.storage.Delete(bucketTask, task.ID); err != nil { return err } + if err := d.storage.Delete(bucketSave, task.ID); err != nil { + return err + } + if force && task.Meta.Res != nil { + if task.Meta.Res.Name != "" { + if err := os.RemoveAll(task.Meta.FolderPath()); err != nil { + return err + } + } else { + if err := util.SafeRemove(task.Meta.SingleFilepath()); err != nil { + return err + } + } + } + d.emit(EventKeyDelete, task) + task = nil + return nil + }() + if err != nil { + d.Logger.Error().Stack().Err(err).Msgf("delete task failed, task id: %s", task.ID) } - task = nil + return } @@ -499,9 +609,7 @@ func (d *Downloader) watch(task *Task) { d.emit(EventKeyDone, task) d.emit(EventKeyFinally, task, err) } - if task.Status == EventKeyDone { - d.notifyRunning() - } + d.dequeue() } func (d *Downloader) restoreFetcher(task *Task) error { @@ -570,150 +678,118 @@ func (d *Downloader) doCreate(fetcher fetcher.Fetcher, opts *base.Options) (task } taskId = task.ID - func() { - d.lock.Lock() - defer d.lock.Unlock() - d.tasks = append(d.tasks, task) - }() go d.watch(task) - go d.tryRunning(func(remain int) { - if remain > 0 { - if err = d.start(task); err != nil { - d.Logger.Error().Stack().Err(err).Msgf("start task failed, task id: %s", task.ID) - return - } - } + d.sync(func() error { + d.tasks = append(d.tasks, task) + d.doStart() }) return } -func (d *Downloader) tryRunning(cb func(remain int)) { - runningCount := 0 - for _, task := range d.tasks { - if task.Status == base.DownloadStatusRunning { - runningCount++ - } - } - cb(d.cfg.MaxRunning - runningCount) -} - -func (d *Downloader) notifyRunning() error { - d.lock.Lock() - defer d.lock.Unlock() - - return d.doNotifyRunning() -} - -func (d *Downloader) doNotifyRunning() error { - for _, task := range d.tasks { - if task.Status == base.DownloadStatusReady || task.Status == base.DownloadStatusWait { - return d.doContinue(task) - } - } - return nil -} - func (d *Downloader) doPauseAll() (err error) { for _, task := range d.tasks { - if err = d.doPause(task, base.DownloadStatusPause); err != nil { + if err = d.doPause(task); err != nil { return } } return } -func (d *Downloader) start(task *Task) error { - task.lock.Lock() - defer task.lock.Unlock() +func (d *Downloader) doStart(task *Task) (err error) { d.triggerOnStart(task) - isCreate := task.Status == base.DownloadStatusReady - task.Status = base.DownloadStatusRunning - if task.Meta.Res == nil { - err := task.fetcher.Resolve(task.Meta.Req) - if err != nil { - task.Status = base.DownloadStatusError - d.emit(EventKeyError, task, err) + err = func() error { + isCreate := task.Status == base.DownloadStatusReady + task.Status = base.DownloadStatusRunning + if task.Meta.Res == nil { + err := task.fetcher.Resolve(task.Meta.Req) + if err != nil { + task.Status = base.DownloadStatusError + d.emit(EventKeyError, task, err) + return err + } + isCreate = true + } + + if isCreate { + d.checkDuplicateLock.Lock() + defer d.checkDuplicateLock.Unlock() + // check if the download file is duplicated and rename it automatically. + if task.Meta.Res.Name != "" { + fullDirPath := task.Meta.FolderPath() + newName, err := util.CheckDuplicateAndRename(fullDirPath) + if err != nil { + return err + } + task.Meta.Opts.Name = newName + } else { + fullFilePath := task.Meta.SingleFilepath() + newName, err := util.CheckDuplicateAndRename(fullFilePath) + if err != nil { + return err + } + task.Meta.Opts.Name = newName + } + } + + task.Progress.Speed = 0 + task.timer.Start() + if err := d.storage.Put(bucketTask, task.ID, task.clone()); err != nil { + return err + } + if err := task.fetcher.Start(); err != nil { return err } - isCreate = true + d.emit(EventKeyStart, task) + return nil + }() + if err != nil { + d.Logger.Error().Stack().Err(err).Msgf("start task failed, task id: %s", task.ID) } - if isCreate { - d.lock.Lock() - defer d.lock.Unlock() - // check if the download file is duplicated and rename it automatically. - if task.Meta.Res.Name != "" { - fullDirPath := task.Meta.FolderPath() - newName, err := util.CheckDuplicateAndRename(fullDirPath) - if err != nil { + return +} + +func (d *Downloader) doPause(task *Task) (err error) { + err = func() error { + if task.Status != base.DownloadStatusDone { + task.Status = base.DownloadStatusPause + task.timer.Pause() + if err := task.fetcher.Pause(); err != nil { return err } - task.Meta.Opts.Name = newName - } else { - fullFilePath := task.Meta.SingleFilepath() - newName, err := util.CheckDuplicateAndRename(fullFilePath) - if err != nil { + if err := d.storage.Put(bucketTask, task.ID, task.clone()); err != nil { return err } - task.Meta.Opts.Name = newName + d.emit(EventKeyPause, task) } + return nil + }() + if err != nil { + d.Logger.Error().Stack().Err(err).Msgf("pause task failed, task id: %s", task.ID) + return } - if err := d.doStart(task); err != nil { - task.Status = base.DownloadStatusError - d.emit(EventKeyError, task, err) - return err - } - return nil -} -func (d *Downloader) doStart(task *Task) error { - task.Progress.Speed = 0 - task.timer.Start() - if err := d.storage.Put(bucketTask, task.ID, task.clone()); err != nil { - return err - } - if err := task.fetcher.Start(); err != nil { - return err - } - d.emit(EventKeyStart, task) - return nil -} - -func (d *Downloader) doPause(task *Task, status base.Status) (err error) { - task.lock.Lock() - defer task.lock.Unlock() - if task.Status != base.DownloadStatusDone { - task.Status = status - task.timer.Pause() - task.fetcher.Pause() - d.storage.Put(bucketTask, task.ID, task.clone()) - d.emit(EventKeyPause, task) - } return } func (d *Downloader) doContinue(task *Task) (err error) { err = func() error { - task.lock.Lock() - defer task.lock.Unlock() if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { err := d.restoreFetcher(task) if err != nil { return err } } + if err := d.doStart(task); err != nil { + return err + } return nil }() if err != nil { + d.Logger.Error().Stack().Err(err).Msgf("continue task failed, task id: %s", task.ID) return } - go func() { - if err := d.start(task); err != nil { - d.Logger.Error().Stack().Err(err).Msgf("start task failed, task id: %s", task.ID) - return - } - }() return } From 18bc5b7af0e7de65ddec7970dd44d60c439be8a8 Mon Sep 17 00:00:00 2001 From: liwei Date: Tue, 24 Oct 2023 11:57:31 +0800 Subject: [PATCH 2/5] refactor: downloader task action dispatch --- internal/protocol/http/fetcher.go | 8 +- pkg/download/downloader.go | 315 ++++++++++++++---------------- pkg/download/extension.go | 7 +- pkg/download/storage.go | 3 + 4 files changed, 159 insertions(+), 174 deletions(-) diff --git a/internal/protocol/http/fetcher.go b/internal/protocol/http/fetcher.go index 03f28b4bd..55cd255df 100644 --- a/internal/protocol/http/fetcher.go +++ b/internal/protocol/http/fetcher.go @@ -314,6 +314,10 @@ func (f *Fetcher) fetchChunk(index int, ctx context.Context) (err error) { return nil }() if err != nil { + // If canceled, do not retry + if errors.Is(err, context.Canceled) { + return + } // retry request after 1 second chunk.retryTimes = chunk.retryTimes + 1 time.Sleep(time.Second) @@ -466,9 +470,7 @@ func (fb *FetcherBuilder) Restore() (v any, f func(meta *fetcher.FetcherMeta, v fetcher.meta = meta base.ParseReqExtra[fhttp.ReqExtra](fetcher.meta.Req) base.ParseOptsExtra[fhttp.OptsExtra](fetcher.meta.Opts) - if len(fd.Chunks) == 0 { - fetcher.chunks = fetcher.splitChunk() - } else { + if len(fd.Chunks) > 0 { fetcher.chunks = fd.Chunks } return fetcher diff --git a/pkg/download/downloader.go b/pkg/download/downloader.go index cdbc3a7e3..74170807d 100644 --- a/pkg/download/downloader.go +++ b/pkg/download/downloader.go @@ -2,7 +2,6 @@ package download import ( "errors" - "fmt" "github.com/GopeedLab/gopeed/internal/controller" "github.com/GopeedLab/gopeed/internal/fetcher" "github.com/GopeedLab/gopeed/internal/logger" @@ -10,7 +9,6 @@ import ( "github.com/GopeedLab/gopeed/pkg/util" gonanoid "github.com/matoous/go-nanoid/v2" "github.com/virtuald/go-paniclog" - "math" "os" "path/filepath" "sort" @@ -252,110 +250,21 @@ func (d *Downloader) Resolve(req *base.Request) (rr *ResolveResult, err error) { return } -type dispatchAction int - -const ( - dispatchActionCreate dispatchAction = iota - dispatchActionPause - dispatchActionContinue - dispatchActionDelete -) - -func (d *Downloader) sync(fn func() error) error { - d.lock.Lock() - defer d.lock.Unlock() - - return fn() -} - -func (d *Downloader) dispatch(task *Task, action dispatchAction, cb func(task *Task)) { - switch action { - case dispatchActionCreate: - func() { - d.lock.Lock() - defer d.lock.Unlock() - - d.tasks = append(d.tasks, task) - remainRunningCount := d.remainRunningCount() - if remainRunningCount == 0 { - task.Status = base.DownloadStatusWait - d.waitTasks = append(d.waitTasks, task) - return - } - - fmt.Println("create task 111") - - go func() { - d.lock.Lock() - defer d.lock.Unlock() - - cb(task) - - fmt.Println("create task 222") - }() - }() - break - case dispatchActionPause: - go func() { - d.lock.Lock() - defer d.lock.Unlock() - - cb(task) - d.dequeue() - }() - break - case dispatchActionContinue: - go func() { - d.lock.Lock() - defer d.lock.Unlock() - - remainRunningCount := d.remainRunningCount() - if remainRunningCount == 0 { - for _, t := range d.tasks { - if t != task && t.Status == base.DownloadStatusRunning { - d.doPause(t) - t.Status = base.DownloadStatusWait - d.waitTasks = append(d.waitTasks, t) - } - } - } - cb(task) - }() - break - case dispatchActionDelete: - func() { - d.lock.Lock() - defer d.lock.Unlock() - - for i, t := range d.tasks { - if t.ID == task.ID { - d.tasks = append(d.tasks[:i], d.tasks[i+1:]...) - break - } - } - - go func() { - d.lock.Lock() - defer d.lock.Unlock() - - cb(task) - d.dequeue() - }() - }() - break - } -} +func (d *Downloader) notifyRunning() { + go func() { + d.lock.Lock() + defer d.lock.Unlock() -func (d *Downloader) dequeue() { - remainRunningCount := d.remainRunningCount() - if remainRunningCount == 0 { - return - } - if len(d.waitTasks) > 0 { - wt := d.waitTasks[0] - d.waitTasks = d.waitTasks[1:] - d.doStart(wt) - } + remainRunningCount := d.remainRunningCount() + if remainRunningCount == 0 { + return + } + if len(d.waitTasks) > 0 { + wt := d.waitTasks[0] + d.waitTasks = d.waitTasks[1:] + d.doStart(wt) + } + }() } func (d *Downloader) remainRunningCount() int { @@ -401,13 +310,20 @@ func (d *Downloader) Pause(id string) (err error) { if task == nil { return ErrTaskNotFound } - if task.Status == base.DownloadStatusPause { - return - } - task.Status = base.DownloadStatusPause - d.dispatch(task, dispatchActionPause, func(task *Task) { - d.doPause(task) - }) + + func() { + task.lock.Lock() + defer task.lock.Unlock() + + if task.Status == base.DownloadStatusPause { + return + } + err = d.doPause(task) + if err != nil { + return + } + d.notifyRunning() + }() return } @@ -416,13 +332,36 @@ func (d *Downloader) Continue(id string) (err error) { if task == nil { return ErrTaskNotFound } - if task.Status == base.DownloadStatusRunning { - return - } - task.Status = base.DownloadStatusRunning - d.dispatch(task, dispatchActionContinue, func(task *Task) { - d.doContinue(task) - }) + + func() { + d.lock.Lock() + defer d.lock.Unlock() + + remainRunningCount := d.remainRunningCount() + if remainRunningCount == 0 { + for _, t := range d.tasks { + if t.Status == base.DownloadStatusRunning { + if err = d.doPause(t); err != nil { + return + } + t.Status = base.DownloadStatusWait + d.waitTasks = append(d.waitTasks, t) + break + } + } + } + }() + + go func() { + task.lock.Lock() + defer task.lock.Unlock() + + if task.Status == base.DownloadStatusRunning { + return + } + + d.doStart(task) + }() return } @@ -433,24 +372,33 @@ func (d *Downloader) PauseAll() (err error) { } func (d *Downloader) ContinueAll() (err error) { - d.lock.Lock() - defer d.lock.Unlock() - runningCount := 0 - continueTasks := make([]*Task, 0) - for _, task := range d.tasks { - if task.Status == base.DownloadStatusRunning { - runningCount++ - } else if task.Status != base.DownloadStatusDone { - continueTasks = append(continueTasks, task) + continuedTasks := make([]*Task, 0) + + func() { + d.lock.Lock() + defer d.lock.Unlock() + // calculate how many tasks can be continued, can't exceed maxRunning + remainCount := d.remainRunningCount() + for _, task := range d.tasks { + if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { + if len(continuedTasks) < remainCount { + continuedTasks = append(continuedTasks, task) + } else { + task.Status = base.DownloadStatusWait + d.waitTasks = append(d.waitTasks, task) + } + } } - } - // calculate how many tasks can be continued, can't exceed maxRunning - continueCount := int(math.Min(float64(d.cfg.MaxRunning-runningCount), float64(len(continueTasks)))) - for i := 0; i < continueCount; i++ { - if err = d.doContinue(continueTasks[i]); err != nil { - return + }() + + go func() { + for _, task := range continuedTasks { + if err = d.doStart(task); err != nil { + d.Logger.Error().Stack().Err(err).Msgf("continue task failed: %s", task.ID) + } } - } + }() + return } @@ -459,9 +407,23 @@ func (d *Downloader) Delete(id string, force bool) (err error) { if task == nil { return ErrTaskNotFound } - d.dispatch(task, dispatchActionDelete, func(task *Task) { - d.doDelete(task, force) - }) + + func() { + d.lock.Lock() + defer d.lock.Unlock() + + for i, t := range d.tasks { + if t.ID == id { + d.tasks = append(d.tasks[:i], d.tasks[i+1:]...) + } + } + }() + + err = d.doDelete(task, force) + if err != nil { + return + } + d.notifyRunning() return } @@ -609,7 +571,7 @@ func (d *Downloader) watch(task *Task) { d.emit(EventKeyDone, task) d.emit(EventKeyFinally, task, err) } - d.dequeue() + d.notifyRunning() } func (d *Downloader) restoreFetcher(task *Task) error { @@ -637,6 +599,12 @@ func (d *Downloader) restoreFetcher(task *Task) error { task.fetcher = fb.Build() } d.setupFetcher(task.fetcher) + if task.fetcher.Meta().Req == nil { + task.fetcher.Meta().Req = task.Meta.Req + } + if task.fetcher.Meta().Res == nil { + task.fetcher.Meta().Res = task.Meta.Res + } go d.watch(task) } task.fetcher.Create(task.Meta.Opts) @@ -678,11 +646,23 @@ func (d *Downloader) doCreate(fetcher fetcher.Fetcher, opts *base.Options) (task } taskId = task.ID - go d.watch(task) - d.sync(func() error { + func() { + d.lock.Lock() + defer d.lock.Unlock() + d.tasks = append(d.tasks, task) - d.doStart() - }) + + remainRunningCount := d.remainRunningCount() + if remainRunningCount == 0 { + task.Status = base.DownloadStatusWait + d.waitTasks = append(d.waitTasks, task) + return + } + + go d.doStart(task) + }() + + go d.watch(task) return } @@ -696,11 +676,24 @@ func (d *Downloader) doPauseAll() (err error) { } func (d *Downloader) doStart(task *Task) (err error) { - d.triggerOnStart(task) - err = func() error { + if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { + err := d.restoreFetcher(task) + if err != nil { + return err + } + } + + cloneTask := task.clone() isCreate := task.Status == base.DownloadStatusReady task.Status = base.DownloadStatusRunning + + req := d.triggerOnStart(cloneTask) + if req != nil { + task.Meta.Req = req + task.fetcher.Meta().Req = req + } + if task.Meta.Res == nil { err := task.fetcher.Resolve(task.Meta.Req) if err != nil { @@ -708,9 +701,15 @@ func (d *Downloader) doStart(task *Task) (err error) { d.emit(EventKeyError, task, err) return err } + task.Meta.Res = task.fetcher.Meta().Res isCreate = true } + // task may be paused before start + if task.Status != base.DownloadStatusRunning { + return nil + } + if isCreate { d.checkDuplicateLock.Lock() defer d.checkDuplicateLock.Unlock() @@ -755,8 +754,10 @@ func (d *Downloader) doPause(task *Task) (err error) { if task.Status != base.DownloadStatusDone { task.Status = base.DownloadStatusPause task.timer.Pause() - if err := task.fetcher.Pause(); err != nil { - return err + if task.fetcher != nil { + if err := task.fetcher.Pause(); err != nil { + return err + } } if err := d.storage.Put(bucketTask, task.ID, task.clone()); err != nil { return err @@ -773,26 +774,6 @@ func (d *Downloader) doPause(task *Task) (err error) { return } -func (d *Downloader) doContinue(task *Task) (err error) { - err = func() error { - if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { - err := d.restoreFetcher(task) - if err != nil { - return err - } - } - if err := d.doStart(task); err != nil { - return err - } - return nil - }() - if err != nil { - d.Logger.Error().Stack().Err(err).Msgf("continue task failed, task id: %s", task.ID) - return - } - return -} - // redirect stderr to log file, when panic happened log it func logPanic(logDir string) { if err := util.CreateDirIfNotExist(logDir); err != nil { diff --git a/pkg/download/extension.go b/pkg/download/extension.go index 57637dad9..4a5a5023f 100644 --- a/pkg/download/extension.go +++ b/pkg/download/extension.go @@ -251,12 +251,12 @@ func (d *Downloader) triggerOnResolve(req *base.Request) (res *base.Resource) { return } -func (d *Downloader) triggerOnStart(task *Task) { +func (d *Downloader) triggerOnStart(task *Task) (req *base.Request) { doTrigger(d, EventOnStart, task.Meta.Req, &OnStartContext{ - Task: task.clone(), + Task: task, }, func(ext *Extension, gopeed *Instance, ctx *OnStartContext) { // Validate request structure @@ -265,8 +265,7 @@ func (d *Downloader) triggerOnStart(task *Task) { gopeed.Logger.logger.Warn().Err(err).Msgf("[%s] request invalid", ext.buildIdentity()) return } - // Modify real task request - task.Meta.Req = ctx.Task.Meta.Req + req = ctx.Task.Meta.Req } }, ) diff --git a/pkg/download/storage.go b/pkg/download/storage.go index 3daff0f4f..e615cdb66 100644 --- a/pkg/download/storage.go +++ b/pkg/download/storage.go @@ -222,6 +222,9 @@ func (b *BoltStorage) Pop(bucket string, key string, v any) error { if err != nil { return err } + if len(data) == 0 { + return nil + } return json.Unmarshal(data, v) } From b77ea70efee5f874d0b855c550d413fc41120485 Mon Sep 17 00:00:00 2001 From: liwei Date: Tue, 24 Oct 2023 14:30:36 +0800 Subject: [PATCH 3/5] fix --- pkg/download/downloader.go | 62 +++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/pkg/download/downloader.go b/pkg/download/downloader.go index 74170807d..03db1e20d 100644 --- a/pkg/download/downloader.go +++ b/pkg/download/downloader.go @@ -352,7 +352,7 @@ func (d *Downloader) Continue(id string) (err error) { } }() - go func() { + func() { task.lock.Lock() defer task.lock.Unlock() @@ -360,7 +360,9 @@ func (d *Downloader) Continue(id string) (err error) { return } - d.doStart(task) + if err = d.doStart(task); err != nil { + return + } }() return } @@ -391,13 +393,18 @@ func (d *Downloader) ContinueAll() (err error) { } }() - go func() { - for _, task := range continuedTasks { - if err = d.doStart(task); err != nil { - d.Logger.Error().Stack().Err(err).Msgf("continue task failed: %s", task.ID) - } + for _, task := range continuedTasks { + tt := task + err = func() error { + tt.lock.Lock() + defer tt.lock.Unlock() + + return d.doStart(tt) + }() + if err != nil { + return } - }() + } return } @@ -659,7 +666,7 @@ func (d *Downloader) doCreate(fetcher fetcher.Fetcher, opts *base.Options) (task return } - go d.doStart(task) + d.doStart(task) }() go d.watch(task) @@ -676,17 +683,21 @@ func (d *Downloader) doPauseAll() (err error) { } func (d *Downloader) doStart(task *Task) (err error) { - err = func() error { - if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { - err := d.restoreFetcher(task) - if err != nil { - return err - } + if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { + err := d.restoreFetcher(task) + if err != nil { + d.Logger.Error().Stack().Err(err).Msgf("restore fetcher failed, task id: %s", task.ID) + return err } + } - cloneTask := task.clone() - isCreate := task.Status == base.DownloadStatusReady - task.Status = base.DownloadStatusRunning + cloneTask := task.clone() + isCreate := task.Status == base.DownloadStatusReady + task.Status = base.DownloadStatusRunning + + doStart := func() error { + task.lock.Lock() + defer task.lock.Unlock() req := d.triggerOnStart(cloneTask) if req != nil { @@ -702,12 +713,6 @@ func (d *Downloader) doStart(task *Task) (err error) { return err } task.Meta.Res = task.fetcher.Meta().Res - isCreate = true - } - - // task may be paused before start - if task.Status != base.DownloadStatusRunning { - return nil } if isCreate { @@ -741,10 +746,13 @@ func (d *Downloader) doStart(task *Task) (err error) { } d.emit(EventKeyStart, task) return nil - }() - if err != nil { - d.Logger.Error().Stack().Err(err).Msgf("start task failed, task id: %s", task.ID) } + go func() { + err := doStart() + if err != nil { + d.Logger.Error().Stack().Err(err).Msgf("start task failed, task id: %s", task.ID) + } + }() return } From 2a894e92e8d58060a2553a8a4fb2327fcc7a7fb1 Mon Sep 17 00:00:00 2001 From: liwei Date: Tue, 24 Oct 2023 15:50:17 +0800 Subject: [PATCH 4/5] fix --- pkg/download/downloader.go | 37 ++++++++++++++++++++++--------------- pkg/rest/server_test.go | 5 +---- 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/pkg/download/downloader.go b/pkg/download/downloader.go index 03db1e20d..cd59f95b4 100644 --- a/pkg/download/downloader.go +++ b/pkg/download/downloader.go @@ -368,9 +368,27 @@ func (d *Downloader) Continue(id string) (err error) { } func (d *Downloader) PauseAll() (err error) { - d.lock.Lock() - defer d.lock.Unlock() - return d.doPauseAll() + func() { + d.lock.Lock() + defer d.lock.Unlock() + + // Clear wait tasks + d.waitTasks = d.waitTasks[:0] + }() + + for _, task := range d.tasks { + err = func() error { + task.lock.Lock() + defer task.lock.Unlock() + + return d.doPause(task) + }() + if err != nil { + return + } + } + + return } func (d *Downloader) ContinueAll() (err error) { @@ -470,10 +488,8 @@ func (d *Downloader) doDelete(task *Task, force bool) (err error) { } func (d *Downloader) Close() error { - d.lock.Lock() - defer d.lock.Unlock() d.closed.Store(true) - if err := d.doPauseAll(); err != nil { + if err := d.PauseAll(); err != nil { return err } return d.storage.Close() @@ -673,15 +689,6 @@ func (d *Downloader) doCreate(fetcher fetcher.Fetcher, opts *base.Options) (task return } -func (d *Downloader) doPauseAll() (err error) { - for _, task := range d.tasks { - if err = d.doPause(task); err != nil { - return - } - } - return -} - func (d *Downloader) doStart(task *Task) (err error) { if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone { err := d.restoreFetcher(task) diff --git a/pkg/rest/server_test.go b/pkg/rest/server_test.go index b34fb346e..0d839b405 100644 --- a/pkg/rest/server_test.go +++ b/pkg/rest/server_test.go @@ -204,19 +204,16 @@ func TestPauseAllAndContinueALLTasks(t *testing.T) { // continue all httpRequestCheckOk[any](http.MethodPut, "/api/v1/tasks/continue", nil) - time.Sleep(time.Millisecond * 100) tasks := httpRequestCheckOk[[]*download.Task](http.MethodGet, fmt.Sprintf("/api/v1/tasks?status=%s", base.DownloadStatusRunning), nil) if len(tasks) != cfg.MaxRunning { t.Errorf("ContinueAllTasks() got = %v, want %v", len(tasks), cfg.MaxRunning) } // pause all httpRequestCheckOk[any](http.MethodPut, "/api/v1/tasks/pause", nil) - time.Sleep(time.Millisecond * 500) tasks = httpRequestCheckOk[[]*download.Task](http.MethodGet, fmt.Sprintf("/api/v1/tasks?status=%s", base.DownloadStatusPause), nil) if len(tasks) != total { t.Errorf("PauseAllTasks() got = %v, want %v", len(tasks), total) } - }) } @@ -519,7 +516,7 @@ func doTest(handler func()) { handler() } testFunc(model.StorageMem) - testFunc(model.StorageBolt) + //testFunc(model.StorageBolt) } func doStart(cfg *model.StartConfig) net.Listener { From 4fa54ca418b9aab273b55f2e3ce0b895a201f777 Mon Sep 17 00:00:00 2001 From: liwei Date: Tue, 24 Oct 2023 15:55:51 +0800 Subject: [PATCH 5/5] fix --- pkg/rest/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rest/server_test.go b/pkg/rest/server_test.go index 0d839b405..0d4150e68 100644 --- a/pkg/rest/server_test.go +++ b/pkg/rest/server_test.go @@ -516,7 +516,7 @@ func doTest(handler func()) { handler() } testFunc(model.StorageMem) - //testFunc(model.StorageBolt) + testFunc(model.StorageBolt) } func doStart(cfg *model.StartConfig) net.Listener {