Skip to content

Commit 6de15b6

Browse files
authored
feat(stream): enhance GetRangeReaderFromLink rate limiting (#1528)
* feat(stream): enhance GetRangeReaderFromLink rate limiting * refactor(stream): update GetRangeReaderFromMFile to return *model.FileRangeReader * refactor(stream): simplify context error handling in RateLimitReader, RateLimitWriter, and RateLimitFile * refactor(net): replace custom LimitedReadCloser with readers.NewLimitedReadCloser * fix(model): update Link.ContentLength JSON tag for correct serialization * docs(model): add clarification to FileRangeReader usage comment
1 parent 2844797 commit 6de15b6

File tree

7 files changed

+80
-101
lines changed

7 files changed

+80
-101
lines changed

drivers/ftp/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,7 @@ func (d *FTP) Link(ctx context.Context, file model.Obj, args model.LinkArgs) (*m
113113
}
114114

115115
return &model.Link{
116-
RangeReader: &model.FileRangeReader{
117-
RangeReaderIF: stream.RateLimitRangeReaderFunc(resultRangeReader),
118-
},
116+
RangeReader: stream.RateLimitRangeReaderFunc(resultRangeReader),
119117
SyncClosers: utils.NewSyncClosers(utils.CloseFunc(conn.Quit)),
120118
}, nil
121119
}

drivers/proton_drive/driver.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,7 @@ func (d *ProtonDrive) Link(ctx context.Context, file model.Obj, args model.LinkA
190190

191191
expiration := time.Minute
192192
return &model.Link{
193-
RangeReader: &model.FileRangeReader{
194-
RangeReaderIF: stream.RateLimitRangeReaderFunc(rangeReaderFunc),
195-
},
193+
RangeReader: stream.RateLimitRangeReaderFunc(rangeReaderFunc),
196194
ContentLength: size,
197195
Expiration: &expiration,
198196
}, nil

internal/model/args.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type Link struct {
3434
//for accelerating request, use multi-thread downloading
3535
Concurrency int `json:"concurrency"`
3636
PartSize int `json:"part_size"`
37-
ContentLength int64 `json:"-"` // 转码视频、缩略图
37+
ContentLength int64 `json:"content_length"` // 转码视频、缩略图
3838

3939
utils.SyncClosers `json:"-"`
4040
// 如果SyncClosers中的资源被关闭后Link将不可用,则此值应为 true

internal/model/file.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ func (f *FileCloser) Close() error {
2727
return errors.Join(errs...)
2828
}
2929

30+
// FileRangeReader 是对 RangeReaderIF 的轻量包装,表明由 RangeReaderIF.RangeRead
31+
// 返回的 io.ReadCloser 同时实现了 model.File(即支持 Read/ReadAt/Seek)。
32+
// 只有满足这些才需要使用 FileRangeReader,否则直接使用 RangeReaderIF 即可。
3033
type FileRangeReader struct {
3134
RangeReaderIF
3235
}

internal/net/util.go

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
package net
22

33
import (
4-
"fmt"
54
"io"
6-
"math"
75
"mime/multipart"
86
"net/http"
97
"net/textproto"
@@ -13,6 +11,7 @@ import (
1311

1412
"github.com/OpenListTeam/OpenList/v4/internal/conf"
1513
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
14+
"github.com/rclone/rclone/lib/readers"
1615

1716
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
1817
"github.com/go-resty/resty/v2"
@@ -308,39 +307,9 @@ func rangesMIMESize(ranges []http_range.Range, contentType string, contentSize i
308307
return encSize, nil
309308
}
310309

311-
// LimitedReadCloser wraps a io.ReadCloser and limits the number of bytes that can be read from it.
312-
type LimitedReadCloser struct {
313-
rc io.ReadCloser
314-
remaining int
315-
}
316-
317-
func (l *LimitedReadCloser) Read(buf []byte) (int, error) {
318-
if l.remaining <= 0 {
319-
return 0, io.EOF
320-
}
321-
322-
if len(buf) > l.remaining {
323-
buf = buf[0:l.remaining]
324-
}
325-
326-
n, err := l.rc.Read(buf)
327-
l.remaining -= n
328-
329-
return n, err
330-
}
331-
332-
func (l *LimitedReadCloser) Close() error {
333-
return l.rc.Close()
334-
}
335-
336310
// GetRangedHttpReader some http server doesn't support "Range" header,
337311
// so this function read readCloser with whole data, skip offset, then return ReaderCloser.
338312
func GetRangedHttpReader(readCloser io.ReadCloser, offset, length int64) (io.ReadCloser, error) {
339-
var length_int int
340-
if length > math.MaxInt {
341-
return nil, fmt.Errorf("doesnot support length bigger than int32 max ")
342-
}
343-
length_int = int(length)
344313

345314
if offset > 100*1024*1024 {
346315
log.Warnf("offset is more than 100MB, if loading data from internet, high-latency and wasting of bandwidth is expected")
@@ -351,7 +320,7 @@ func GetRangedHttpReader(readCloser io.ReadCloser, offset, length int64) (io.Rea
351320
}
352321

353322
// return an io.ReadCloser that is limited to `length` bytes.
354-
return &LimitedReadCloser{readCloser, length_int}, nil
323+
return readers.NewLimitedReadCloser(readCloser, length), nil
355324
}
356325

357326
// SetProxyIfConfigured sets proxy for HTTP Transport if configured

internal/stream/limit.go

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/OpenListTeam/OpenList/v4/internal/model"
99
"github.com/OpenListTeam/OpenList/v4/pkg/http_range"
10-
"github.com/OpenListTeam/OpenList/v4/pkg/utils"
1110
"golang.org/x/time/rate"
1211
)
1312

@@ -42,17 +41,14 @@ type RateLimitReader struct {
4241
}
4342

4443
func (r *RateLimitReader) Read(p []byte) (n int, err error) {
45-
if r.Ctx != nil && utils.IsCanceled(r.Ctx) {
46-
return 0, r.Ctx.Err()
44+
if err = r.Ctx.Err(); err != nil {
45+
return 0, err
4746
}
4847
n, err = r.Reader.Read(p)
4948
if err != nil {
5049
return
5150
}
5251
if r.Limiter != nil {
53-
if r.Ctx == nil {
54-
r.Ctx = context.Background()
55-
}
5652
err = r.Limiter.WaitN(r.Ctx, n)
5753
}
5854
return
@@ -72,17 +68,14 @@ type RateLimitWriter struct {
7268
}
7369

7470
func (w *RateLimitWriter) Write(p []byte) (n int, err error) {
75-
if w.Ctx != nil && utils.IsCanceled(w.Ctx) {
76-
return 0, w.Ctx.Err()
71+
if err = w.Ctx.Err(); err != nil {
72+
return 0, err
7773
}
7874
n, err = w.Writer.Write(p)
7975
if err != nil {
8076
return
8177
}
8278
if w.Limiter != nil {
83-
if w.Ctx == nil {
84-
w.Ctx = context.Background()
85-
}
8679
err = w.Limiter.WaitN(w.Ctx, n)
8780
}
8881
return
@@ -102,34 +95,28 @@ type RateLimitFile struct {
10295
}
10396

10497
func (r *RateLimitFile) Read(p []byte) (n int, err error) {
105-
if r.Ctx != nil && utils.IsCanceled(r.Ctx) {
106-
return 0, r.Ctx.Err()
98+
if err = r.Ctx.Err(); err != nil {
99+
return 0, err
107100
}
108101
n, err = r.File.Read(p)
109102
if err != nil {
110103
return
111104
}
112105
if r.Limiter != nil {
113-
if r.Ctx == nil {
114-
r.Ctx = context.Background()
115-
}
116106
err = r.Limiter.WaitN(r.Ctx, n)
117107
}
118108
return
119109
}
120110

121111
func (r *RateLimitFile) ReadAt(p []byte, off int64) (n int, err error) {
122-
if r.Ctx != nil && utils.IsCanceled(r.Ctx) {
123-
return 0, r.Ctx.Err()
112+
if err = r.Ctx.Err(); err != nil {
113+
return 0, err
124114
}
125115
n, err = r.File.ReadAt(p, off)
126116
if err != nil {
127117
return
128118
}
129119
if r.Limiter != nil {
130-
if r.Ctx == nil {
131-
r.Ctx = context.Background()
132-
}
133120
err = r.Limiter.WaitN(r.Ctx, n)
134121
}
135122
return
@@ -145,16 +132,16 @@ func (r *RateLimitFile) Close() error {
145132
type RateLimitRangeReaderFunc RangeReaderFunc
146133

147134
func (f RateLimitRangeReaderFunc) RangeRead(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
135+
if ServerDownloadLimit == nil {
136+
return f(ctx, httpRange)
137+
}
148138
rc, err := f(ctx, httpRange)
149139
if err != nil {
150140
return nil, err
151141
}
152-
if ServerDownloadLimit != nil {
153-
rc = &RateLimitReader{
154-
Ctx: ctx,
155-
Reader: rc,
156-
Limiter: ServerDownloadLimit,
157-
}
158-
}
159-
return rc, nil
142+
return &RateLimitReader{
143+
Ctx: ctx,
144+
Reader: rc,
145+
Limiter: ServerDownloadLimit,
146+
}, nil
160147
}

internal/stream/util.go

Lines changed: 56 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,44 +28,61 @@ func (f RangeReaderFunc) RangeRead(ctx context.Context, httpRange http_range.Ran
2828
}
2929

3030
func GetRangeReaderFromLink(size int64, link *model.Link) (model.RangeReaderIF, error) {
31-
if link.Concurrency > 0 || link.PartSize > 0 {
31+
if link.RangeReader != nil {
32+
if link.Concurrency < 1 && link.PartSize < 1 {
33+
return link.RangeReader, nil
34+
}
3235
down := net.NewDownloader(func(d *net.Downloader) {
3336
d.Concurrency = link.Concurrency
3437
d.PartSize = link.PartSize
38+
d.HttpClient = net.GetRangeReaderHttpRequestFunc(link.RangeReader)
3539
})
36-
var rangeReader RangeReaderFunc = func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
37-
var req *net.HttpRequestParams
38-
if link.RangeReader != nil {
39-
req = &net.HttpRequestParams{
40-
Range: httpRange,
41-
Size: size,
42-
}
43-
} else {
44-
requestHeader, _ := ctx.Value(conf.RequestHeaderKey).(http.Header)
45-
header := net.ProcessHeader(requestHeader, link.Header)
46-
req = &net.HttpRequestParams{
47-
Range: httpRange,
48-
Size: size,
49-
URL: link.URL,
50-
HeaderRef: header,
51-
}
52-
}
53-
return down.Download(ctx, req)
40+
rangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
41+
return down.Download(ctx, &net.HttpRequestParams{
42+
Range: httpRange,
43+
Size: size,
44+
})
5445
}
55-
if link.RangeReader != nil {
56-
down.HttpClient = net.GetRangeReaderHttpRequestFunc(link.RangeReader)
57-
return rangeReader, nil
58-
}
59-
return RateLimitRangeReaderFunc(rangeReader), nil
60-
}
61-
62-
if link.RangeReader != nil {
63-
return link.RangeReader, nil
46+
// RangeReader只能在驱动限速
47+
return RangeReaderFunc(rangeReader), nil
6448
}
6549

6650
if len(link.URL) == 0 {
6751
return nil, errors.New("invalid link: must have at least one of URL or RangeReader")
6852
}
53+
54+
if link.Concurrency > 0 || link.PartSize > 0 {
55+
down := net.NewDownloader(func(d *net.Downloader) {
56+
d.Concurrency = link.Concurrency
57+
d.PartSize = link.PartSize
58+
d.HttpClient = func(ctx context.Context, params *net.HttpRequestParams) (*http.Response, error) {
59+
if ServerDownloadLimit == nil {
60+
return net.DefaultHttpRequestFunc(ctx, params)
61+
}
62+
resp, err := net.DefaultHttpRequestFunc(ctx, params)
63+
if err == nil && resp.Body != nil {
64+
resp.Body = &RateLimitReader{
65+
Ctx: ctx,
66+
Reader: resp.Body,
67+
Limiter: ServerDownloadLimit,
68+
}
69+
}
70+
return resp, err
71+
}
72+
})
73+
rangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
74+
requestHeader, _ := ctx.Value(conf.RequestHeaderKey).(http.Header)
75+
header := net.ProcessHeader(requestHeader, link.Header)
76+
return down.Download(ctx, &net.HttpRequestParams{
77+
Range: httpRange,
78+
Size: size,
79+
URL: link.URL,
80+
HeaderRef: header,
81+
})
82+
}
83+
return RangeReaderFunc(rangeReader), nil
84+
}
85+
6986
rangeReader := func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
7087
if httpRange.Length < 0 || httpRange.Start+httpRange.Length > size {
7188
httpRange.Length = size - httpRange.Start
@@ -81,7 +98,15 @@ func GetRangeReaderFromLink(size int64, link *model.Link) (model.RangeReaderIF,
8198
}
8299
return nil, fmt.Errorf("http request failure, err:%w", err)
83100
}
84-
if httpRange.Start == 0 && (httpRange.Length == -1 || httpRange.Length == size) || response.StatusCode == http.StatusPartialContent ||
101+
if ServerDownloadLimit != nil {
102+
response.Body = &RateLimitReader{
103+
Ctx: ctx,
104+
Reader: response.Body,
105+
Limiter: ServerDownloadLimit,
106+
}
107+
}
108+
if httpRange.Start == 0 && httpRange.Length == size ||
109+
response.StatusCode == http.StatusPartialContent ||
85110
checkContentRange(&response.Header, httpRange.Start) {
86111
return response.Body, nil
87112
} else if response.StatusCode == http.StatusOK {
@@ -94,11 +119,10 @@ func GetRangeReaderFromLink(size int64, link *model.Link) (model.RangeReaderIF,
94119
}
95120
return response.Body, nil
96121
}
97-
return RateLimitRangeReaderFunc(rangeReader), nil
122+
return RangeReaderFunc(rangeReader), nil
98123
}
99124

100-
// RangeReaderIF.RangeRead返回的io.ReadCloser保留file的签名。
101-
func GetRangeReaderFromMFile(size int64, file model.File) model.RangeReaderIF {
125+
func GetRangeReaderFromMFile(size int64, file model.File) *model.FileRangeReader {
102126
return &model.FileRangeReader{
103127
RangeReaderIF: RangeReaderFunc(func(ctx context.Context, httpRange http_range.Range) (io.ReadCloser, error) {
104128
length := httpRange.Length

0 commit comments

Comments
 (0)