Skip to content

Commit

Permalink
fix: task can not completed (#280)
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie authored Nov 28, 2023
1 parent bba5107 commit 7d6428c
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 5 deletions.
2 changes: 1 addition & 1 deletion internal/protocol/http/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestFetcher_DownloadError(t *testing.T) {
}

func TestFetcher_DownloadLimit(t *testing.T) {
listener := test.StartTestLimitServer()
listener := test.StartTestLimitServer(4, 0)
defer listener.Close()

downloadNormal(listener, 1, t)
Expand Down
52 changes: 48 additions & 4 deletions internal/test/httptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func StartTestErrorServer() net.Listener {
}

// StartTestLimitServer connections limit server
func StartTestLimitServer() net.Listener {
func StartTestLimitServer(maxConnections int32, delay int64) net.Listener {
var connections atomic.Int32

return startTestServer(func() http.Handler {
Expand All @@ -148,7 +148,7 @@ func StartTestLimitServer() net.Listener {
connections.Add(-1)
}()
connections.Add(1)
if connections.Load() > 4 {
if maxConnections != 0 && connections.Load() > maxConnections {
writer.WriteHeader(403)
return
}
Expand All @@ -162,7 +162,7 @@ func StartTestLimitServer() net.Listener {
panic(err)
}
defer file.Close()
io.Copy(writer, file)
slowCopy(writer, file, delay)
} else {
// split range
s := strings.Split(r, "=")
Expand Down Expand Up @@ -203,13 +203,57 @@ func StartTestLimitServer() net.Listener {
}
defer file.Close()
file.Seek(start, 0)
io.CopyN(writer, file, end-start+1)
slowCopyN(writer, file, end-start+1, delay)
}
})
return mux
})
}

// slowCopyN copies n bytes from src to dst, speed limit is bytes per second
func slowCopy(dst io.Writer, src io.Reader, delay int64) (written int64, err error) {
buf := make([]byte, 32*1024)
for {
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er != nil {
if er != io.EOF {
err = er
}
break
}
if delay > 0 {
time.Sleep(time.Millisecond * time.Duration(delay))
}
}
return written, err
}

func slowCopyN(dst io.Writer, src io.Reader, n int64, delay int64) (written int64, err error) {
written, err = slowCopy(dst, io.LimitReader(src, n), delay)
if written == n {
return n, nil
}
if written < n && err == nil {
// src stopped early; must have been EOF.
err = io.EOF
}
return
}

func startTestServer(serverHandle func() http.Handler) net.Listener {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ func (d *Downloader) restoreFetcher(task *Task) error {
task.fetcher.Meta().Res = task.Meta.Res
}
go d.watch(task)
} else if task.Status == base.DownloadStatusError {
go d.watch(task)
}
task.fetcher.Create(task.Meta.Opts)
return nil
Expand Down

0 comments on commit 7d6428c

Please sign in to comment.