From 4e483b9e1be4798317fd3e6cb3518e24d331b231 Mon Sep 17 00:00:00 2001 From: Dmytro Nechai Date: Mon, 10 Jul 2017 22:24:09 +0300 Subject: [PATCH] bench: add simple benchmark --- benchmark/run.js | 141 ++++++++++++++++++++++++++++++++++++++++ benchmark/server.js | 46 +++++++++++++ benchmark/statistics.js | 28 ++++++++ benchmark/worker.js | 97 +++++++++++++++++++++++++++ 4 files changed, 312 insertions(+) create mode 100644 benchmark/run.js create mode 100644 benchmark/server.js create mode 100644 benchmark/statistics.js create mode 100644 benchmark/worker.js diff --git a/benchmark/run.js b/benchmark/run.js new file mode 100644 index 00000000..b4efb489 --- /dev/null +++ b/benchmark/run.js @@ -0,0 +1,141 @@ +'use strict'; + +const cp = require('child_process'); +const path = require('path'); +const fs = require('fs'); +const statistics = require('./statistics'); + +const yargsParser = require('yargs-parser'); + +const args = yargsParser( + process.argv.slice(2), + { + alias: { + workers: ['W'], + connections: ['C'], + requests: ['R'], + size: ['S'], + }, + } +); + +const { + workers: workersAmount, + connections: connectionsPerWorker, + requests: requestsPerConnection, + size: argumentSize, +} = args; + +const server = cp.fork( + path.join(__dirname, 'server'), + [workersAmount * connectionsPerWorker], + { stdin: 'pipe' } +); +let socket; + +let serverExited = false; + +const workers = new Array(workersAmount); +const workersExited = new Array(workersAmount); + +const results = new Array(workersAmount); +let workersConnected = 0; +let workersFinished = 0; + +let benchStartedHR; + +server.on('exit', (exitCode) => { + serverExited = true; + if (exitCode !== 0) { + terminate(); + } +}); + +server.on('error', terminate); + +server.on('message', ([type, payload]) => { + if (type !== 'started') { + return; + } + socket = payload; + + const onWorkerExitFactory = index => (exitCode) => { + workersExited[index] = true; + if (exitCode !== 0) { + terminate(); + } + }; + + for (let i = 0; i < workersAmount; i++) { + workers[i] = cp.fork(path.join(__dirname, 'worker'), [], { stdin: 'pipe' }); + + workers[i].on('exit', onWorkerExitFactory(i)); + workers[i].on('message', workerListener); + workers[i].send(['connect', socket, connectionsPerWorker, argumentSize]); + } +}); + +function workerListener([type, payload]) { + if (type === 'connected') { + workersConnected++; + + if (workersConnected === workersAmount) { + benchStartedHR = process.hrtime(); + for (let i = 0; i < workersAmount; i++) { + workers[i].send(['start', requestsPerConnection]); + } + } + } else if (type === 'finished') { + results[workersFinished] = payload; + workersFinished++; + + if (workersFinished === workersAmount) { + outputResults(process.hrtime(benchStartedHR)); + } + } +} + +function outputResults(benchTimeHR) { + const count = workersAmount * connectionsPerWorker * requestsPerConnection; + const mean = statistics.mean(results.map(result => result[0])); + + const sum = results.reduce((previous, current) => ( + previous + Math.pow(current[1], 2) + Math.pow(current[0] - mean, 2) + ), 0); + const stdev = Math.sqrt(sum / workersAmount); + + const benchTime = benchTimeHR[0] * 1e9 + benchTimeHR[1]; + const erps = count * 1e9 / benchTime; + + server.send(['stop']); + console.log(` + Requests sent: ${count} + Mean time of one request: ${mean * 1e-6} (ms) + Stdev of time of one request: ${stdev * 1e-6} (ms) + Estimated RPS: ${erps} + `); + process.exit(0); +} + +function terminate() { + console.warn( + '\nBenchmark is being terminated due to an error or signal termination\n' + ); + workers.filter((_, index) => !workersExited[index]) + .forEach((worker) => { + worker.kill('SIGKILL'); + }); + + if (!serverExited) { + server.kill('SIGINT'); + setTimeout(() => { + if (!serverExited) { + server.kill('SIGKILL'); + console.warn('Master process was not able to close server gracefully'); + fs.unlinkSync(socket); + } + }, 5000); + } + + process.exit(0); +} diff --git a/benchmark/server.js b/benchmark/server.js new file mode 100644 index 00000000..9e3fccc8 --- /dev/null +++ b/benchmark/server.js @@ -0,0 +1,46 @@ +'use strict'; + +const os = require('os'); +const path = require('path'); + +const jstp = require('..'); + +const maxConnections = +process.argv[2]; + +const app = new jstp.Application('app', { + iface: { + method(connection, argument, callback) { + callback(null); + }, + }, +}); + +const server = jstp.net.createServer([app]); +server.maxConnections = maxConnections; + +const socket = path.join( + process.platform === 'win32' ? '\\\\.\\pipe' : os.tmpdir(), + 'jstp-ipc-test' +); + +const terminate = () => { + server.close(); + process.exit(0); +}; + +process.on('message', ([type]) => { + if (type === 'stop') { + terminate(); + } +}); + +process.on('SIGINT', terminate); + +server.listen(socket, (error) => { + if (error) { + console.error(error); + } + + console.log(`Server listening on ${socket} 🚀`); + process.send(['started', socket]); +}); diff --git a/benchmark/statistics.js b/benchmark/statistics.js new file mode 100644 index 00000000..1484b62e --- /dev/null +++ b/benchmark/statistics.js @@ -0,0 +1,28 @@ +'use strict'; + +const mean = (sample) => { + const len = sample.length; + if (len === 0) + return; + let sum = 0; + for (let i = 0; i < len; i++) { + sum += sample[i]; + } + return sum / len; +}; + +const stdev = (sample, meanValue) => { + const len = sample.length; + if (len === 0) + return; + if (len === 1) + return 0; + let sum = 0; + for (let i = 0; i < len; i++) { + sum += Math.pow(sample[i] - meanValue, 2); + } + const variance = sum / len; + return Math.sqrt(variance); +}; + +module.exports = { mean, stdev }; diff --git a/benchmark/worker.js b/benchmark/worker.js new file mode 100644 index 00000000..2814d75a --- /dev/null +++ b/benchmark/worker.js @@ -0,0 +1,97 @@ +'use strict'; + +const jstp = require('..'); + +const statistics = require('./statistics'); + +let connections; + +let argument; + +process.on('message', ([type, ...payload]) => { + if (type === 'connect') { + connections = new Array(payload[1]); + connect(payload[0]); + argument = '0'.repeat(payload[2]); + } else if (type === 'start') { + start(payload[0]); + } +}); + +function connect(socket) { + let connected = 0; + const createConnection = (index) => { + jstp.net.connectAndInspect( + 'app', null, ['iface'], socket, (error, conn) => { + connected++; + + if (error) { + console.error(`Could not connect to the server: ${error}`); + return; + } + + connections[index] = conn; + + if (connected === connections.length) { + process.send(['connected']); + } + } + ); + }; + + for (let i = 0; i < connections.length; i++) { + createConnection(i); + } +} + +function start(requests) { + const responseTimesHR = new Array(connections.length); + for (let i = 0; i < connections.length; i++) { + responseTimesHR[i] = new Array(requests); + } + let responses = 0; + let startTimeHR = null; + + const sendRequest = (connectionIndex, requestIndex) => { + const timeOfStart = process.hrtime(); + connections[connectionIndex].remoteProxies.iface.method(argument, () => { + + responseTimesHR[connectionIndex][requestIndex] = + process.hrtime(timeOfStart); + + responses++; + if (responses === requests * connections.length) { + process.send([ + 'finished', + prepareResults(responseTimesHR, process.hrtime(startTimeHR)), + ]); + connections.forEach(connection => connection.close()); + process.exit(0); + } + }); + }; + + startTimeHR = process.hrtime(); + + for (let i = 0; i < connections.length; i++) { + for (let j = 0; j < requests; j++) { + sendRequest(i, j); + } + } +} + +function prepareResults(responseTimesHR, timeSpentHR) { + const hrtimeToNSeconds = hrtime => hrtime[0] * 1e9 + hrtime[1]; + + responseTimesHR = responseTimesHR.reduce( + (previous, current) => previous.concat(current), [] + ); + + const responseTimes = responseTimesHR.map(hrtimeToNSeconds); + const timeSpent = hrtimeToNSeconds(timeSpentHR); + + const mean = statistics.mean(responseTimes); + const stdev = statistics.stdev(responseTimes, mean); + + return [mean, stdev, timeSpent]; +}