Skip to content

Commit

Permalink
Queue requests from createRemoteFileNode and control concurrency of r…
Browse files Browse the repository at this point in the history
…equests (gatsbyjs#4616)

* Chunk the requests to download media objects from WP.
The blog I work on has over 9,000 media objects and currently, it tries
to download them all. This PR chunks them in groups of 100, but that
setting can be increased.

* Remove prettier formatting from the readme

* Clean up and document create-remote-file-node
Add Better Queue for more control over processing

* Rollback changes to wp source files

* Add queue for requesting wp objects
update readme with new config option

* Revert files to master

* No longer throw an exception when an error occurs. Just resolve with
null and move on

* Remove file lock lookup for now. 200 concurrent requests is a safe
number and we can look to change this in the future

* Cosmetic updates

* Remove console.log
  • Loading branch information
tsimons authored and KyleAMathews committed Apr 3, 2018
1 parent aa03b9f commit d8e1732
Showing 1 changed file with 250 additions and 87 deletions.
337 changes: 250 additions & 87 deletions src/create-remote-file-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,112 +3,275 @@ const got = require(`got`)
const crypto = require(`crypto`)
const path = require(`path`)
const { isWebUri } = require(`valid-url`)
const Queue = require(`better-queue`)

const { createFileNode } = require(`./create-file-node`)
// Cache key under which the response headers for a given remote url are stored.
function cacheId (url) {
  return `create-remote-file-node-${url}`
}

/********************
 * Type Definitions *
 ********************/

/**
 * The Gatsby Redux store
 * @typedef {Redux}
 * @see [Redux Docs]{@link https://redux.js.org/api-reference}
 */

/**
 * @typedef {GatsbyCache}
 * @see gatsby/packages/gatsby/utils/cache.js
 */

/**
 * htaccess basic-auth credentials forwarded with the download request
 * @typedef {Auth}
 * @type {Object}
 * @property {String} htaccess_pass
 * @property {String} htaccess_user
 */

/**
 * @typedef {CreateRemoteFileNodePayload}
 * @type {Object}
 * @description Create Remote File Node Payload
 *
 * @param {String} options.url
 * @param {Redux} options.store
 * @param {GatsbyCache} options.cache
 * @param {Function} options.createNode
 * @param {Auth} [options.auth]
 */

/*********
 * utils *
 *********/

/**
 * createHash
 * --
 *
 * Create an md5 hash of the given str
 * (used only for stable cache filenames, not for security)
 * @param {String} str
 * @return {String} 32-character lowercase hex digest
 */
const createHash = (str) => crypto
  .createHash(`md5`)
  .update(str)
  .digest(`hex`)

// Sub-directories (under the Gatsby program directory) where downloads land.
const CACHE_DIR = `.cache`
const FS_PLUGIN_DIR = `gatsby-source-filesystem`

/**
 * createFilePath
 * --
 *
 * Build `<directory>/.cache/gatsby-source-filesystem/<filename><ext>`
 *
 * @param {String} directory - the Gatsby program directory
 * @param {String} filename - basename without extension
 * @param {String} ext - extension including the leading dot (may be empty)
 * @return {String}
 */
const createFilePath = (directory, filename, ext) => path.join(
  directory,
  CACHE_DIR,
  FS_PLUGIN_DIR,
  `${filename}${ext}`
)

/********************
 * Queue Management *
 ********************/

/**
 * Queue
 * Use the task's url as the id
 * When pushing a task with a similar id, prefer the original task
 * as it's already in the processing cache
 */
const queue = new Queue(pushToQueue, {
  // Dedupe tasks by their `url` property.
  id: `url`,
  // On a duplicate push, keep the already-queued task and drop the new one.
  merge: (old, _, cb) => cb(old),
  // Cap simultaneous downloads; 200 was judged a safe number for now
  // (see commit notes) and may become configurable later.
  concurrent: 200,
})

/**
 * @callback {Queue~queueCallback}
 * @param {*} error
 * @param {*} result
 */

/**
 * pushToQueue
 * --
 * Handle tasks that are pushed in to the Queue
 *
 * Note: download failures are deliberately reported to the queue as a
 * "successful" result carrying the error, so processing moves on instead
 * of aborting the run.
 *
 * @param {CreateRemoteFileNodePayload} task
 * @param {Queue~queueCallback} cb
 * @return {Promise<null>}
 */
function pushToQueue (task, cb) {
  return processRemoteNode(task)
    .then(node => cb(null, node))
    .catch(err => cb(null, err))
}

/******************
 * Core Functions *
 ******************/

/**
 * requestRemoteNode
 * --
 * Download the requested file into `tmpFilename`
 *
 * @param {String} url
 * @param {Headers} headers - extra `got` options (auth, If-None-Match, ...)
 * @param {String} tmpFilename - temp path the response body is piped to
 * @param {String} filename - final path (currently unused here; kept for signature parity)
 * @return {Promise<Object>} Resolves with the [http Result Object]{@link https://nodejs.org/api/http.html#http_class_http_serverresponse}
 */
const requestRemoteNode = (url, headers, tmpFilename, filename) =>
  new Promise((resolve, reject) => {
    const responseStream = got.stream(url, { ...headers, timeout: 30000 })
    responseStream.pipe(fs.createWriteStream(tmpFilename))
    // TODO(review): logs every progress event — looks like debug leftover; confirm and remove.
    responseStream.on(`downloadProgress`, pro => console.log(pro))

    // If there's a 400/500 response or other error.
    responseStream.on(`error`, (error, body, response) => {
      fs.removeSync(tmpFilename)
      reject({ error, body, response })
    })

    // Resolve as soon as headers arrive; the body keeps piping to tmpFilename.
    responseStream.on(`response`, response => {
      resolve(response)
    })
  })

/**
 * processRemoteNode
 * --
 * Request the remote file and return the fileNode
 *
 * @param {CreateRemoteFileNodePayload} options
 * @return {Promise<Object>} Resolves with the fileNode, or null when the download fails
 */
async function processRemoteNode ({ url, store, cache, createNode, auth = {} }) {
  // Ensure our cache directory exists.
  const programDir = store.getState().program.directory
  await fs.ensureDir(
    path.join(
      programDir,
      CACHE_DIR,
      FS_PLUGIN_DIR
    )
  )

  // See if there's response headers for this url
  // from a previous request.
  const cachedHeaders = await cache.get(cacheId(url))
  const headers = {}

  // Add htaccess authentication if passed in. This isn't particularly
  // extensible. We should define a proper API that we validate.
  if (auth && auth.htaccess_pass && auth.htaccess_user) {
    headers.auth = `${auth.htaccess_user}:${auth.htaccess_pass}`
  }

  // Send the cached etag so an unchanged file comes back as a cheap 304.
  if (cachedHeaders && cachedHeaders.etag) {
    headers[`If-None-Match`] = cachedHeaders.etag
  }

  // Create the temp and permanent file names for the url.
  const digest = createHash(url)
  const ext = path.parse(url).ext

  const tmpFilename = createFilePath(programDir, `tmp-${digest}`, ext)
  const filename = createFilePath(programDir, digest, ext)

  // Fetch the file.
  try {
    const response = await requestRemoteNode(url, headers, tmpFilename, filename)

    // Save the response headers for future requests.
    // NOTE(review): not awaited — fire-and-forget by design here.
    cache.set(cacheId(url), response.headers)

    // If the status code is 200, move the piped temp file to the real name.
    if (response.statusCode === 200) {
      await fs.move(tmpFilename, filename, { overwrite: true })
    } else {
      // Else (e.g. 304 Not Modified) remove the empty response body.
      await fs.remove(tmpFilename)
    }

    // Create the file node.
    const fileNode = await createFileNode(filename, {})

    // Override the default plugin as gatsby-source-filesystem needs to
    // be the owner of File nodes or there'll be conflicts if any other
    // File nodes are created through normal usages of
    // gatsby-source-filesystem.
    createNode(fileNode, { name: `gatsby-source-filesystem` })

    return fileNode
  } catch (err) {
    // Deliberately swallow download errors so one bad url doesn't abort the
    // whole sourcing run; the caller receives null instead (see commit notes).
  }
  return null
}

/**
 * Index of promises resolving to File node from remote url
 * Shared module-wide so each remote url is requested at most once.
 */
const processingCache = {}
/**
 * pushTask
 * --
 * pushes a task in to the Queue and the processing cache
 *
 * Promisify a task in queue
 * @param {CreateRemoteFileNodePayload} task
 * @return {Promise<Object>} Resolves with the processed task result, or
 *   undefined when the queue reports failure (errors never reject).
 */
const pushTask = (task) => new Promise((resolve, reject) => {
  queue
    .push(task)
    .on(`finish`, (task) => {
      resolve(task)
    })
    .on(`failed`, () => {
      // Resolve (not reject) on failure, matching this module's
      // "log and move on" error policy.
      resolve()
    })
})

/***************
* Entry Point *
***************/

/**
* createRemoteFileNode
* --
*
* Download a remote file
* First checks cache to ensure duplicate requests aren't processed
* Then pushes to a queue
*
* @param {CreateRemoteFileNodePayload} options
* @return {Promise<Object>} Returns the created node
*/
module.exports = ({ url, store, cache, createNode, auth = {} }) => {
// Check if we already requested node for this remote file
// and return stored promise if we did.
if (processingCache[url]) {
return processingCache[url]
}


if (!url || isWebUri(url) === undefined) {
// should we resolve here, or reject?
// Technically, it's invalid input
return Promise.resolve()
}

return (processingCache[url] = pushTask({ url, store, cache, createNode, auth }))
}

0 comments on commit d8e1732

Please sign in to comment.