Skip to content

Commit

Permalink
fix(gatsby-source-filesystem): Retry stalled remote file downloads (#…
Browse files Browse the repository at this point in the history
…20843)

* fix(gatsby-source-filesystem): Retry stalled remote file downloads

* Switch to got@8

* Await the recursive callback

* Add tests for loading fix

* Don't reset on data. Start timout after `response`

* Add got timeout

* Don't await the recursive call

* Use constants for connection timeouts
  • Loading branch information
ascorbic authored Jan 31, 2020
1 parent a64cbf5 commit 536686b
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 7 deletions.
2 changes: 1 addition & 1 deletion packages/gatsby-source-filesystem/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"file-type": "^12.4.0",
"fs-extra": "^8.1.0",
"gatsby-core-utils": "^1.0.27",
"got": "^7.1.0",
"got": "^8.3.2",
"md5-file": "^3.2.3",
"mime": "^2.4.4",
"pretty-bytes": "^5.3.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ jest.mock(`fs-extra`, () => {
cb()
}
}),
close: jest.fn(),
}
}),
ensureDir: jest.fn(),
removeSync: jest.fn(),
move: jest.fn(),
stat: jest.fn(),
}
Expand Down Expand Up @@ -214,6 +216,40 @@ describe(`create-remote-file-node`, () => {
)
}
})

it(`retries if stalled`, done => {
const fs = require(`fs-extra`)

fs.createWriteStream.mockReturnValue({
on: jest.fn(),
close: jest.fn(),
})
jest.useFakeTimers()
got.stream.mockReset()
got.stream.mockReturnValueOnce({
pipe: jest.fn(() => {
return {
pipe: jest.fn(),
on: jest.fn(),
}
}),
on: jest.fn((mockType, mockCallback) => {
if (mockType === `response`) {
mockCallback({ statusCode: 200 })

expect(got.stream).toHaveBeenCalledTimes(1)
jest.advanceTimersByTime(1000)
expect(got.stream).toHaveBeenCalledTimes(1)
jest.advanceTimersByTime(30000)

expect(got.stream).toHaveBeenCalledTimes(2)
done()
}
}),
})
setup()
jest.runAllTimers()
})
})
})

Expand Down
49 changes: 44 additions & 5 deletions packages/gatsby-source-filesystem/src/create-remote-file-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ let totalJobs = 0

const CACHE_DIR = `.cache`
const FS_PLUGIN_DIR = `gatsby-source-filesystem`
const STALL_RETRY_LIMIT = 3
const STALL_TIMEOUT = 30000

const CONNECTION_RETRY_LIMIT = 5
const CONNECTION_TIMEOUT = 30000

/********************
* Queue Management *
Expand Down Expand Up @@ -124,31 +129,65 @@ async function pushToQueue(task, cb) {
* @param {Headers} headers
* @param {String} tmpFilename
* @param {Object} httpOpts
* @param {number} attempt
* @return {Promise<Object>} Resolves with the [http Result Object]{@link https://nodejs.org/api/http.html#http_class_http_serverresponse}
*/
const requestRemoteNode = (url, headers, tmpFilename, httpOpts) =>
const requestRemoteNode = (url, headers, tmpFilename, httpOpts, attempt = 1) =>
new Promise((resolve, reject) => {
const opts = Object.assign({}, { timeout: 30000, retries: 5 }, httpOpts)
let timeout

// Called if we stall for 30s without receiving any data
const handleTimeout = async () => {
fsWriteStream.close()
fs.removeSync(tmpFilename)
if (attempt < STALL_RETRY_LIMIT) {
// Retry by calling ourself recursively
resolve(
requestRemoteNode(url, headers, tmpFilename, httpOpts, attempt + 1)
)
} else {
reject(`Failed to download ${url} after ${STALL_RETRY_LIMIT} attempts`)
}
}

const resetTimeout = () => {
if (timeout) {
clearTimeout(timeout)
}
timeout = setTimeout(handleTimeout, STALL_TIMEOUT)
}
const responseStream = got.stream(url, {
headers,
...opts,
timeout: CONNECTION_TIMEOUT,
retries: CONNECTION_RETRY_LIMIT,
...httpOpts,
})
const fsWriteStream = fs.createWriteStream(tmpFilename)
responseStream.pipe(fsWriteStream)
responseStream.on(`downloadProgress`, pro => console.log(pro))

// If there's a 400/500 response or other error.
responseStream.on(`error`, (error, body, response) => {
responseStream.on(`error`, error => {
if (timeout) {
clearTimeout(timeout)
}
fs.removeSync(tmpFilename)
reject(error)
})

fsWriteStream.on(`error`, error => {
if (timeout) {
clearTimeout(timeout)
}
reject(error)
})

responseStream.on(`response`, response => {
resetTimeout()

fsWriteStream.on(`finish`, () => {
if (timeout) {
clearTimeout(timeout)
}
resolve(response)
})
})
Expand Down
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -11219,7 +11219,7 @@ got@^6.7.1:
unzip-response "^2.0.1"
url-parse-lax "^1.0.0"

got@^7.0.0, got@^7.1.0:
got@^7.0.0:
version "7.1.0"
resolved "https://registry.yarnpkg.com/got/-/got-7.1.0.tgz#05450fd84094e6bbea56f451a43a9c289166385a"
dependencies:
Expand Down

0 comments on commit 536686b

Please sign in to comment.