From 06d30767e3501aa457284a09b9946ac92f056e03 Mon Sep 17 00:00:00 2001 From: saltbo Date: Sun, 18 Jul 2021 21:41:18 +0800 Subject: [PATCH] fix: storage used double calc for the netdisk mode --- internal/app/api/matter.go | 2 +- internal/app/dao/matter.go | 8 +--- internal/pkg/fakefs/ffs.go | 7 +--- internal/pkg/fakefs/file.go | 15 ++++++- internal/pkg/fakefs/filewaiter.go | 65 +++++++++++++++++++++++++++++++ internal/pkg/fakefs/worker.go | 62 ----------------------------- 6 files changed, 83 insertions(+), 76 deletions(-) create mode 100644 internal/pkg/fakefs/filewaiter.go delete mode 100644 internal/pkg/fakefs/worker.go diff --git a/internal/app/api/matter.go b/internal/app/api/matter.go index eb23c99..dd01ed4 100644 --- a/internal/app/api/matter.go +++ b/internal/app/api/matter.go @@ -28,7 +28,7 @@ func (rs *FileResource) Register(router *gin.RouterGroup) { router.PATCH("/matters/:alias/location", rs.move) router.PATCH("/matters/:alias/duplicate", rs.copy) router.DELETE("/matters/:alias", rs.delete) - rs.fs.Start() + rs.fs.StartFileAutoDoneWorker() } func (rs *FileResource) findAll(c *gin.Context) { diff --git a/internal/app/dao/matter.go b/internal/app/dao/matter.go index a17b388..1fb9a3e 100644 --- a/internal/app/dao/matter.go +++ b/internal/app/dao/matter.go @@ -93,7 +93,7 @@ func (ms *Matter) FindUserMatter(uid int64, alias string) (*model.Matter, error) func (ms *Matter) Uploaded(matter *model.Matter, incrUsed bool) error { fc := func(tx *gorm.DB) error { - if err := tx.First(matter).Update("uploaded_at", time.Now()).Error; err != nil { + if err := tx.First(matter).Where("uploaded_at is null").Update("uploaded_at", time.Now()).Error; err != nil { return err } @@ -103,11 +103,7 @@ func (ms *Matter) Uploaded(matter *model.Matter, incrUsed bool) error { // update the storage used of the user expr := gorm.Expr("used+?", matter.Size) - if err := tx.Model(&model.UserStorage{}).Where("uid=?", matter.Uid).Update("used", expr).Error; err != nil { - return err - } - - return nil + return tx.Model(&model.UserStorage{}).Where("uid=?", matter.Uid).Update("used", expr).Error } return gdb.Transaction(fc) diff --git a/internal/pkg/fakefs/ffs.go b/internal/pkg/fakefs/ffs.go index dea0b02..dd03b4d 100644 --- a/internal/pkg/fakefs/ffs.go +++ b/internal/pkg/fakefs/ffs.go @@ -11,7 +11,6 @@ import ( ) type FakeFS struct { - worker *Worker dMatter *dao.Matter sFile *File @@ -20,7 +19,6 @@ type FakeFS struct { func New() *FakeFS { return &FakeFS{ - worker: NewWorker(), dMatter: dao.NewMatter(), sFile: NewFile(), @@ -28,8 +26,8 @@ func New() *FakeFS { } } -func (fs *FakeFS) Start() { - go fs.worker.Run() +func (fs *FakeFS) StartFileAutoDoneWorker() { + go fs.sFile.RunFileAutoDoneWorker() } func (fs *FakeFS) List(uid int64, qp *bind.QueryFiles) (list []model.Matter, total int64, err error) { @@ -128,7 +126,6 @@ func (fs *FakeFS) CreateFile(m *model.Matter) (interface{}, error) { return nil, err } - fs.worker.WaitDone(m, fs.TagUploadDone) return gin.H{ "alias": m.Alias, "object": m.Object, diff --git a/internal/pkg/fakefs/file.go b/internal/pkg/fakefs/file.go index c394884..c44c50f 100644 --- a/internal/pkg/fakefs/file.go +++ b/internal/pkg/fakefs/file.go @@ -17,17 +17,23 @@ import ( type File struct { dMatter *dao.Matter - sStorage *service.Storage + sStorage *service.Storage + fileWaiter *FileWaiter } func NewFile() *File { return &File{ dMatter: dao.NewMatter(), - sStorage: service.NewStorage(), + sStorage: service.NewStorage(), + fileWaiter: NewFileWaiter(), } } +func (f *File) RunFileAutoDoneWorker() error { + return f.fileWaiter.Run() +} + func (f *File) PreSignPutURL(matter *model.Matter) (url string, headers http.Header, err error) { if !f.dMatter.ParentExist(matter.Uid, matter.Parent) { return "", nil, fmt.Errorf("dir does not exists") @@ -52,6 +58,11 @@ func (f *File) PreSignPutURL(matter *model.Matter) (url string, headers http.Hea return "", nil, err } + // 只有外链盘才有自动标记上传完成的逻辑 + if storage.Mode == model.StorageModeOutline { + f.fileWaiter.Wait(provider, matter, f.UploadDone) + } + url, headers, err = provider.SignedPutURL(matter.Object, matter.Type, matter.Size, storage.PublicRead()) if err != nil { return diff --git a/internal/pkg/fakefs/filewaiter.go b/internal/pkg/fakefs/filewaiter.go new file mode 100644 index 0000000..fe4d66b --- /dev/null +++ b/internal/pkg/fakefs/filewaiter.go @@ -0,0 +1,65 @@ +package fakefs + +import ( + "fmt" + "math/rand" + "time" + + "github.com/saltbo/zpan/internal/app/model" + "github.com/saltbo/zpan/internal/pkg/provider" +) + +const maxHeadIntervalSec = 5 + +type AutoDoneMsg struct { + Provider provider.Provider + Matter *model.Matter + Handler func(uid int64, alias string) (*model.Matter, error) +} + +type FileWaiter struct { + ch chan *AutoDoneMsg +} + +func NewFileWaiter() *FileWaiter { + return &FileWaiter{ + ch: make(chan *AutoDoneMsg), + } +} + +func (w *FileWaiter) Run() error { + for m := range w.ch { + go w.runWait(m) + } + + return nil +} + +// fixme: 如果在外链上传期间服务重启了,将永远无法标记上传完成 + +func (w *FileWaiter) Wait(p provider.Provider, m *model.Matter, f func(uid int64, alias string) (*model.Matter, error)) { + w.ch <- &AutoDoneMsg{Provider: p, Matter: m, Handler: f} +} + +func (w *FileWaiter) runWait(adm *AutoDoneMsg) { + startAt := time.Now() + for { + // 如果超过上传有效期仍然没有上传完成则判定为失败,不再等待 + if startAt.Sub(time.Now()) > time.Hour { + break + } + + s := time.Now() + if _, err := adm.Provider.Head(adm.Matter.Object); err != nil { + // 加一个时间限制,控制请求频率 + if time.Now().Sub(s).Seconds() < maxHeadIntervalSec { + time.Sleep(time.Second * time.Duration(rand.Intn(maxHeadIntervalSec))) + } + continue + } + + adm.Handler(adm.Matter.Uid, adm.Matter.Alias) + fmt.Printf("object %s uploaed\n", adm.Matter.Object) + return + } +} diff --git a/internal/pkg/fakefs/worker.go b/internal/pkg/fakefs/worker.go deleted file mode 100644 index cfc8c93..0000000 --- a/internal/pkg/fakefs/worker.go +++ /dev/null @@ -1,62 +0,0 @@ -package fakefs - -import ( - "fmt" - "math/rand" - "time" - - "github.com/saltbo/zpan/internal/app/model" - "github.com/saltbo/zpan/internal/app/service" -) - -const maxHeadIntervalSec = 5 - -type Worker struct { - ch chan *model.Matter - done func(uid int64, alias string) (*model.Matter, error) - - sStorage *service.Storage -} - -func NewWorker() *Worker { - return &Worker{ - ch: make(chan *model.Matter), - - sStorage: service.NewStorage(), - } -} - -func (w *Worker) WaitDone(m *model.Matter, f func(uid int64, alias string) (*model.Matter, error)) { - w.ch <- m - w.done = f -} - -func (w *Worker) Run() error { - for m := range w.ch { - go w.waitDone(m) - } - - return nil -} - -func (w *Worker) waitDone(m *model.Matter) { - provider, err := w.sStorage.GetProvider(m.Sid) - if err != nil { - return - } - - for { - s := time.Now() - if _, err := provider.Head(m.Object); err != nil { - // 加一个时间限制,控制请求频率 - if time.Now().Sub(s).Seconds() < maxHeadIntervalSec { - time.Sleep(time.Second * time.Duration(rand.Intn(maxHeadIntervalSec))) - } - continue - } - - w.done(m.Uid, m.Alias) - fmt.Printf("object %s uploaed\n", m.Object) - return - } -}