Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: kill disposable nodes on stop and simplify started status #554

Merged
merged 7 commits into from
Oct 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"lint": "aegir lint",
"docs": "aegir docs",
"build": "aegir build",
"test": "aegir test -t node -t browser --timeout 10000",
"test": "aegir test",
"test:node": "aegir test -t node",
"test:browser": "aegir test -t browser",
"release": "aegir release --timeout 10000",
Expand Down Expand Up @@ -60,6 +60,7 @@
"merge-options": "^3.0.1",
"multiaddr": "^8.0.0",
"nanoid": "^3.1.3",
"p-wait-for": "^3.1.0",
"temp-write": "^4.0.0"
},
"devDependencies": {
Expand Down
119 changes: 75 additions & 44 deletions src/ipfsd-daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const path = require('path')
const os = require('os')
const tempWrite = require('temp-write')
const { checkForRunningApi, repoExists, tmpDir, defaultRepo } = require('./utils')
const waitFor = require('p-wait-for')

const daemonLog = {
info: debug('ipfsd-ctl:daemon:stdout'),
Expand Down Expand Up @@ -157,33 +158,41 @@ class Daemon {
* @returns {Promise<Daemon>}
*/
async start () {
const args = ['daemon']
const opts = this.opts.ipfsOptions
// add custom args
args.push(...this.opts.args)

if (opts.pass && this.opts.type === 'js') {
args.push('--pass', '"' + opts.pass + '"')
}
if (opts.offline) {
args.push('--offline')
}
if (opts.preload && this.opts.type === 'js') {
args.push('--enable-preload', Boolean(opts.preload.enabled))
}
if (opts.EXPERIMENTAL && opts.EXPERIMENTAL.sharding && this.opts.type === 'js') {
args.push('--enable-sharding-experiment')
}
if (opts.EXPERIMENTAL && opts.EXPERIMENTAL.ipnsPubsub) {
args.push('--enable-namesys-pubsub')
}

// Check if a daemon is already running
const api = checkForRunningApi(this.path)

if (api) {
this._setApi(api)
} else if (!this.exec) {
throw new Error('No executable specified')
} else {
const args = ['daemon']
const opts = this.opts.ipfsOptions
// add custom args
args.push(...this.opts.args)

if (opts.pass && this.opts.type === 'js') {
args.push('--pass', '"' + opts.pass + '"')
}

if (opts.offline) {
args.push('--offline')
}

if (opts.preload && this.opts.type === 'js') {
args.push('--enable-preload', Boolean(opts.preload.enabled))
}

if (opts.EXPERIMENTAL && opts.EXPERIMENTAL.sharding && this.opts.type === 'js') {
args.push('--enable-sharding-experiment')
}

if (opts.EXPERIMENTAL && opts.EXPERIMENTAL.ipnsPubsub) {
args.push('--enable-namesys-pubsub')
}

let output = ''

const ready = new Promise((resolve, reject) => {
this.subprocess = execa(this.exec, args, {
env: this.env
Expand Down Expand Up @@ -213,7 +222,17 @@ class Daemon {
}
this.subprocess.stdout.on('data', readyHandler)
this.subprocess.catch(err => reject(translateError(err)))
this.subprocess.on('exit', () => {
this.started = false
this.subprocess.stderr.removeAllListeners()
this.subprocess.stdout.removeAllListeners()

if (this.disposable) {
this.cleanup().catch(() => {})
}
})
})

await ready
}

Expand All @@ -228,43 +247,55 @@ class Daemon {
/**
* Stop the daemon.
*
* @param {object} [options]
* @param {number} [options.timeout=60000] - How long to wait for the daemon to stop
* @returns {Promise<Daemon>}
*/
async stop () {
async stop (options = {}) {
const timeout = options.timeout || 60000

if (!this.started) {
return this
}

let killTimeout
let killed = false
if (this.opts.forceKill !== false) {
killTimeout = setTimeout(() => {
// eslint-disable-next-line no-console
console.error(new Error(`Timeout stopping ${this.opts.type} node. Process ${this.subprocess.pid} will be force killed now.`))
killed = true
if (this.subprocess) {
let killTimeout

if (this.disposable) {
// we're done with this node and will remove it's repo when we are done
// so don't wait for graceful exit, just terminate the process
this.subprocess.kill('SIGKILL')
}, this.opts.forceKillTimeout)
}
} else {
if (this.opts.forceKill !== false) {
killTimeout = setTimeout(() => {
// eslint-disable-next-line no-console
console.error(new Error(`Timeout stopping ${this.opts.type} node after ${this.opts.forceKillTimeout}ms. Process ${this.subprocess.pid} will be force killed now.`))
this.subprocess.kill('SIGKILL')
}, this.opts.forceKillTimeout)
}

try {
await this.api.stop()
} catch (err) {
if (!killed) {
throw err // if was killed then ignore error
this.subprocess.cancel()
}

daemonLog.info('Daemon was force killed')
}
// wait for the subprocess to exit and declare ourselves stopped
await waitFor(() => !this.started, {
timeout
})

clearTimeout(killTimeout)
this.subprocess.stderr.removeAllListeners()
this.subprocess.stdout.removeAllListeners()
this.started = false
clearTimeout(killTimeout)

if (this.disposable) {
// wait for the cleanup routine to run after the subprocess has exited
await waitFor(() => this.clean, {
timeout
})
}
} else {
await this.api.stop()

if (this.disposable) {
await this.cleanup()
this.started = false
}

return this
}

Expand Down
53 changes: 53 additions & 0 deletions test/controller.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const { createFactory, createController } = require('../src')
const { repoExists } = require('../src/utils')
const { isBrowser, isWebWorker, isNode } = require('ipfs-utils/src/env')
const pathJoin = require('ipfs-utils/src/path-join')
const waitFor = require('p-wait-for')

/** @typedef {import("../src/index").ControllerOptions} ControllerOptions */

Expand Down Expand Up @@ -189,6 +190,58 @@ describe('Controller API', function () {
})
}
})

describe('should stop a running node that we have joined', () => {
for (const opts of types) {
it(`type: ${opts.type} remote: ${Boolean(opts.remote)}`, async function () {
if (isBrowser || isWebWorker) {
return this.skip() // browser can't attach to running node
}

// have to use createController so we don't try to shut down
// the node twice during test cleanup
const ctl1 = await createController(merge(
{
type: 'go',
ipfsHttpModule: require('ipfs-http-client'),
ipfsBin: require('go-ipfs').path(),
test: true,
disposable: true,
remote: false,
ipfsOptions: {
init: true,
start: true
}
}))
expect(ctl1.started).to.be.true()

const ctl2 = await createController(merge(
opts, {
ipfsHttpModule: require('ipfs-http-client'),
ipfsModule: require('ipfs'),
test: true,
disposable: true,
ipfsOptions: {
repo: ctl1.path,
start: true
}
}
))
expect(ctl2.started).to.be.true()

await ctl2.stop()
expect(ctl2.started).to.be.false()

// wait for the other subprocess to exit
await waitFor(() => !ctl1.started, { // eslint-disable-line max-nested-callbacks
timeout: 10000,
interval: 100
})

expect(ctl1.started).to.be.false()
})
}
})
})

describe('cleanup', () => {
Expand Down