diff --git a/README_nl.md b/README_nl.md
index 56260243a..469e86028 100644
--- a/README_nl.md
+++ b/README_nl.md
@@ -64,6 +64,7 @@ Dank u voor uw ondersteuning en begrip
 - [x] [UPYUN Storage Service](https://www.upyun.com/products/file-storage)
 - [x] [WebDAV](https://en.wikipedia.org/wiki/WebDAV)
 - [x] Teambition([China](https://www.teambition.com), [Internationaal](https://us.teambition.com))
+- [x] [MediaFire](https://www.mediafire.com)
 - [x] [Mediatrack](https://www.mediatrack.cn)
 - [x] [139yun](https://yun.139.com) (Persoonlijk, Familie, Groep)
 - [x] [YandexDisk](https://disk.yandex.com)
diff --git a/drivers/mediafire/driver.go b/drivers/mediafire/driver.go
index 86f5d12a6..bd2502590 100644
--- a/drivers/mediafire/driver.go
+++ b/drivers/mediafire/driver.go
@@ -9,11 +9,15 @@ D@' 3z K!7 - The King Of Cracking
 
 Modifications by ILoveScratch2
 Date: 2025-09-21
+
+Date: 2025-09-26
+Final opts by @Suyunjing @j2rong4cn @KirCute @Da3zKi7
 */
 
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"net/http"
 	"strconv"
 	"time"
@@ -22,6 +26,7 @@ import (
 	"github.com/OpenListTeam/OpenList/v4/internal/driver"
 	"github.com/OpenListTeam/OpenList/v4/internal/model"
 	"github.com/OpenListTeam/OpenList/v4/internal/stream"
+	"github.com/OpenListTeam/OpenList/v4/pkg/cron"
 	"github.com/OpenListTeam/OpenList/v4/pkg/utils"
 	"golang.org/x/time/rate"
 )
@@ -30,6 +35,8 @@ type Mediafire struct {
 	model.Storage
 	Addition
 
+	cron *cron.Cron
+
 	actionToken string
 	limiter     *rate.Limiter
@@ -66,7 +73,18 @@ func (d *Mediafire) Init(ctx context.Context) error {
 	}
 	// Validate and refresh session token if needed
 	if _, err := d.getSessionToken(ctx); err != nil {
-		return d.renewToken(ctx)
+		if err := d.renewToken(ctx); err != nil {
+			return err
+		}
+
+		// Renew ahead of the 10-minute token expiry (random 6-9 minute period)
+		num := rand.Intn(4) + 6
+
+		d.cron = cron.NewCron(time.Minute * time.Duration(num))
+		d.cron.Do(func() {
+			// Blunt but effective way to keep the session token fresh
+			d.renewToken(ctx)
+		})
 	}
 
 	return nil
@@ -76,6 +94,10 @@ func (d *Mediafire) Init(ctx context.Context) error {
 
 func (d *Mediafire) Drop(ctx context.Context) error {
 	// Clear cached resources
 	d.actionToken = ""
+	if d.cron != nil {
+		d.cron.Stop()
+		d.cron = nil
+	}
 	return nil
 }
diff --git a/drivers/mediafire/meta.go b/drivers/mediafire/meta.go
index e3fc9414f..e80b11f1e 100644
--- a/drivers/mediafire/meta.go
+++ b/drivers/mediafire/meta.go
@@ -9,6 +9,9 @@ D@' 3z K!7 - The King Of Cracking
 
 Modifications by ILoveScratch2
 Date: 2025-09-21
+
+Date: 2025-09-26
+Final opts by @Suyunjing @j2rong4cn @KirCute @Da3zKi7
 */
 
 import (
@@ -26,6 +29,7 @@ type Addition struct {
 	OrderBy        string  `json:"order_by" type:"select" options:"name,time,size" default:"name"`
 	OrderDirection string  `json:"order_direction" type:"select" options:"asc,desc" default:"asc"`
 	ChunkSize      int64   `json:"chunk_size" type:"number" default:"100"`
+	UploadThreads  int     `json:"upload_threads" type:"number" default:"3" help:"concurrent upload threads"`
 	LimitRate      float64 `json:"limit_rate" type:"float" default:"2" help:"limit all api request rate ([limit]r/1s)"`
 }
 
diff --git a/drivers/mediafire/util.go b/drivers/mediafire/util.go
index 0467fa2ab..fc8dd38ae 100644
--- a/drivers/mediafire/util.go
+++ b/drivers/mediafire/util.go
@@ -9,6 +9,9 @@ D@' 3z K!7 - The King Of Cracking
 
 Modifications by ILoveScratch2
 Date: 2025-09-21
+
+Date: 2025-09-26
+Final opts by @Suyunjing @j2rong4cn @KirCute @Da3zKi7
 */
 
 import (
@@ -19,6 +22,7 @@ import (
 	"net/http"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/OpenListTeam/OpenList/v4/drivers/base"
@@ -26,7 +30,9 @@ import (
"github.com/OpenListTeam/OpenList/v4/internal/model" "github.com/OpenListTeam/OpenList/v4/internal/op" "github.com/OpenListTeam/OpenList/v4/internal/stream" + "github.com/OpenListTeam/OpenList/v4/pkg/errgroup" "github.com/OpenListTeam/OpenList/v4/pkg/utils" + "github.com/avast/retry-go" "github.com/go-resty/resty/v2" ) @@ -403,131 +409,185 @@ func (d *Mediafire) uploadCheck(ctx context.Context, filename string, filesize i return &resp, nil } -func (d *Mediafire) resumableUpload(ctx context.Context, folderKey, uploadKey string, unitData io.ReadSeeker, unitID int, fileHash, filename string, totalFileSize, unitSize int64) (string, error) { - actionToken, err := d.getActionToken(ctx) - if err != nil { - return "", err - } - if d.limiter != nil { - if err := d.limiter.Wait(ctx); err != nil { - return "", fmt.Errorf("rate limit wait failed: %w", err) - } - } - - url := d.apiBase + "/upload/resumable.php" - reader := driver.NewLimitedUploadStream(ctx, unitData) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, reader) - if err != nil { - return "", err - } - unitHash, err := utils.HashReader(utils.SHA256, reader) - if err != nil { - return "", err - } - unitData.Seek(0, io.SeekStart) - - q := req.URL.Query() - q.Add("folder_key", folderKey) - q.Add("response_format", "json") - q.Add("session_token", actionToken) - q.Add("key", uploadKey) - req.URL.RawQuery = q.Encode() - - req.Header.Set("x-filehash", fileHash) - req.Header.Set("x-filesize", strconv.FormatInt(totalFileSize, 10)) - req.Header.Set("x-unit-id", strconv.Itoa(unitID)) - req.Header.Set("x-unit-size", strconv.FormatInt(unitSize, 10)) - req.Header.Set("x-unit-hash", unitHash) - req.Header.Set("x-filename", filename) - req.Header.Set("Content-Type", "application/octet-stream") - req.ContentLength = unitSize - - /* fmt.Printf("Debug resumable upload request:\n") - fmt.Printf(" URL: %s\n", req.URL.String()) - fmt.Printf(" Headers: %+v\n", req.Header) - fmt.Printf(" Unit ID: %d\n", unitID) - fmt.Printf(" Unit Size: %d\n", len(unitData)) - fmt.Printf(" Upload Key: %s\n", uploadKey) - fmt.Printf(" Action Token: %s\n", actionToken) */ - - res, err := base.HttpClient.Do(req) - if err != nil { - return "", err - } - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return "", fmt.Errorf("failed to read response body: %v", err) - } - - // fmt.Printf("MediaFire resumable upload response (status %d): %s\n", res.StatusCode, string(body)) - - var uploadResp struct { - Response struct { - Doupload struct { - Key string `json:"key"` - } `json:"doupload"` - Result string `json:"result"` - } `json:"response"` - } - - if err := json.Unmarshal(body, &uploadResp); err != nil { - return "", fmt.Errorf("failed to parse response: %v", err) - } - - if res.StatusCode != 200 { - return "", fmt.Errorf("resumable upload failed with status %d", res.StatusCode) - } - - return uploadResp.Response.Doupload.Key, nil -} - func (d *Mediafire) uploadUnits(ctx context.Context, file model.FileStreamer, checkResp *MediafireCheckResponse, filename, fileHash, folderKey string, up driver.UpdateProgress) (string, error) { unitSize, _ := strconv.ParseInt(checkResp.Response.ResumableUpload.UnitSize, 10, 64) numUnits, _ := strconv.Atoi(checkResp.Response.ResumableUpload.NumberOfUnits) uploadKey := checkResp.Response.ResumableUpload.UploadKey stringWords := checkResp.Response.ResumableUpload.Bitmap.Words - intWords := make([]int, 0, len(stringWords)) // Pre-allocate with capacity + intWords := make([]int, 0, len(stringWords)) for _, 
 		if intWord, err := strconv.Atoi(word); err == nil {
 			intWords = append(intWords, intWord)
 		}
 	}
 
-	ss, err := stream.NewStreamSectionReader(file, int(unitSize), &up)
+	// Size the upload buffer based on the file size
+	bufferSize := int(unitSize)
+	fileSize := file.GetSize()
+
+	// Split large files into chunks
+	if fileSize > d.ChunkSize*1024*1024 {
+
+		// Large file: cap the buffer at ChunkSize (default 100 MB)
+		bufferSize = min(int(fileSize), int(d.ChunkSize)*1024*1024)
+	} else if fileSize > 10*1024*1024 {
+		// Medium file: buffer the whole file for concurrent access
+		bufferSize = int(fileSize)
+	}
+
+	// Create stream section reader for efficient chunking
+	ss, err := stream.NewStreamSectionReader(file, bufferSize, &up)
 	if err != nil {
 		return "", err
 	}
+
+	// Parallelism is the smaller of MediaFire's reported unit count and the configured value:
+	// small uploads stay at the unit count, large ones use the full d.UploadThreads
+	thread := min(numUnits, d.UploadThreads)
+
+	// Ordered group: units are prepared in stream order, each with retry and backoff
+	threadG, uploadCtx := errgroup.NewOrderedGroupWithContext(ctx, thread,
+		retry.Attempts(3),
+		retry.Delay(time.Second),
+		retry.DelayType(retry.BackOffDelay))
 
 	var finalUploadKey string
+	var keyMutex sync.Mutex
 
-	for unitID := 0; unitID < numUnits; unitID++ {
-		if utils.IsCanceled(ctx) {
-			return "", ctx.Err()
+	for unitID := range numUnits {
+		if utils.IsCanceled(uploadCtx) {
+			break
 		}
 
 		start := int64(unitID) * unitSize
 		size := unitSize
-		if start+size > file.GetSize() {
-			size = file.GetSize() - start
-		}
-
-		section, err := ss.GetSectionReader(start, size)
-		if err != nil {
-			return "", err
-		}
-
-		if !d.isUnitUploaded(intWords, unitID) {
-			uploadKeyResult, err := d.resumableUpload(ctx, folderKey, uploadKey, section, unitID, fileHash, filename, file.GetSize(), size)
-			ss.FreeSectionReader(section)
-			if err != nil {
-				return "", err
-			}
-			finalUploadKey = uploadKeyResult
-		} else {
-			ss.FreeSectionReader(section)
+		if start+size > fileSize {
+			size = fileSize - start
 		}
 
+		var reader *stream.SectionReader
+		var rateLimitedRd io.Reader
+		var unitHash string
+
+		// Use lifecycle pattern for proper resource management
+		threadG.GoWithLifecycle(errgroup.Lifecycle{
+			Before: func(ctx context.Context) error {
+				// Skip already uploaded units, but still advance the stream position
+				if d.isUnitUploaded(intWords, unitID) {
+					return ss.DiscardSection(start, size)
+				}
+
+				var err error
+				reader, err = ss.GetSectionReader(start, size)
+				if err != nil {
+					return err
+				}
+				unitHash, err = utils.HashReader(utils.SHA256, reader)
+				if err != nil {
+					return err
+				}
+
+				rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
+				return nil
+			},
+			Do: func(ctx context.Context) error {
+				if reader == nil {
+					return nil // Skip if reader is not initialized (already uploaded)
+				}
+				reader.Seek(0, io.SeekStart) // Rewind in case this attempt is a retry
+
+				// Perform upload
+
+				actionToken, err := d.getActionToken(ctx)
+				if err != nil {
+					return err
+				}
+				if d.limiter != nil {
+					if err := d.limiter.Wait(ctx); err != nil {
+						return fmt.Errorf("rate limit wait failed: %w", err)
+					}
+				}
+
+				url := d.apiBase + "/upload/resumable.php"
+				req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, rateLimitedRd)
+				if err != nil {
+					return err
+				}
+
+				q := req.URL.Query()
+				q.Add("folder_key", folderKey)
+				q.Add("response_format", "json")
+				q.Add("session_token", actionToken)
+				q.Add("key", uploadKey)
+				req.URL.RawQuery = q.Encode()
+
+				req.Header.Set("x-filehash", fileHash)
req.Header.Set("x-filesize", strconv.FormatInt(fileSize, 10)) + req.Header.Set("x-unit-id", strconv.Itoa(unitID)) + req.Header.Set("x-unit-size", strconv.FormatInt(size, 10)) + req.Header.Set("x-unit-hash", unitHash) + req.Header.Set("x-filename", filename) + req.Header.Set("Content-Type", "application/octet-stream") + req.ContentLength = size + + /* fmt.Printf("Debug resumable upload request:\n") + fmt.Printf(" URL: %s\n", req.URL.String()) + fmt.Printf(" Headers: %+v\n", req.Header) + fmt.Printf(" Unit ID: %d\n", unitID) + fmt.Printf(" Unit Size: %d\n", len(unitData)) + fmt.Printf(" Upload Key: %s\n", uploadKey) + fmt.Printf(" Action Token: %s\n", actionToken) */ + + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %v", err) + } + + // fmt.Printf("MediaFire resumable upload response (status %d): %s\n", res.StatusCode, string(body)) + + var uploadResp struct { + Response struct { + Doupload struct { + Key string `json:"key"` + } `json:"doupload"` + Result string `json:"result"` + } `json:"response"` + } + + if err := json.Unmarshal(body, &uploadResp); err != nil { + return fmt.Errorf("failed to parse response: %v", err) + } + + if res.StatusCode != 200 { + return fmt.Errorf("resumable upload failed with status %d", res.StatusCode) + } + + // Thread-safe update of final upload key + keyMutex.Lock() + finalUploadKey = uploadResp.Response.Doupload.Key + keyMutex.Unlock() + + return nil + }, + After: func(err error) { + if reader != nil { + // Cleanup resources + ss.FreeSectionReader(reader) + } + }, + }) + } + + if err := threadG.Wait(); err != nil { + return "", err } return finalUploadKey, nil diff --git a/internal/stream/util.go b/internal/stream/util.go index 4f51a46d2..ee1ed96ac 100644 --- a/internal/stream/util.go +++ b/internal/stream/util.go @@ -200,6 +200,21 @@ func NewStreamSectionReader(file model.FileStreamer, maxBufferSize int, up *mode return ss, nil } +// 线程不安全 +func (ss *StreamSectionReader) DiscardSection(off int64, length int64) error { + if ss.file.GetFile() == nil { + if off != ss.off { + return fmt.Errorf("stream not cached: request offset %d != current offset %d", off, ss.off) + } + _, err := utils.CopyWithBufferN(io.Discard, ss.file, length) + if err != nil { + return fmt.Errorf("failed to skip data: (expect =%d) %w", length, err) + } + } + ss.off += length + return nil +} + // 线程不安全 func (ss *StreamSectionReader) GetSectionReader(off, length int64) (*SectionReader, error) { var cache io.ReaderAt = ss.file.GetFile()