@@ -3,6 +3,7 @@ package ftp
33import (
44 "bytes"
55 "context"
6+ "fmt"
67 "io"
78 "net/http"
89 "os"
@@ -62,10 +63,10 @@ func (f *FileUploadProxy) Read(p []byte) (n int, err error) {
6263func (f * FileUploadProxy ) Write (p []byte ) (n int , err error ) {
6364 n , err = f .buffer .Write (p )
6465 if err != nil {
65- return
66+ return n , err
6667 }
6768 err = stream .ClientUploadLimit .WaitN (f .ctx , n )
68- return
69+ return n , err
6970}
7071
7172func (f * FileUploadProxy ) Seek (offset int64 , whence int ) (int64 , error ) {
@@ -89,6 +90,25 @@ func (f *FileUploadProxy) Close() error {
8990 if _ , err := f .buffer .Seek (0 , io .SeekStart ); err != nil {
9091 return err
9192 }
93+ user := f .ctx .Value (conf .UserKey ).(* model.User )
94+ sf , borrow , err := MakeStage (f .ctx , f .buffer , size , f .path , func (target string ) {
95+ ctx := context .WithValue (context .Background (), conf .UserKey , user )
96+ dstDir , dstBase := stdpath .Split (target )
97+ if dir == dstDir {
98+ _ = fs .Rename (ctx , f .path , dstBase )
99+ } else {
100+ if name != dstBase {
101+ e := fs .Rename (ctx , f .path , dstBase , true )
102+ if e != nil {
103+ return
104+ }
105+ }
106+ _ , _ = fs .Move (ctx , stdpath .Join (dir , dstBase ), dstDir )
107+ }
108+ })
109+ if err != nil {
110+ return fmt .Errorf ("failed make stage for [%s]: %+v" , f .path , err )
111+ }
92112 if f .trunc {
93113 _ = fs .Remove (f .ctx , f .path )
94114 }
@@ -100,10 +120,18 @@ func (f *FileUploadProxy) Close() error {
100120 },
101121 Mimetype : contentType ,
102122 WebPutAsTask : true ,
123+ Reader : f .buffer ,
103124 }
104- s .SetTmpFile (f .buffer )
105- _ , err = fs .PutAsTask (f .ctx , dir , s )
106- return err
125+ s .Add (borrow )
126+ task , err := fs .PutAsTask (f .ctx , dir , s )
127+ if err != nil {
128+ _ = s .Close ()
129+ return err
130+ }
131+ sf .SetRemoveCallback (func () {
132+ fs .UploadTaskManager .Cancel (task .GetID ())
133+ })
134+ return nil
107135}
108136
109137type FileUploadWithLengthProxy struct {
@@ -182,10 +210,10 @@ func (f *FileUploadWithLengthProxy) write(p []byte) (n int, err error) {
182210func (f * FileUploadWithLengthProxy ) Write (p []byte ) (n int , err error ) {
183211 n , err = f .write (p )
184212 if err != nil {
185- return
213+ return n , err
186214 }
187215 err = stream .ClientUploadLimit .WaitN (f .ctx , n )
188- return
216+ return n , err
189217}
190218
191219func (f * FileUploadWithLengthProxy ) Seek (offset int64 , whence int ) (int64 , error ) {
@@ -214,6 +242,6 @@ func (f *FileUploadWithLengthProxy) Close() error {
214242 WebPutAsTask : false ,
215243 Reader : bytes .NewReader (data ),
216244 }
217- return fs .PutDirectly (f .ctx , dir , s , true )
245+ return fs .PutDirectly (f .ctx , dir , s )
218246 }
219247}
0 commit comments