Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull out proc_runner from x-pack-kibana into kibana #17120

Merged
merged 13 commits into from
Mar 15, 2018
3 changes: 3 additions & 0 deletions packages/kbn-dev-utils/.babelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"presets": ["@kbn/babel-preset/node"]
}
8 changes: 8 additions & 0 deletions packages/kbn-dev-utils/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# dev utils

## proc runner

For running integration tests in Kibana

## tooling log

24 changes: 24 additions & 0 deletions packages/kbn-dev-utils/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "@kbn/dev-utils",
"main": "./target/index.js",
"version": "1.0.0",
"license": "Apache-2.0",
"private": true,
"scripts": {
"build": "babel src --out-dir target",
"kbn:bootstrap": "yarn build"
},
"dependencies": {
"chalk": "^2.3.0",
"moment": "^2.20.1",
"tree-kill": "1.1.0"
},
"devDependencies": {
"babel-cli": "^6.26.0",
"@kbn/babel-preset": "link:../kbn-babel-preset",
"chance": "1.0.6",
"expect.js": "0.3.1"
}
}


2 changes: 2 additions & 0 deletions packages/kbn-dev-utils/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { withProcRunner } from './proc-runner';
export { createToolingLog, pickLevelFromFlags } from './tooling-log';
3 changes: 3 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/__tests__/proc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#! /usr/bin/node

console.log(process.argv.join(' '));
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { withProcRunner } from '../with_proc_runner';

describe('proc-runner', () => {
function runProc({ thing = '', procs }) {
return new Promise(resolve => {
setTimeout(() => {
procs.run('proc', {
cmd: './proc',
args: ['these', 'are', 'words'],
});
resolve(thing);
}, 500);
});
}

it('passes procs to a function', async () => {
await withProcRunner(async procs => {
await runProc({ procs });
await procs.stop('proc');
});
});
});
11 changes: 11 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const $isCliError = Symbol('isCliError');

export function createCliError(message) {
const error = new Error(message);
error[$isCliError] = true;
return error;
}

export function isCliError(error) {
return error && !!error[$isCliError];
}
1 change: 1 addition & 0 deletions packages/kbn-dev-utils/src/proc-runner/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { withProcRunner } from './with_proc_runner';
4 changes: 4 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/log.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { createToolingLog } from '../tooling-log';

export const log = createToolingLog('debug');
log.pipe(process.stdout);
38 changes: 38 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/observe_child_process.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import Rx from 'rxjs/Rx';

import { createCliError } from './errors';

/**
* Creates an Observable from a childProcess that:
* - provides the exit code as it's only value (which may be null)
* as soon as the process exits
* - completes once all stdio streams for the child process have closed
* - fails if the childProcess emits an error event
*
* @param {ChildProcess} childProcess
* @return {Rx.Observable}
*/
export function observeChildProcess(name, childProcess) {
// observe first exit event
const exit$ = Rx.Observable.fromEvent(childProcess, 'exit')
.first()
.map(code => {
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`[${name}] exitted with code ${code}`);
}

return code;
});

// observe first close event
const close$ = Rx.Observable.fromEvent(childProcess, 'close').first();

// observe first error event until there is a close event
const error$ = Rx.Observable.fromEvent(childProcess, 'error')
.first()
.mergeMap(err => Rx.Observable.throw(err))
.takeUntil(close$);

return Rx.Observable.merge(exit$, close$.ignoreElements(), error$);
}
51 changes: 51 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/observe_lines.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import Rx from 'rxjs/Rx';

const SEP = /\r?\n/;

import { observeReadable } from './observe_readable';

/**
* Creates an Observable from a Readable Stream that:
* - splits data from `readable` into lines
* - completes when `readable` emits "end"
* - fails if `readable` emits "errors"
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeLines(readable) {
const done$ = observeReadable(readable).share();

const scan$ = Rx.Observable
.fromEvent(readable, 'data')
.scan(({ buffer }, chunk) => {
buffer += chunk;

let match;
const lines = [];
while (match = buffer.match(SEP)) {
lines.push(buffer.slice(0, match.index));
buffer = buffer.slice(match.index + match[0].length);
}

return { buffer, lines };
}, { buffer: '' })
// stop if done completes or errors
.takeUntil(done$.materialize());

return Rx.Observable.merge(
// use done$ to provide completion/errors
done$,

// merge in the "lines" from each step
scan$
.mergeMap(({ lines }) => lines),

// inject the "unsplit" data at the end
scan$
.last()
.mergeMap(({ buffer }) => (buffer ? [buffer] : []))
// if there were no lines, last() will error, so catch and complete
.catch(() => Rx.Observable.empty())
);
}
24 changes: 24 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/observe_readable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import Rx from 'rxjs/Rx';

/**
* Produces an Observable from a ReadableSteam that:
* - completes on the first "end" event
* - fails on the first "error" event
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeReadable(readable) {
return Rx.Observable
.race(
Rx.Observable
.fromEvent(readable, 'end')
.first()
.ignoreElements(),

Rx.Observable
.fromEvent(readable, 'error')
.first()
.map(err => Rx.Observable.throw(err))
);
}
16 changes: 16 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/observe_signals.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Rx from 'rxjs/Rx';

/**
* Creates an Observable from a Process object that:
* - emits "exit", "SIGINT", or "SIGTERM" events that occur
*
* @param {ReadableStream} readable
* @return {Rx.Observable}
*/
export function observeSignals(process) {
return Rx.Observable.merge(
Rx.Observable.fromEvent(process, 'exit').mapTo('exit'),
Rx.Observable.fromEvent(process, 'SIGINT').mapTo('SIGINT'),
Rx.Observable.fromEvent(process, 'SIGTERM').mapTo('SIGTERM'),
);
}
65 changes: 65 additions & 0 deletions packages/kbn-dev-utils/src/proc-runner/proc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { spawn } from 'child_process';
import { statSync } from 'fs';

import Rx from 'rxjs/Rx';
import { gray } from 'chalk';

import treeKill from 'tree-kill';
import { promisify } from 'util';
const treeKillAsync = promisify(treeKill);

import { log } from './log';
import { observeLines } from './observe_lines';
import { observeChildProcess } from './observe_child_process';

export function createProc(name, { cmd, args, cwd, env, stdin }) {
log.info('[%s] > %s', name, cmd, args.join(' '));

// spawn fails with ENOENT when either the
// cmd or cwd don't exist, so we check for the cwd
// ahead of time so that the error is less ambiguous
try {
if (!statSync(cwd).isDirectory()) {
throw new Error(`cwd "${cwd}" exists but is not a directory`);
}
} catch (err) {
if (err.code === 'ENOENT') {
throw new Error(`cwd "${cwd}" does not exist`);
}
}

const childProcess = spawn(cmd, args, {
cwd,
env,
stdio: [stdin ? 'pipe' : 'ignore', 'pipe', 'pipe'],
});

if (stdin) {
childProcess.stdin.end(stdin, 'utf8');
}

return new class Proc {
name = name;

lines$ = Rx.Observable.merge(
observeLines(childProcess.stdout),
observeLines(childProcess.stderr)
)
.do(line => log.write(` ${gray('proc')} [${gray(name)}] ${line}`))
.share();

outcome$ = observeChildProcess(name, childProcess).share();

outcomePromise = Rx.Observable.merge(
this.lines$.ignoreElements(),
this.outcome$
).toPromise();

closedPromise = this.outcomePromise.then(() => {}, () => {});

async stop(signal) {
await treeKillAsync(childProcess.pid, signal);
await this.closedPromise;
}
}();
}
Loading