Add gatsby-parallel-runner package #21733

Closed
wants to merge 11 commits into from
1 change: 1 addition & 0 deletions packages/gatsby-parallel-runner/.gitignore
@@ -0,0 +1 @@
node_modules/
Empty file.
76 changes: 76 additions & 0 deletions packages/gatsby-parallel-runner/README.md
@@ -0,0 +1,76 @@
# Gatsby Parallel Runner

This is an early parallel runner for Gatsby that allows plugins and core parts of Gatsby to parallelize
well-suited tasks such as image processing.

When Gatsby is started from a parent process with the environment variable `ENABLE_GATSBY_EXTERNAL_JOBS` set,
it will communicate some jobs up to the parent process via IPC, instead of running them in its own internal
queue.

This allows a parent process to orchestrate certain tasks across multiple workers for better parallelization,
for example through autoscaling cloud functions.
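
As a rough sketch, the parent/child contract looks like this (based on the message shapes used in
`src/build.js`; the `runJobRemotely` helper is hypothetical and stands in for whatever remote execution
you use):

```js
const cp = require("child_process")

// Tell Gatsby to forward eligible jobs over IPC instead of running them itself.
process.env.ENABLE_GATSBY_EXTERNAL_JOBS = true
const gatsby = cp.fork("node_modules/.bin/gatsby", ["build"])

gatsby.on("message", async msg => {
  if (msg.type === "JOB_CREATED") {
    // Run the job elsewhere (e.g. a cloud function), then report the result back.
    const result = await runJobRemotely(msg.payload) // hypothetical helper
    gatsby.send({ type: "JOB_COMPLETED", payload: { id: msg.payload.id, result } })
  }
})
```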

Currently this plugin includes a processing queue implementation based on Google Cloud Functions, but the
general abstractions in place should make it easy to add similar runtimes for other cloud providers or via
different approaches to parallelization.

## Installation and usage

Install in your gatsby project:

```
npm i gatsby-parallel-runner
```

To use with Google Cloud, set relevant env variables in your shell:

```
export GOOGLE_APPLICATION_CREDENTIALS=~/path/to/your/google-credentials.json
export TOPIC=parallel-runner-topic
```

Deploy the cloud function:

```
npx gatsby-parallel-runner deploy
```

Then run your Gatsby build with the parallel runner instead of the default `gatsby build` command.

```
npx gatsby-parallel-runner
```

## Processor Queues, Processors and Implementations

Gatsby Parallel Runner comes with a set of core abstractions for parallelizing jobs.

The main orchestrator is the Processor Queue, which gives individual processors a simple interface for
sending jobs to cloud functions and getting back results:

```js
const result = await queue.process(job)
```
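
The `job` payloads that Gatsby forwards carry an id, a name, arguments and input paths. The shape below
mirrors the fixtures used in this package's tests (the concrete values are only illustrative):

```js
const job = {
  id: `1234`,
  name: `IMAGE_PROCESSING`,
  args: [],
  inputPaths: [{ path: `/absolute/path/to/gatsby-astronaut.png` }],
}
```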

To do its job, the ProcessorQueue needs a `pubSubImplementation` that must provide
`push(msg)` and `subscribe(handler)` methods for enqueuing new jobs and receiving
results.
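
For illustration, a minimal in-memory `pubSubImplementation` satisfying that contract might look like the
sketch below; the real implementation in this package talks to Google Cloud Pub/Sub instead, and the exact
shape of the messages delivered to the subscriber depends on the processor:

```js
// Illustrative only: an in-memory stand-in for the push/subscribe contract.
class InMemoryPubSub {
  constructor() {
    this.handler = null
  }
  async subscribe(handler) {
    // The ProcessorQueue registers a handler here to receive results.
    this.handler = handler
  }
  async push(msg) {
    // A real implementation would publish the job to a queue; this sketch
    // just echoes the message straight back to the subscriber.
    if (this.handler) {
      await this.handler(msg)
    }
  }
}
```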

Implementations are defined in `src/processor-queue/implementations`; currently there is just one, based
on Google Cloud Functions.

The `src/processors` folder has the different processors that can be triggered via Gatsby's
external job feature.

The processor folder must be named after the Redux event that should trigger it. For example, the
image processing processor gets triggered by the sharp plugin via an `IMAGE_PROCESSING` job,
so the folder is called `image-processing`.
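
Based on how `src/build.js` loads processors (it instantiates the exported `Processor` class with a
processor queue and awaits its `process` method), a new processor module would look roughly like this
sketch; it is not the actual image-processing implementation:

```js
// Sketch of the module shape expected by src/build.js.
exports.Processor = class Processor {
  constructor(queue) {
    // The queue wraps the active cloud implementation (e.g. Google Functions).
    this.queue = queue
  }

  async process(jobPayload) {
    // Forward the job to the cloud function and return its result to Gatsby.
    return await this.queue.process(jobPayload)
  }
}
```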

Each processor can have a set of implementations based on the Processor Queue implementations
available.

There's currently just one processor (image-processing), with an implementation for `google-functions`.

When running `npx gatsby-parallel-runner deploy`, the active processor queue implementation will
make sure to deploy all the cloud functions needed for the available processors.
37 changes: 37 additions & 0 deletions packages/gatsby-parallel-runner/bin/run.js
@@ -0,0 +1,37 @@
#!/usr/bin/env node

const { build } = require("../src/build")
const { deploy } = require("../src/deploy")
const { writeFileSync } = require("fs")

const requiredEnvVars = ["TOPIC", "GOOGLE_APPLICATION_CREDENTIALS"]
requiredEnvVars.forEach(key => {
  if (!process.env[key]) {
    console.error(`You must set a ${key} environment variable`)
    process.exit(1)
  }
})

if (!process.env.GOOGLE_APPLICATION_CREDENTIALS.match(/\.json$/)) {
  const credentialsFile = "/tmp/credentials.json"
  let credentials = null
  try {
    let buff = Buffer.from(process.env.GOOGLE_APPLICATION_CREDENTIALS, "base64")
    credentials = buff.toString("ascii")
  } catch (err) {
    console.error(
      "GOOGLE_APPLICATION_CREDENTIALS must either be a path to a .json file or base 64 encoded json credentials",
      err
    )
    process.exit(1)
  }
  writeFileSync(credentialsFile, credentials)
  process.env.GOOGLE_APPLICATION_CREDENTIALS = credentialsFile
}

if (process.argv.length === 3 && process.argv[2] === "deploy") {
  console.log("Deploying Cloud Worker")
  deploy().catch(error => console.error(error))
} else {
  build()
}
33 changes: 33 additions & 0 deletions packages/gatsby-parallel-runner/package.json
@@ -0,0 +1,33 @@
{
  "name": "gatsby-parallel-runner",
  "version": "1.2.1",
  "description": "Gatsby plugin that allows parallelization of external tasks",
  "keywords": [
    "gatsby"
  ],
  "main": "index.js",
  "bin": {
    "gatsby-parallel-runner": "./bin/run.js"
  },
  "author": "Mathias Biilmann <info@mathias-biilmann.net>",
  "license": "MIT",
  "repository": {
    "type": "git",
    "url": "https://github.com/gatsbyjs/gatsby.git",
    "directory": "packages/gatsby-parallel-runner"
  },
  "homepage": "https://github.com/gatsbyjs/gatsby/tree/master/packages/gatsby-parallel-runner#readme",
  "dependencies": {
    "@google-cloud/pubsub": "^0.29.1",
    "@google-cloud/storage": "^4.3.0",
    "fs-extra": "^8.1.0",
    "grpc": "^1.24.2",
    "loglevel": "^1.6.6"
  },
  "devDependencies": {
    "jest": "^25.1.0"
  },
  "scripts": {
    "test": "jest"
  }
}
127 changes: 127 additions & 0 deletions packages/gatsby-parallel-runner/src/__tests__/index.js
@@ -0,0 +1,127 @@
const path = require(`path`)
const { messageHandler } = require(`../build`)
const { resolveProcessors } = require(`../utils`)

test(`test message handler with image processor`, async () => {
  expect.assertions(2)
  const fakeGatsby = {
    send: jest.fn(msg => {
      expect(msg).toEqual({
        type: `JOB_COMPLETED`,
        payload: {
          id: `1234`,
          result: { outputs: [] },
        },
      })
    }),
  }

  const processors = {
    IMAGE_PROCESSING: {
      process: jest.fn(async msg => {
        expect(msg).toEqual({
          id: `1234`,
          name: `IMAGE_PROCESSING`,
          args: [],
          inputPaths: [
            { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) },
          ],
        })
        return { outputs: [] }
      }),
    },
  }

  const handler = messageHandler(fakeGatsby, processors)
  await handler({
    type: `JOB_CREATED`,
    payload: {
      id: `1234`,
      name: `IMAGE_PROCESSING`,
      args: [],
      inputPaths: [
        { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) },
      ],
    },
  })
})

test(`test message handler with failing image processor`, async () => {
  expect.assertions(2)
  const fakeGatsby = {
    send: jest.fn(msg => {
      expect(msg).toEqual({
        type: `JOB_FAILED`,
        payload: {
          id: `1234`,
          error: `Error during processing...`,
        },
      })
    }),
  }

  const processors = {
    IMAGE_PROCESSING: {
      process: jest.fn(async msg => {
        expect(msg).toEqual({
          id: `1234`,
          name: `IMAGE_PROCESSING`,
          args: [],
          inputPaths: [
            { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) },
          ],
        })
        return Promise.reject(`Error during processing...`)
      }),
    },
  }

  const handler = messageHandler(fakeGatsby, processors)
  await handler({
    type: `JOB_CREATED`,
    payload: {
      id: `1234`,
      name: `IMAGE_PROCESSING`,
      args: [],
      inputPaths: [
        { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) },
      ],
    },
  })
})

test(`test message handler with unknown processor`, async () => {
  expect.assertions(1)
  const fakeGatsby = {
    send: jest.fn(msg => {
      expect(msg).toEqual({
        type: `JOB_NOT_WHITELISTED`,
        payload: { id: `1234` },
      })
    }),
  }

  const handler = messageHandler(fakeGatsby, {})
  await handler({
    type: `JOB_CREATED`,
    payload: {
      id: `1234`,
      name: `UNKNOWN_PROCESSOR`,
      args: [],
      inputPaths: [
        { path: path.join(__dirname, `images`, `gatsby-astronaut.png`) },
      ],
    },
  })
})

test(`resolve processors`, async () => {
  const processors = await resolveProcessors()
  expect(processors).toEqual([
    {
      name: `image-processing`,
      key: `IMAGE_PROCESSING`,
      path: path.join(__dirname, `../processors/image-processing`),
    },
  ])
})
98 changes: 98 additions & 0 deletions packages/gatsby-parallel-runner/src/build.js
@@ -0,0 +1,98 @@
#!/usr/bin/env node

const cp = require(`child_process`)
const log = require(`loglevel`)
const path = require(`path`)
const { ProcessorQueue } = require(`./processor-queue`)
const {
  GoogleFunctions,
} = require(`./processor-queue/implementations/google-functions`)
const { resolveProcessors } = require(`./utils`)

const MESSAGE_TYPES = {
  LOG_ACTION: `LOG_ACTION`,
  JOB_CREATED: `JOB_CREATED`,
  JOB_COMPLETED: `JOB_COMPLETED`,
  JOB_FAILED: `JOB_FAILED`,
  ACTIVITY_START: `ACTIVITY_START`,
  ACTIVITY_END: `ACTIVITY_END`,
  ACTIVITY_SUCCESS: `ACTIVITY_SUCCESS`,
  ACTIVITY_ERROR: `ACTIVITY_ERROR`,
}

function messageHandler(gatsbyProcess, processors = {}) {
  return async function(msg) {
    if (
      log.getLevel() <= log.levels.TRACE &&
      msg.type !== MESSAGE_TYPES.LOG_ACTION
    ) {
      log.trace(`Got gatsby message`, JSON.stringify(msg))
    }
    switch (msg.type) {
      case MESSAGE_TYPES.JOB_CREATED: {
        const processor = processors[msg.payload.name]
        if (!processor) {
          gatsbyProcess.send({
            type: `JOB_NOT_WHITELISTED`,
            payload: { id: msg.payload.id },
          })
          return
        }
        try {
          const result = await processor.process(msg.payload)
          gatsbyProcess.send({
            type: `JOB_COMPLETED`,
            payload: {
              id: msg.payload.id,
              result,
            },
          })
        } catch (error) {
          log.error(`Processing failed`, msg.payload.id, ` error:`, error)
          gatsbyProcess.send({
            type: `JOB_FAILED`,
            payload: { id: msg.payload.id, error: error.toString() },
          })
        }
        break
      }
      case MESSAGE_TYPES.LOG_ACTION:
        // msg.action.payload.text && console.log(msg.action.payload.text)
        break
      default:
        log.warn(`Ignoring message: `, msg)
    }
  }
}

exports.build = async function(cmd = `node_modules/.bin/gatsby build`) {
  log.setLevel(process.env.PARALLEL_RUNNER_LOG_LEVEL || `warn`)

  process.env.ENABLE_GATSBY_EXTERNAL_JOBS = true

  const processors = {}
  const processorList = await resolveProcessors()
  await Promise.all(
    processorList.map(async processorSettings => {
      const klass = require(processorSettings.path).Processor
      const pubSubImplementation = await new GoogleFunctions({
        processorSettings,
      })
      const processorQueue = new ProcessorQueue({ pubSubImplementation })

      processors[processorSettings.key] = new klass(processorQueue)
    })
  )

  const [bin, ...args] = cmd.split(` `)
  const gatsbyProcess = cp.fork(path.join(process.cwd(), bin), args)
  gatsbyProcess.on(`exit`, async code => {
    log.debug(`Gatsby exited with`, code)
    process.exit(code)
  })

  const handler = messageHandler(gatsbyProcess, processors)
  gatsbyProcess.on(`message`, handler)
}

exports.messageHandler = messageHandler
10 changes: 10 additions & 0 deletions packages/gatsby-parallel-runner/src/deploy.js
@@ -0,0 +1,10 @@
const GooglePubSub = require(`./processor-queue/implementations/google-functions/deploy`)

exports.deploy = async function() {
  try {
    await GooglePubSub.deploy()
  } catch (err) {
    console.error(`Failed to deploy parallel functions`, err)
    process.exit(1)
  }
}
@@ -0,0 +1 @@
Hello, World!