From 7d6428c967c2437c8865ede6fd186ae332fb88ba Mon Sep 17 00:00:00 2001 From: Levi Date: Tue, 28 Nov 2023 18:27:36 +0800 Subject: [PATCH] fix: task can not completed (#280) --- internal/protocol/http/fetcher_test.go | 2 +- internal/test/httptest.go | 52 ++++++++++++++++++++++++-- pkg/download/downloader.go | 2 + 3 files changed, 51 insertions(+), 5 deletions(-) diff --git a/internal/protocol/http/fetcher_test.go b/internal/protocol/http/fetcher_test.go index f0408f0e9..0b552709f 100644 --- a/internal/protocol/http/fetcher_test.go +++ b/internal/protocol/http/fetcher_test.go @@ -123,7 +123,7 @@ func TestFetcher_DownloadError(t *testing.T) { } func TestFetcher_DownloadLimit(t *testing.T) { - listener := test.StartTestLimitServer() + listener := test.StartTestLimitServer(4, 0) defer listener.Close() downloadNormal(listener, 1, t) diff --git a/internal/test/httptest.go b/internal/test/httptest.go index e00f6c3dd..8af025ce9 100644 --- a/internal/test/httptest.go +++ b/internal/test/httptest.go @@ -138,7 +138,7 @@ func StartTestErrorServer() net.Listener { } // StartTestLimitServer connections limit server -func StartTestLimitServer() net.Listener { +func StartTestLimitServer(maxConnections int32, delay int64) net.Listener { var connections atomic.Int32 return startTestServer(func() http.Handler { @@ -148,7 +148,7 @@ func StartTestLimitServer() net.Listener { connections.Add(-1) }() connections.Add(1) - if connections.Load() > 4 { + if maxConnections != 0 && connections.Load() > maxConnections { writer.WriteHeader(403) return } @@ -162,7 +162,7 @@ func StartTestLimitServer() net.Listener { panic(err) } defer file.Close() - io.Copy(writer, file) + slowCopy(writer, file, delay) } else { // split range s := strings.Split(r, "=") @@ -203,13 +203,57 @@ func StartTestLimitServer() net.Listener { } defer file.Close() file.Seek(start, 0) - io.CopyN(writer, file, end-start+1) + slowCopyN(writer, file, end-start+1, delay) } }) return mux }) } +// slowCopyN copies n bytes from src to dst, speed limit is bytes per second +func slowCopy(dst io.Writer, src io.Reader, delay int64) (written int64, err error) { + buf := make([]byte, 32*1024) + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + if delay > 0 { + time.Sleep(time.Millisecond * time.Duration(delay)) + } + } + return written, err +} + +func slowCopyN(dst io.Writer, src io.Reader, n int64, delay int64) (written int64, err error) { + written, err = slowCopy(dst, io.LimitReader(src, n), delay) + if written == n { + return n, nil + } + if written < n && err == nil { + // src stopped early; must have been EOF. + err = io.EOF + } + return +} + func startTestServer(serverHandle func() http.Handler) net.Listener { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { diff --git a/pkg/download/downloader.go b/pkg/download/downloader.go index c7bf5c02c..1ab2c203c 100644 --- a/pkg/download/downloader.go +++ b/pkg/download/downloader.go @@ -635,6 +635,8 @@ func (d *Downloader) restoreFetcher(task *Task) error { task.fetcher.Meta().Res = task.Meta.Res } go d.watch(task) + } else if task.Status == base.DownloadStatusError { + go d.watch(task) } task.fetcher.Create(task.Meta.Opts) return nil