Merged

29 commits
1ab01df
pref(115,123): optimize upload
j2rong4cn Jul 3, 2025
d785509
chore
j2rong4cn Jul 3, 2025
3a5f3df
aliyun_open, google_drive
j2rong4cn Jul 3, 2025
2e67f28
fix bug
j2rong4cn Jul 3, 2025
3f0e1db
chore
j2rong4cn Jul 3, 2025
6b13bf7
cloudreve, cloudreve_v4, onedrive, onedrive_app
xrgzs Jul 3, 2025
e7a63e4
chore(conf): add `max_buffer_limit` option
j2rong4cn Jul 4, 2025
acaf5cc
Merge remote-tracking branch 'upstream/main' into pref-upload
j2rong4cn Jul 4, 2025
5bd0f3b
Merge remote-tracking branch 'upstream/main' into pref-upload
j2rong4cn Jul 5, 2025
139900d
123pan multithread upload
j2rong4cn Jul 5, 2025
6ca64b9
doubao
j2rong4cn Jul 5, 2025
6cb506e
google_drive
j2rong4cn Jul 5, 2025
d2feb37
chore
j2rong4cn Jul 5, 2025
6ae2ab5
chore
j2rong4cn Jul 5, 2025
c2d86d6
Merge remote-tracking branch 'upstream/main' into pref-upload
j2rong4cn Jul 8, 2025
64ff559
Merge remote-tracking branch 'upstream/main' into pref-upload
j2rong4cn Jul 12, 2025
d3fe02b
Merge remote-tracking branch 'upstream/main' into pref-upload
j2rong4cn Jul 14, 2025
70d1bdb
chore: chunk count calculation code
j2rong4cn Jun 26, 2025
701d14c
Merge remote-tracking branch 'origin/main' into pref-upload
j2rong4cn Jul 21, 2025
9b9b328
Merge remote-tracking branch 'origin/main' into pref-upload
j2rong4cn Jul 24, 2025
90a0d05
Merge remote-tracking branch 'origin/main' into pref-upload
j2rong4cn Jul 25, 2025
65fe59d
Merge remote-tracking branch 'origin/main' into pref-upload
j2rong4cn Aug 2, 2025
76c9977
MaxBufferLimit auto mode
j2rong4cn Aug 2, 2025
5321351
MaxBufferLimit auto mode
j2rong4cn Aug 2, 2025
e1a5c4f
189pc
j2rong4cn Aug 4, 2025
ae434c1
add Lifecycle to errorgroup
j2rong4cn Aug 4, 2025
db5ba60
fill in the gaps
j2rong4cn Aug 4, 2025
605c7f5
Conf.MaxBufferLimit is in MB
j2rong4cn Aug 4, 2025
eb0f072
j2rong4cn Aug 5, 2025
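Several of the commits above revolve around the new `max_buffer_limit` option (e7a63e4 adds it, 76c9977 gives it an auto mode, 605c7f5 pins the unit to MB). As a rough sketch of what that setting amounts to — the struct layout and tag below are guesses inferred from the commit messages, not the actual OpenList source:

```go
package conf

// Sketch only: inferred from commits e7a63e4, 76c9977 and 605c7f5;
// the real OpenList definition may differ.
type Config struct {
	// MaxBufferLimit caps the memory used to buffer upload chunks,
	// in MB (per 605c7f5). Presumably a non-positive value selects
	// the automatic sizing added in 76c9977.
	MaxBufferLimit int `json:"max_buffer_limit"`
}
```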
20 changes: 14 additions & 6 deletions drivers/115_open/upload.go
@@ -9,6 +9,7 @@ import (
sdk "github.com/OpenListTeam/115-sdk-go"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
streamPkg "github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/avast/retry-go"
@@ -69,9 +70,6 @@ func (d *Open115) singleUpload(ctx context.Context, tempF model.File, tokenResp
// }

func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer, up driver.UpdateProgress, tokenResp *sdk.UploadGetTokenResp, initResp *sdk.UploadInitResp) error {
fileSize := stream.GetSize()
chunkSize := calPartSize(fileSize)

ossClient, err := oss.New(tokenResp.Endpoint, tokenResp.AccessKeyId, tokenResp.AccessKeySecret, oss.SecurityToken(tokenResp.SecurityToken))
if err != nil {
return err
@@ -86,9 +84,15 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
return err
}

fileSize := stream.GetSize()
chunkSize := calPartSize(fileSize)
partNum := (stream.GetSize() + chunkSize - 1) / chunkSize
parts := make([]oss.UploadPart, partNum)
offset := int64(0)
ss, err := streamPkg.NewStreamSectionReader(stream, int(chunkSize))
if err != nil {
return err
}
for i := int64(1); i <= partNum; i++ {
if utils.IsCanceled(ctx) {
return ctx.Err()
@@ -98,10 +102,13 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
if i == partNum {
partSize = fileSize - (i-1)*chunkSize
}
rd := utils.NewMultiReadable(io.LimitReader(stream, partSize))
rd, err := ss.GetSectionReader(offset, partSize)
if err != nil {
return err
}
rateLimitedRd := driver.NewLimitedUploadStream(ctx, rd)
err = retry.Do(func() error {
_ = rd.Reset()
rateLimitedRd := driver.NewLimitedUploadStream(ctx, rd)
rd.Seek(0, io.SeekStart)
part, err := bucket.UploadPart(imur, rateLimitedRd, partSize, int(i))
if err != nil {
return err
@@ -112,6 +119,7 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
retry.Attempts(3),
retry.DelayType(retry.BackOffDelay),
retry.Delay(time.Second))
ss.RecycleSectionReader(rd)
if err != nil {
return err
}
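The 115_open change above replaces the per-part `utils.NewMultiReadable(io.LimitReader(...))` with a pooled `stream.StreamSectionReader`: each part is fetched as a seekable section, rewound before every retry attempt, and recycled once the part is done so its buffer can be reused. A minimal self-contained sketch of that pattern, using a plain reusable byte buffer and a stub `uploadPart` in place of the OSS SDK (both stand-ins are assumptions, not OpenList's API):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// uploadPart stands in for bucket.UploadPart; it just drains the reader.
func uploadPart(r io.Reader, size int64, n int) error {
	got, err := io.Copy(io.Discard, r)
	if err != nil {
		return err
	}
	if got != size {
		return fmt.Errorf("part %d: wrote %d of %d bytes", n, got, size)
	}
	return nil
}

func main() {
	const fileSize, chunkSize = 10, 4
	stream := strings.NewReader("0123456789")
	buf := make([]byte, chunkSize) // one reusable buffer, like the recycled section reader

	partNum := (int64(fileSize) + chunkSize - 1) / chunkSize
	for i := int64(1); i <= partNum; i++ {
		partSize := int64(chunkSize)
		if i == partNum {
			partSize = fileSize - (i-1)*chunkSize
		}
		// Fill the shared buffer from the sequential stream, then wrap it
		// in a seekable reader: a failed attempt can rewind and retry
		// without re-reading the source stream.
		if _, err := io.ReadFull(stream, buf[:partSize]); err != nil {
			panic(err)
		}
		rd := bytes.NewReader(buf[:partSize])
		for attempt := 0; attempt < 3; attempt++ { // crude stand-in for retry.Do
			rd.Seek(0, io.SeekStart)
			if err := uploadPart(rd, partSize, int(i)); err == nil {
				break
			}
		}
	}
	fmt.Println("uploaded", partNum, "parts")
}
```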
10 changes: 8 additions & 2 deletions drivers/123/meta.go
@@ -11,7 +11,8 @@ type Addition struct {
driver.RootID
//OrderBy string `json:"order_by" type:"select" options:"file_id,file_name,size,update_at" default:"file_name"`
//OrderDirection string `json:"order_direction" type:"select" options:"asc,desc" default:"asc"`
AccessToken string
AccessToken string
UploadThread int `json:"UploadThread" type:"number" default:"3" help:"the threads of upload"`
}

var config = driver.Config{
@@ -22,6 +23,11 @@ var config = driver.Config{

func init() {
op.RegisterDriver(func() driver.Driver {
return &Pan123{}
// New default options must be set when initializing in RegisterDriver to take effect for existing users
return &Pan123{
Addition: Addition{
UploadThread: 3,
},
}
})
}
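The comment in `init()` is the key point: saved driver settings are unmarshalled on top of the struct the factory returns, so a brand-new field like `UploadThread` is absent from existing users' stored JSON and would otherwise stay at the zero value. Presetting it in the factory makes the default apply to those users too. A small self-contained illustration of that behavior (plain encoding/json, not OpenList's actual settings loader):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Addition struct {
	AccessToken  string `json:"AccessToken"`
	UploadThread int    `json:"UploadThread"`
}

func main() {
	// Settings saved before UploadThread existed: the key is missing.
	saved := []byte(`{"AccessToken":"tok"}`)

	// The factory presets the default, as op.RegisterDriver does above.
	add := Addition{UploadThread: 3}
	if err := json.Unmarshal(saved, &add); err != nil {
		panic(err)
	}
	fmt.Println(add.UploadThread) // 3 — the preset survives unmarshalling
}
```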
156 changes: 94 additions & 62 deletions drivers/123/upload.go
@@ -6,11 +6,16 @@ import (
"io"
"net/http"
"strconv"
"time"

"github.com/OpenListTeam/OpenList/v4/drivers/base"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/errgroup"
"github.com/OpenListTeam/OpenList/v4/pkg/singleflight"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/avast/retry-go"
"github.com/go-resty/resty/v2"
)

@@ -69,18 +74,15 @@ func (d *Pan123) completeS3(ctx context.Context, upReq *UploadResp, file model.F
}

func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.FileStreamer, up driver.UpdateProgress) error {
tmpF, err := file.CacheFullInTempFile()
if err != nil {
return err
}
// fetch s3 pre signed urls
size := file.GetSize()
chunkSize := min(size, 16*utils.MB)
chunkCount := int(size / chunkSize)
chunkSize := int64(16 * utils.MB)
chunkCount := 1
if size > chunkSize {
chunkCount = int((size + chunkSize - 1) / chunkSize)
}
lastChunkSize := size % chunkSize
if lastChunkSize > 0 {
chunkCount++
} else {
if lastChunkSize == 0 {
lastChunkSize = chunkSize
}
// only 1 batch is allowed
@@ -90,73 +92,103 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
batchSize = 10
getS3UploadUrl = d.getS3PreSignedUrls
}
ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
if err != nil {
return err
}

thread := min(int(chunkCount), d.UploadThread)
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))
for i := 1; i <= chunkCount; i += batchSize {
if utils.IsCanceled(ctx) {
return ctx.Err()
if utils.IsCanceled(uploadCtx) {
break
}
start := i
end := min(i+batchSize, chunkCount+1)
s3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, start, end)
s3PreSignedUrls, err := getS3UploadUrl(uploadCtx, upReq, start, end)
if err != nil {
return err
}
// upload each chunk
for j := start; j < end; j++ {
if utils.IsCanceled(ctx) {
return ctx.Err()
for cur := start; cur < end; cur++ {
if utils.IsCanceled(uploadCtx) {
break
}
offset := int64(cur-1) * chunkSize
curSize := chunkSize
if j == chunkCount {
if cur == chunkCount {
curSize = lastChunkSize
}
err = d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, j, end, io.NewSectionReader(tmpF, chunkSize*int64(j-1), curSize), curSize, false, getS3UploadUrl)
if err != nil {
return err
}
up(float64(j) * 100 / float64(chunkCount))
var reader *stream.SectionReader
var rateLimitedRd io.Reader
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
if reader == nil {
var err error
reader, err = ss.GetSectionReader(offset, curSize)
if err != nil {
return err
}
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
}
return nil
},
Do: func(ctx context.Context) error {
reader.Seek(0, io.SeekStart)
uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
if uploadUrl == "" {
return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
}
reader.Seek(0, io.SeekStart)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl, rateLimitedRd)
if err != nil {
return err
}
req.ContentLength = curSize
//req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10))
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode == http.StatusForbidden {
singleflight.AnyGroup.Do(fmt.Sprintf("Pan123.newUpload_%p", threadG), func() (any, error) {
newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
if err != nil {
return nil, err
}
s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
return nil, nil
})
if err != nil {
return err
}
return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode)
}
if res.StatusCode != http.StatusOK {
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body)
}
progress := 10.0 + 85.0*float64(threadG.Success())/float64(chunkCount)
up(progress)
return nil
},
After: func(err error) {
ss.RecycleSectionReader(reader)
},
})
}
}
// complete s3 upload
return d.completeS3(ctx, upReq, file, chunkCount > 1)
}

func (d *Pan123) uploadS3Chunk(ctx context.Context, upReq *UploadResp, s3PreSignedUrls *S3PreSignedURLs, cur, end int, reader *io.SectionReader, curSize int64, retry bool, getS3UploadUrl func(ctx context.Context, upReq *UploadResp, start int, end int) (*S3PreSignedURLs, error)) error {
uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
if uploadUrl == "" {
return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
}
req, err := http.NewRequest("PUT", uploadUrl, driver.NewLimitedUploadStream(ctx, reader))
if err != nil {
if err := threadG.Wait(); err != nil {
return err
}
req = req.WithContext(ctx)
req.ContentLength = curSize
//req.Header.Set("Content-Length", strconv.FormatInt(curSize, 10))
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode == http.StatusForbidden {
if retry {
return fmt.Errorf("upload s3 chunk %d failed, status code: %d", cur, res.StatusCode)
}
// refresh s3 pre signed urls
newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
if err != nil {
return err
}
s3PreSignedUrls.Data.PreSignedUrls = newS3PreSignedUrls.Data.PreSignedUrls
// retry
reader.Seek(0, io.SeekStart)
return d.uploadS3Chunk(ctx, upReq, s3PreSignedUrls, cur, end, reader, curSize, true, getS3UploadUrl)
}
if res.StatusCode != http.StatusOK {
body, err := io.ReadAll(res.Body)
if err != nil {
return err
}
return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body)
}
return nil
defer up(100)
// complete s3 upload
return d.completeS3(ctx, upReq, file, chunkCount > 1)
}
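The rewrite above inlines the old `uploadS3Chunk` helper into a worker pool: `GoWithLifecycle` splits each part into Before (acquire the section reader once, even if Do is retried), Do (the retried attempt, which rewinds the reader first), and After (recycle the buffer whether or not the part succeeded), while `singleflight` ensures only one worker refreshes expired presigned URLs on a 403. A compact sketch of that lifecycle shape, with a hand-rolled `Lifecycle` struct standing in for OpenList's pkg/errgroup (an assumption, simplified to one goroutine per task without the ordering or backoff machinery):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Lifecycle mirrors the shape used above: Before runs once per task,
// Do may be retried, After always runs for cleanup.
type Lifecycle struct {
	Before func(ctx context.Context) error
	Do     func(ctx context.Context) error
	After  func(err error)
}

func runTask(ctx context.Context, wg *sync.WaitGroup, lc Lifecycle, attempts int) {
	defer wg.Done()
	var err error
	if err = lc.Before(ctx); err == nil {
		for i := 0; i < attempts; i++ { // crude stand-in for retry.Do
			if err = lc.Do(ctx); err == nil {
				break
			}
		}
	}
	lc.After(err) // cleanup runs exactly once, success or failure
}

func main() {
	ctx := context.Background()
	var wg sync.WaitGroup
	for part := 1; part <= 3; part++ {
		part := part
		var buf []byte
		wg.Add(1)
		go runTask(ctx, &wg, Lifecycle{
			Before: func(ctx context.Context) error {
				buf = make([]byte, 4) // acquire once, like GetSectionReader
				return nil
			},
			Do: func(ctx context.Context) error {
				fmt.Printf("upload part %d (%d bytes)\n", part, len(buf))
				return nil
			},
			After: func(err error) {
				buf = nil // release, like RecycleSectionReader
			},
		}, 3)
	}
	wg.Wait()
}
```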
76 changes: 46 additions & 30 deletions drivers/123_open/upload.go
@@ -2,15 +2,16 @@ package _123_open

import (
"context"
"io"
"net/http"
"strings"
"time"

"github.com/OpenListTeam/OpenList/v4/drivers/base"
"github.com/OpenListTeam/OpenList/v4/internal/driver"
"github.com/OpenListTeam/OpenList/v4/internal/model"
"github.com/OpenListTeam/OpenList/v4/internal/stream"
"github.com/OpenListTeam/OpenList/v4/pkg/errgroup"
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
"github.com/avast/retry-go"
"github.com/go-resty/resty/v2"
@@ -79,49 +80,64 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
size := file.GetSize()
chunkSize := createResp.Data.SliceSize
uploadNums := (size + chunkSize - 1) / chunkSize
threadG, uploadCtx := errgroup.NewGroupWithContext(ctx, d.UploadThread,
thread := min(int(uploadNums), d.UploadThread)
threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
retry.Attempts(3),
retry.Delay(time.Second),
retry.DelayType(retry.BackOffDelay))

ss, err := stream.NewStreamSectionReader(file, int(chunkSize))
if err != nil {
return err
}
for partIndex := int64(0); partIndex < uploadNums; partIndex++ {
if utils.IsCanceled(uploadCtx) {
return ctx.Err()
break
}
partIndex := partIndex
partNumber := partIndex + 1 // 分片号从1开始
offset := partIndex * chunkSize
size := min(chunkSize, size-offset)
limitedReader, err := file.RangeRead(http_range.Range{
Start: offset,
Length: size})
if err != nil {
return err
}
limitedReader = driver.NewLimitedUploadStream(ctx, limitedReader)

threadG.Go(func(ctx context.Context) error {
uploadPartUrl, err := d.url(createResp.Data.PreuploadID, partNumber)
if err != nil {
return err
}
var reader *stream.SectionReader
var rateLimitedRd io.Reader
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
if reader == nil {
var err error
reader, err = ss.GetSectionReader(offset, size)
if err != nil {
return err
}
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
}
return nil
},
Do: func(ctx context.Context) error {
reader.Seek(0, io.SeekStart)
uploadPartUrl, err := d.url(createResp.Data.PreuploadID, partNumber)
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, "PUT", uploadPartUrl, limitedReader)
if err != nil {
return err
}
req = req.WithContext(ctx)
req.ContentLength = size
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadPartUrl, rateLimitedRd)
if err != nil {
return err
}
req.ContentLength = size

res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
_ = res.Body.Close()
res, err := base.HttpClient.Do(req)
if err != nil {
return err
}
_ = res.Body.Close()

progress := 10.0 + 85.0*float64(threadG.Success())/float64(uploadNums)
up(progress)
return nil
progress := 10.0 + 85.0*float64(threadG.Success())/float64(uploadNums)
up(progress)
return nil
},
After: func(err error) {
ss.RecycleSectionReader(reader)
},
})
}

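Both this driver and the 123 one size the worker pool with `min(parts, UploadThread)` and compute the part count by ceiling division, shrinking only the final part. For concreteness, a quick check of that arithmetic (uses Go 1.21's builtin `min`, as the diff itself does):

```go
package main

import "fmt"

func main() {
	size, chunkSize := int64(10), int64(4)
	uploadNums := (size + chunkSize - 1) / chunkSize // ceiling division: 3 parts
	for partIndex := int64(0); partIndex < uploadNums; partIndex++ {
		offset := partIndex * chunkSize
		partSize := min(chunkSize, size-offset) // last part shrinks to 2
		fmt.Printf("part %d: offset=%d size=%d\n", partIndex+1, offset, partSize)
	}
}
```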