Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drivers/123/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
if cur == chunkCount {
curSize = lastChunkSize
}
var reader *stream.SectionReader
var reader io.ReadSeeker
var rateLimitedRd io.Reader
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion drivers/123_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
partNumber := partIndex + 1 // 分片号从1开始
offset := partIndex * chunkSize
size := min(chunkSize, size-offset)
var reader *stream.SectionReader
var reader io.ReadSeeker
var rateLimitedRd io.Reader
sliceMD5 := ""
// 表单
Expand Down
2 changes: 1 addition & 1 deletion drivers/189pc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
partSize = lastPartSize
}
partInfo := ""
var reader *stream.SectionReader
var reader io.ReadSeeker
var rateLimitedRd io.Reader
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion drivers/doubao/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (d *Doubao) UploadByMultipart(ctx context.Context, config *UploadConfig, fi
if partIndex == totalParts-1 {
size = fileSize - offset
}
var reader *stream.SectionReader
var reader io.ReadSeeker
var rateLimitedRd io.Reader
crc32Value := ""
threadG.GoWithLifecycle(errgroup.Lifecycle{
Expand Down
2 changes: 1 addition & 1 deletion drivers/mediafire/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (d *Mediafire) uploadUnits(ctx context.Context, file model.FileStreamer, ch
size = fileSize - start
}

var reader *stream.SectionReader
var reader io.ReadSeeker
var rateLimitedRd io.Reader
var unitHash string

Expand Down
5 changes: 3 additions & 2 deletions drivers/teldrive/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package teldrive

import (
"context"
"io"
"time"

"github.com/OpenListTeam/OpenList/v4/internal/model"
Expand Down Expand Up @@ -50,8 +51,8 @@ type chunkTask struct {
chunkIdx int
fileName string
chunkSize int64
reader *stream.SectionReader
ss *stream.StreamSectionReader
reader io.ReadSeeker
ss stream.StreamSectionReaderIF
}

type CopyManager struct {
Expand Down
173 changes: 130 additions & 43 deletions internal/stream/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"os"

"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/internal/model"
Expand Down Expand Up @@ -151,32 +152,58 @@ func CacheFullAndHash(stream model.FileStreamer, up *model.UpdateProgress, hashT
return tmpF, hex.EncodeToString(h.Sum(nil)), nil
}

type StreamSectionReader struct {
file model.FileStreamer
off int64
bufPool *pool.Pool[[]byte]
// StreamSectionReaderIF hands out seekable readers over consecutive
// sections of an underlying file stream and takes them back for reuse
// once a section has been uploaded.
type StreamSectionReaderIF interface {
	// GetSectionReader returns a ReadSeeker over [off, off+length).
	// Not safe for concurrent use.
	GetSectionReader(off, length int64) (io.ReadSeeker, error)
	// FreeSectionReader releases a reader obtained from GetSectionReader
	// so its backing resources can be reused.
	FreeSectionReader(sr io.ReadSeeker)
	// DiscardSection skips a section without materializing a reader.
	// Not safe for concurrent use.
	DiscardSection(off int64, length int64) error
}

func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (*StreamSectionReader, error) {
ss := &StreamSectionReader{file: file}
func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *model.UpdateProgress) (StreamSectionReaderIF, error) {
if file.GetFile() != nil {
return ss, nil
return &cachedSectionReader{file.GetFile()}, nil
}

maxBufferSize = min(maxBufferSize, int(file.GetSize()))
if maxBufferSize > conf.MaxBufferLimit {
_, err := file.CacheFullAndWriter(up, nil)
f, err := os.CreateTemp(conf.Conf.TempDir, "file-*")
if err != nil {
return nil, err
}

if f.Truncate((file.GetSize()+int64(maxBufferSize-1))/int64(maxBufferSize)*int64(maxBufferSize)) != nil {
// fallback to full cache
_, _ = f.Close(), os.Remove(f.Name())
cache, err := file.CacheFullAndWriter(up, nil)
if err != nil {
return nil, err
}
return &cachedSectionReader{cache}, nil
}

ss := &fileSectionReader{Reader: file, temp: f}
ss.bufPool = &pool.Pool[*offsetWriterWithBase]{
New: func() *offsetWriterWithBase {
base := ss.fileOff
ss.fileOff += int64(maxBufferSize)
return &offsetWriterWithBase{io.NewOffsetWriter(ss.temp, base), base}
},
}
file.Add(utils.CloseFunc(func() error {
ss.bufPool.Reset()
return errors.Join(ss.temp.Close(), os.Remove(ss.temp.Name()))
}))
return ss, nil
}

ss := &directSectionReader{file: file}
if conf.MmapThreshold > 0 && maxBufferSize >= conf.MmapThreshold {
ss.bufPool = &pool.Pool[[]byte]{
New: func() []byte {
buf, err := mmap.Alloc(maxBufferSize)
if err == nil {
file.Add(utils.CloseFunc(func() error {
ss.file.Add(utils.CloseFunc(func() error {
return mmap.Free(buf)
}))
} else {
Expand All @@ -200,53 +227,113 @@ func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *mode
return ss, nil
}

// cachedSectionReader serves sections directly from a random-access
// cache, so sections may be requested in any order and no position
// state is tracked.
type cachedSectionReader struct {
	cache io.ReaderAt
}

// DiscardSection is a no-op: the cache is random access, so nothing
// needs to be consumed to reach a later offset.
func (*cachedSectionReader) DiscardSection(off int64, length int64) error {
	return nil
}

// GetSectionReader returns a view over [off, off+length) of the cache.
func (s *cachedSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
	return io.NewSectionReader(s.cache, off, length), nil
}

// FreeSectionReader is a no-op: cache-backed readers hold no pooled
// resources.
func (*cachedSectionReader) FreeSectionReader(sr io.ReadSeeker) {}

// fileSectionReader buffers sections of a sequential stream into a
// temporary file so each section can be re-read (seeked) afterwards.
// Sections must be requested in strictly increasing offset order.
type fileSectionReader struct {
	io.Reader          // the sequential source stream
	off     int64      // bytes of the source consumed so far
	temp    *os.File   // scratch file holding buffered sections
	fileOff int64      // next unassigned base offset inside temp (advanced by the pool's New)
	bufPool *pool.Pool[*offsetWriterWithBase]
}

// offsetWriterWithBase is a pooled writer over a fixed-size region of
// the temp file; base records the region's start so reads can target
// the same range later.
type offsetWriterWithBase struct {
	*io.OffsetWriter
	base int64
}

// 线程不安全
func (ss *StreamSectionReader) DiscardSection(off int64, length int64) error {
if ss.file.GetFile() == nil {
if off != ss.off {
return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
}
_, err := utils.CopyWithBufferN(io.Discard, ss.file, length)
if err != nil {
return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
}
// DiscardSection consumes and throws away length bytes of the
// underlying stream. Sections are strictly sequential, so the
// requested offset must equal the current stream position.
// Not safe for concurrent use.
func (ss *fileSectionReader) DiscardSection(off int64, length int64) error {
	if ss.off != off {
		return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
	}
	if _, err := utils.CopyWithBufferN(io.Discard, ss.Reader, length); err != nil {
		return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
	}
	ss.off += length
	return nil
}

// 线程不安全
func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionReader, error) {
var cache io.ReaderAt = ss.file.GetFile()
var buf []byte
if cache == nil {
if off != ss.off {
return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
}
tempBuf := ss.bufPool.Get()
buf = tempBuf[:length]
n, err := io.ReadFull(ss.file, buf)
if int64(n) != length {
return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
}
ss.off += int64(n)
off = 0
cache = bytes.NewReader(buf)
// fileBufferSectionReader is the handle returned by
// fileSectionReader.GetSectionReader: a ReadSeeker over the section's
// region of the temp file, plus the pooled writer to hand back on free.
type fileBufferSectionReader struct {
	io.ReadSeeker
	fileBuf *offsetWriterWithBase
}

// GetSectionReader copies the next length bytes of the sequential
// stream into a pooled region of the temp file and returns a seekable
// reader over that region. Sections must be requested in order.
// Not safe for concurrent use.
func (ss *fileSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
	if off != ss.off {
		return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
	}
	fileBuf := ss.bufPool.Get()
	// Rewind the pooled offset writer so the copy starts at the
	// region's base offset.
	_, _ = fileBuf.Seek(0, io.SeekStart)
	n, err := utils.CopyWithBufferN(fileBuf, ss.Reader, length)
	if err != nil {
		return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
	}
	// NOTE(review): the next line references `cache` and `buf`, which are
	// not defined in this function — it appears to be leftover pre-refactor
	// code (diff-extraction residue). Confirm against the repository and
	// remove; the intended returns are the two lines below it.
	return &SectionReader{io.NewSectionReader(cache, off, length), buf}, nil
	ss.off += length
	return &fileBufferSectionReader{io.NewSectionReader(ss.temp, fileBuf.base, length), fileBuf}, nil
}

func (ss *StreamSectionReader) FreeSectionReader(sr *SectionReader) {
if sr != nil {
if sr.buf != nil {
ss.bufPool.Put(sr.buf[0:cap(sr.buf)])
sr.buf = nil
}
// FreeSectionReader returns the section's temp-file region to the pool
// so a later section can reuse it. Readers not produced by this
// fileSectionReader are ignored.
func (ss *fileSectionReader) FreeSectionReader(rs io.ReadSeeker) {
	fb, ok := rs.(*fileBufferSectionReader)
	if !ok {
		return
	}
	ss.bufPool.Put(fb.fileBuf)
	fb.fileBuf = nil
	fb.ReadSeeker = nil
}

type SectionReader struct {
// directSectionReader buffers each section of a sequential stream into
// a pooled in-memory byte slice. Sections must be requested in
// strictly increasing offset order.
type directSectionReader struct {
	file    model.FileStreamer
	off     int64 // bytes of the stream consumed so far
	bufPool *pool.Pool[[]byte]
}

// DiscardSection reads and drops length bytes from the stream. The
// requested offset must equal the current stream position.
// Not safe for concurrent use.
func (ss *directSectionReader) DiscardSection(off int64, length int64) error {
	if ss.off != off {
		return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
	}
	if _, err := utils.CopyWithBufferN(io.Discard, ss.file, length); err != nil {
		return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err)
	}
	ss.off += length
	return nil
}

// bufferSectionReader is the handle returned by
// directSectionReader.GetSectionReader: a ReadSeeker over a pooled
// byte slice, kept so the slice can be returned to the pool on free.
type bufferSectionReader struct {
	io.ReadSeeker
	buf []byte
}

// GetSectionReader reads the next length bytes of the stream into a
// pooled buffer and returns a seekable reader over it. Sections must
// be requested in strictly increasing order.
// Not safe for concurrent use.
func (ss *directSectionReader) GetSectionReader(off, length int64) (io.ReadSeeker, error) {
	if ss.off != off {
		return nil, fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off)
	}
	section := ss.bufPool.Get()[:length]
	n, err := io.ReadFull(ss.file, section)
	if int64(n) != length {
		return nil, fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", length, n, err)
	}
	ss.off += int64(n)
	return &bufferSectionReader{bytes.NewReader(section), section}, nil
}
// FreeSectionReader hands the section's backing slice (restored to its
// full capacity) back to the pool. Readers not produced by this
// directSectionReader are ignored.
func (ss *directSectionReader) FreeSectionReader(rs io.ReadSeeker) {
	bs, ok := rs.(*bufferSectionReader)
	if !ok {
		return
	}
	ss.bufPool.Put(bs.buf[:cap(bs.buf)])
	bs.buf = nil
	bs.ReadSeeker = nil
}