@@ -16,6 +16,7 @@ import (
1616 "github.com/OpenListTeam/OpenList/v4/internal/model"
1717 sdkUserFile "github.com/halalcloud/golang-sdk-lite/halalcloud/services/userfile"
1818 "github.com/ipfs/go-cid"
19+ log "github.com/sirupsen/logrus"
1920)
2021
2122func (d * HalalCloudOpen ) put (ctx context.Context , dstDir model.Obj , fileStream model.FileStreamer , up driver.UpdateProgress ) (model.Obj , error ) {
@@ -51,52 +52,39 @@ func (d *HalalCloudOpen) put(ctx context.Context, dstDir model.Obj, fileStream m
5152 Version : 1 ,
5253 }
5354 blockSize := uploadTask .BlockSize
54- useSingleUpload := true
5555 //
56- if fileStream .GetSize () <= int64 (blockSize ) || d .uploadThread <= 1 {
57- useSingleUpload = true
58- }
5956 // Not sure whether FileStream supports concurrent read and write operations, so currently using single-threaded upload to ensure safety.
6057 // read file
61- if useSingleUpload {
62- bufferSize := int (blockSize )
63- buffer := make ([]byte , bufferSize )
64- reader := driver .NewLimitedUploadStream (ctx , fileStream )
65- teeReader := io .TeeReader (reader , driver .NewProgress (fileStream .GetSize (), up ))
66- // fileStream.Seek(0, os.SEEK_SET)
67- for {
68- n , err := teeReader .Read (buffer )
69- if n > 0 {
70- data := buffer [:n ]
71- uploadCid , err := postFileSlice (ctx , data , uploadTask .Task , uploadTask .UploadAddress , prefix , retryTimes )
58+ bufferSize := int (blockSize )
59+ buffer := make ([]byte , bufferSize )
60+ offset := 0
61+ teeReader := io .TeeReader (fileStream , driver .NewProgress (fileStream .GetSize (), up ))
62+ for {
63+ n , err := teeReader .Read (buffer [offset :]) // 这里 len(buf[offset:]) <= 4MB
64+ if n > 0 {
65+ offset += n
66+ if offset == int (blockSize ) {
67+ uploadCid , err := postFileSlice (ctx , buffer , uploadTask .Task , uploadTask .UploadAddress , prefix , retryTimes )
7268 if err != nil {
7369 return nil , err
7470 }
7571 slicesList = append (slicesList , uploadCid .String ())
76- }
77- if err == io .EOF || n == 0 {
78- break
72+ offset = 0
7973 }
8074 }
81- } else {
82- // TODO: implement multipart upload, currently using single-threaded upload to ensure safety.
83- bufferSize := int (blockSize )
84- buffer := make ([]byte , bufferSize )
85- reader := driver .NewLimitedUploadStream (ctx , fileStream )
86- teeReader := io .TeeReader (reader , driver .NewProgress (fileStream .GetSize (), up ))
87- for {
88- n , err := teeReader .Read (buffer )
89- if n > 0 {
90- data := buffer [:n ]
91- uploadCid , err := postFileSlice (ctx , data , uploadTask .Task , uploadTask .UploadAddress , prefix , retryTimes )
92- if err != nil {
93- return nil , err
75+
76+ if err != nil {
77+ if err == io .EOF {
78+ if offset > 0 {
79+ uploadCid , err := postFileSlice (ctx , buffer [:offset ], uploadTask .Task , uploadTask .UploadAddress , prefix , retryTimes )
80+ if err != nil {
81+ return nil , err
82+ }
83+ slicesList = append (slicesList , uploadCid .String ())
9484 }
95- slicesList = append (slicesList , uploadCid .String ())
96- }
97- if err == io .EOF || n == 0 {
9885 break
9986 }
87+ return nil , err
10088 }
10189 }
10290 newFile , err := makeFile (ctx , slicesList , uploadTask .Task , uploadTask .UploadAddress , retryTimes )
@@ -118,6 +106,7 @@ func makeFile(ctx context.Context, fileSlice []string, taskID string, uploadAddr
118106 if ctx .Err () != nil {
119107 return nil , err
120108 }
109+ log .Errorf ("make file slice failed, retrying... error: %s" , err .Error ())
121110 if strings .Contains (err .Error (), "not found" ) {
122111 return nil , err
123112 }
@@ -156,15 +145,23 @@ func doMakeFile(fileSlice []string, taskID string, uploadAddress string) (*sdkUs
156145 if httpResponse .StatusCode != http .StatusOK && httpResponse .StatusCode != http .StatusCreated {
157146 b , _ := io .ReadAll (httpResponse .Body )
158147 message := string (b )
148+ log .Errorf ("make file failed, status code: %d, message: %s" , httpResponse .StatusCode , message )
149+
159150 return nil , fmt .Errorf ("mk file slice failed, status code: %d, message: %s" , httpResponse .StatusCode , message )
160151 }
161152 b , _ := io .ReadAll (httpResponse .Body )
162- var result * sdkUserFile. File
153+ var result * UploadedFile
163154 err = json .Unmarshal (b , & result )
164155 if err != nil {
156+ log .Errorf ("make file failed from response, status code: %d, message: %s" , httpResponse .StatusCode , string (b ))
165157 return nil , err
166158 }
167- return result , nil
159+ return & sdkUserFile.File {
160+ Identity : result .Identity ,
161+ Path : result .Path ,
162+ Size : result .Size ,
163+ ContentIdentity : result .ContentIdentity ,
164+ }, nil
168165}
169166func postFileSlice (ctx context.Context , fileSlice []byte , taskID string , uploadAddress string , preix cid.Prefix , retry int ) (cid.Cid , error ) {
170167 var lastError error = nil
@@ -214,9 +211,11 @@ func doPostFileSlice(fileSlice []byte, taskID string, uploadAddress string, prei
214211 }
215212 httpResponse , err := httpClient .Do (& httpRequest )
216213 if err != nil {
214+ log .Errorf ("access %s failed, method: %s" , accessUrl , http .MethodGet )
217215 return cid .Undef , err
218216 }
219217 if httpResponse .StatusCode != http .StatusOK {
218+ log .Errorf ("access %s failed, method: %s, status code: %d" , accessUrl , http .MethodGet , httpResponse .StatusCode )
220219 return cid .Undef , fmt .Errorf ("upload file slice failed, status code: %d" , httpResponse .StatusCode )
221220 }
222221 var result bool
@@ -250,6 +249,7 @@ func doPostFileSlice(fileSlice []byte, taskID string, uploadAddress string, prei
250249 if httpResponse .StatusCode != http .StatusOK && httpResponse .StatusCode != http .StatusCreated {
251250 b , _ := io .ReadAll (httpResponse .Body )
252251 message := string (b )
252+ log .Errorf ("upload file slice failed, status code: %d, message: %s" , httpResponse .StatusCode , message )
253253 return cid .Undef , fmt .Errorf ("upload file slice failed, status code: %d, message: %s" , httpResponse .StatusCode , message )
254254 }
255255 //
0 commit comments