diff --git a/.gitignore b/.gitignore index 123ae94..254988d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,13 @@ +**/node_modules/ +**/*.log +test/repo-tests* + # Logs logs *.log +coverage + # Runtime data pids *.pid @@ -19,9 +25,11 @@ coverage # node-waf configuration .lock-wscript -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release +build # Dependency directory # https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git node_modules + +lib +dist diff --git a/.npmignore b/.npmignore new file mode 100644 index 0000000..59335fd --- /dev/null +++ b/.npmignore @@ -0,0 +1,34 @@ +**/node_modules/ +**/*.log +test/repo-tests* + +# Logs +logs +*.log + +coverage + +# Runtime data +pids +*.pid +*.seed + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage + +# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# node-waf configuration +.lock-wscript + +build + +# Dependency directory +# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git +node_modules + +test diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..52cd14d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,10 @@ +sudo: false +language: node_js +node_js: + - "stable" + +before_install: + - npm install -g npm + +script: + - npm run lint diff --git a/README.md b/README.md index 2340b56..a0de266 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,10 @@ interface-stream-muxer ===================== [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) +[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) +[![Travis CI](https://travis-ci.org/ipfs/interface-stream-muxer.svg?branch=master)](https://travis-ci.org/ipfs/interface-stream-muxer) +[![Dependency Status](https://david-dm.org/ipfs/interface-stream-muxer.svg?style=flat-square)](https://david-dm.org/ipfs/interface-stream-muxer) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard) > A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs" @@ -12,57 +15,55 @@ Publishing a test suite as a module lets multiple modules all ensure compatibili The API is presented with both Node.js and Go primitives, however, there is not actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks. -# Modules that implement the interface +## Modules that implement the interface -- [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy) -- [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex) +- [JavaScript libp2p-spdy](https://github.com/libp2p/js-libp2p-spdy) - [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer) Send a PR to add a new one if you happen to find or write one. -# Badge +## Badge Include this badge in your readme if you make a new module that uses interface-stream-muxer API. ![](/img/badge.png) -# How to use the battery tests +## Usage -## Node.js +### Node.js -Install interface-stream-muxer as one of the dependencies of your project and as a test file, using `tap`, `tape` or a test runner with compatible API, do: +Install `interface-stream-muxer` as one of the dependencies of your project and as a test file. Then, using `mocha` (for JavaScript) or a test runner with compatible API, do: -``` -var tape = require('tape') -var tests = require('interface-stream-muxer/tests') -var yourStreamMuxer = require('../src') +```js +const test = require('interface-stream-muxer') -var common = { - setup: function (t, cb) { - cb(null, yourStreamMuxer) +const common = { + setup (cb) { + cb(null, yourMuxer) }, - teardown: function (t, cb) { + teardown (cb) { cb() } } -tests(tape, common) +// use all of the test suits +test(common) ``` -## Go +### Go -> WIP - being written +> WIP -# API +## API A valid (read: that follows this abstraction) stream muxer, must implement the following API. -### Attach muxer to a transport +### Attach muxer to a Connection -- `Node.js` muxedConn = muxer(transport, isListener) -- `Go` muxedConn, err := muxer.Attach(transport, isListener) +- `JavaScript` muxedConn = muxer(conn, isListener) +- `Go` muxedConn, err := muxer.Attach(conn, isListener) -This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection). +This method attaches our stream muxer to an instance of [Connection](https://github.com/libp2p/interface-connection/blob/master/src/connection.js) defined by [interface-connection](https://github.com/libp2p/interface-connection). If `err` is passed, no operation should be made in `conn`. @@ -73,22 +74,20 @@ If `err` is passed, no operation should be made in `conn`. ### Dial(open/create) a new stream -- `Node.js` stream = muxedConn.newStream([function (err, stream)]) +- `JavaScript` stream = muxedConn.newStream([function (err, stream)]) - `Go` stream, err := muxedConn.newStream() This method negotiates and opens a new stream with the other endpoint. If `err` is passed, no operation should be made in `stream`. -`stream` interface our established Stream with the other endpoint, it must implement the [Duplex Stream interface](https://nodejs.org/api/stream.html#stream_class_stream_duplex) in Node.js or the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser) in Go. - -In the Node.js case, if no callback is passed, stream will emit an 'ready' event when it is prepared or a 'error' event if it fails to establish the connection, until then, it will buffer the 'write' calls. +`stream` interface our established Stream with the other endpoint, it must implement the [Duplex pull-stream interface](https://pull-stream.github.io) in JavaScript or the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser) in Go. ### Listen(wait/accept) a new incoming stream -- `Node.js` muxedConn.on('stream', function (stream) {}) +- `JavaScript` muxedConn.on('stream', function (stream) {}) - `Go` stream := muxedConn.Accept() Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side. -In Node.js, the Event Emitter pattern is expected to be used in order to receive new incoming streams, while in Go, it expects to wait when Accept is called. +In JavaScript, the Event Emitter pattern is expected to be used in order to receive new incoming streams, while in Go, it expects to wait when Accept is called. diff --git a/package.json b/package.json index 7e935c9..fcec055 100644 --- a/package.json +++ b/package.json @@ -2,31 +2,47 @@ "name": "interface-stream-muxer", "version": "0.3.1", "description": "A test suite and interface you can use to implement a stream muxer.", - "main": "tests/index.js", - "directories": { - "test": "tests" - }, + "main": "lib/index.js", + "jsnext:main": "src/index.js", "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" + "test": "exit(0)", + "build": "aegir-build --env node", + "lint": "aegir-lint", + "release": "aegir-release --env node", + "release-minor": "aegir-release --env node --type minor", + "release-major": "aegir-release --env node --type major", + "coverage": "exit(0)", + "coverage-publish": "exit(0)" }, "repository": { "type": "git", - "url": "https://github.com/diasdavid/interface-stream-muxer.git" + "url": "https://github.com/libp2p/interface-stream-muxer.git" }, "keywords": [ "Streams", "Muxer", - "interface", "Interface" ], "author": "David Dias ", "license": "MIT", "bugs": { - "url": "https://github.com/diasdavid/interface-stream-muxer/issues" + "url": "https://github.com/libp2p/interface-stream-muxer/issues" }, - "homepage": "https://github.com/diasdavid/interface-stream-muxer", + "homepage": "https://github.com/libp2p/interface-stream-muxer", "dependencies": { - "stream-pair": "^1.0.3", - "timed-tape": "^0.1.1" + "async": "^2.0.1", + "chai": "^3.5.0", + "chai-checkmark": "^1.0.1", + "detect-node": "^2.0.3", + "libp2p-tcp": "^0.8.1", + "multiaddr": "^2.0.2", + "pull-generate": "^2.2.0", + "pull-pair": "^1.1.0", + "pull-stream": "^3.4.3", + "run-parallel": "^1.1.6", + "run-series": "^1.1.4" + }, + "devDependencies": { + "aegir": "^8.0.0" } } diff --git a/src/base-test.js b/src/base-test.js new file mode 100644 index 0000000..8fa3a4c --- /dev/null +++ b/src/base-test.js @@ -0,0 +1,146 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('chai-checkmark')) +const expect = chai.expect +const pair = require('pull-pair/duplex') +const pull = require('pull-stream') + +function closeAndWait (stream) { + pull( + pull.empty(), + stream, + pull.onEnd((err) => { + expect(err).to.not.exist.mark() + }) + ) +} + +module.exports = (common) => { + describe('base', () => { + let muxer + + beforeEach((done) => { + common.setup((err, _muxer) => { + if (err) return done(err) + muxer = _muxer + done() + }) + }) + + it('Open a stream from the dialer', (done) => { + const p = pair() + const dialer = muxer.dialer(p[0]) + const listener = muxer.listener(p[1]) + + expect(4).checks(done) + + listener.on('stream', (stream) => { + expect(stream).to.exist.mark() + closeAndWait(stream) + }) + + const conn = dialer.newStream((err) => { + expect(err).to.not.exist.mark() + }) + + closeAndWait(conn) + }) + + it('Open a stream from the listener', (done) => { + const p = pair() + const dialer = muxer.dialer(p[0]) + const listener = muxer.listener(p[1]) + + expect(4).check(done) + + dialer.on('stream', (stream) => { + expect(stream).to.exist.mark() + closeAndWait(stream) + }) + + const conn = listener.newStream((err) => { + expect(err).to.not.exist.mark() + }) + + closeAndWait(conn) + }) + + it('Open a stream on both sides', (done) => { + const p = pair() + const dialer = muxer.dialer(p[0]) + const listener = muxer.listener(p[1]) + + expect(8).check(done) + + dialer.on('stream', (stream) => { + expect(stream).to.exist.mark() + closeAndWait(stream) + }) + + const listenerConn = listener.newStream((err) => { + expect(err).to.not.exist.mark() + }) + + listener.on('stream', (stream) => { + expect(stream).to.exist.mark() + closeAndWait(stream) + }) + + const dialerConn = dialer.newStream((err) => { + expect(err).to.not.exist.mark() + }) + + closeAndWait(dialerConn) + closeAndWait(listenerConn) + }) + + it('Open a stream on one side, write, open a stream in the other side', (done) => { + const p = pair() + const dialer = muxer.dialer(p[0]) + const listener = muxer.listener(p[1]) + + expect(6).check(done) + + const dialerConn = dialer.newStream((err) => { + expect(err).to.not.exist.mark() + }) + + pull( + pull.values(['hey']), + dialerConn + ) + + listener.on('stream', (stream) => { + pull( + stream, + pull.collect((err, chunks) => { + expect(err).to.not.exist.mark() + expect(chunks).to.be.eql([Buffer('hey')]).mark() + }) + ) + + const listenerConn = listener.newStream((err) => { + expect(err).to.not.exist.mark() + }) + + pull( + pull.values(['hello']), + listenerConn + ) + + dialer.on('stream', onDialerStream) + function onDialerStream (stream) { + pull( + stream, + pull.collect((err, chunks) => { + expect(err).to.not.exist.mark() + expect(chunks).to.be.eql([Buffer('hello')]).mark() + }) + ) + } + }) + }) + }) +} diff --git a/src/close-test.js b/src/close-test.js new file mode 100644 index 0000000..601ee99 --- /dev/null +++ b/src/close-test.js @@ -0,0 +1,162 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const chai = require('chai') +chai.use(require('chai-checkmark')) +const expect = chai.expect +const pair = require('pull-pair/duplex') +const pull = require('pull-stream') +const parallel = require('run-parallel') +const series = require('run-series') +const Tcp = require('libp2p-tcp') +const multiaddr = require('multiaddr') + +const mh = multiaddr('/ip4/127.0.0.1/tcp/10000') + +function closeAndWait (stream) { + pull( + pull.empty(), + stream, + pull.onEnd((err) => { + expect(err).to.not.exist.mark() + }) + ) +} + +module.exports = (common) => { + describe('close', () => { + let muxer + + beforeEach((done) => { + common.setup((err, _muxer) => { + if (err) return done(err) + muxer = _muxer + done() + }) + }) + + it('closing underlying closes streams (tcp)', (done) => { + expect(2).checks(done) + + const tcp = new Tcp() + const tcpListener = tcp.createListener((socket) => { + const listener = muxer.listener(socket) + listener.on('stream', (stream) => { + pull(stream, stream) + }) + }) + + tcpListener.listen(mh, () => { + const dialer = muxer.dialer(tcp.dial(mh, () => { + tcpListener.close() + })) + + const s1 = dialer.newStream(() => { + pull( + s1, + pull.onEnd((err) => { + expect(err).to.exist.mark() + }) + ) + + const s2 = dialer.newStream(() => { + pull( + s2, + pull.onEnd((err) => { + expect(err).to.exist.mark() + }) + ) + }) + }) + }) + }) + + it('closing one of the muxed streams doesn\'t close others', (done) => { + const p = pair() + const dialer = muxer.dialer(p[0]) + const listener = muxer.listener(p[1]) + + expect(6).checks(done) + + const conns = [] + + listener.on('stream', (stream) => { + expect(stream).to.exist.mark() + pull(stream, stream) + }) + + for (let i = 0; i < 5; i++) { + conns.push(dialer.newStream()) + } + + conns.forEach((conn, i) => { + if (i === 2) { + closeAndWait(conn) + } else { + pull( + conn, + pull.onEnd(() => { + throw new Error('should not end') + }) + ) + } + }) + }) + + it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => { + const p = pair() + const dialer = muxer.dial(p[0]) + const listener = muxer.listen(p[1]) + + expect(15).checks(done) + + const conns = [] + const count = [] + for (let i = 0; i < 5; i++) { + count.push(i) + } + + series(count.map((i) => (cb) => { + parallel([ + (cb) => listener.once('stream', (stream) => { + console.log('pipe') + expect(stream).to.exist.mark() + pull(stream, stream) + cb() + }), + (cb) => conns.push(dialer.newStream(cb)) + ], cb) + }), (err) => { + if (err) return done(err) + + conns.forEach((conn, i) => { + pull( + pull.values([Buffer('hello')]), + pull.asyncMap((val, cb) => { + setTimeout(() => { + cb(null, val) + }, i * 10) + }), + pull.through((val) => console.log('send', val)), + conn, + pull.through((val) => console.log('recv', val)), + pull.collect((err, data) => { + console.log('end', i) + expect(err).to.not.exist.mark() + expect(data).to.be.eql([Buffer('hello')]).mark() + }) + ) + }) + + listener.on('close', () => { + console.log('closed listener') + }) + + dialer.end(() => { + console.log('CLOSED') + }) + }) + }) + }) +} diff --git a/src/index.js b/src/index.js new file mode 100644 index 0000000..9006539 --- /dev/null +++ b/src/index.js @@ -0,0 +1,19 @@ +/* eslint-env mocha */ +'use strict' + +const baseTest = require('./base-test') +const stressTest = require('./stress-test') +const megaStressTest = require('./mega-stress-test') +const isNode = require('detect-node') + +module.exports = (common) => { + describe('interface-stream-muxer', () => { + baseTest(common) + if (isNode) { + const closeTest = require('./close-test') + closeTest(common) + } + stressTest(common) + megaStressTest(common) + }) +} diff --git a/src/mega-stress-test.js b/src/mega-stress-test.js new file mode 100644 index 0000000..be5a74e --- /dev/null +++ b/src/mega-stress-test.js @@ -0,0 +1,23 @@ +/* eslint-env mocha */ +'use strict' + +const spawn = require('./spawner') + +module.exports = (common) => { + describe.skip('mega stress test', function () { + this.timeout(100 * 200 * 1000) + let muxer + + beforeEach((done) => { + common.setup((err, _muxer) => { + if (err) return done(err) + muxer = _muxer + done() + }) + }) + + it('10000 messages of 10000 streams', (done) => { + spawn(muxer, 10000, 10000, done, 5000) + }) + }) +} diff --git a/src/spawner.js b/src/spawner.js new file mode 100644 index 0000000..c6bbdc5 --- /dev/null +++ b/src/spawner.js @@ -0,0 +1,88 @@ +'use strict' + +const expect = require('chai').expect + +const pair = require('pull-pair/duplex') +const pull = require('pull-stream') +const generate = require('pull-generate') +const each = require('async/each') +const eachLimit = require('async/eachLimit') + +module.exports = (muxer, nStreams, nMsg, done, limit) => { + const p = pair() + const dialerSocket = p[0] + const listenerSocket = p[1] + + const check = marker((6 * nStreams) + (nStreams * nMsg), done) + + const msg = 'simple msg' + + const listener = muxer.listener(listenerSocket) + const dialer = muxer.dialer(dialerSocket) + + listener.on('stream', (stream) => { + expect(stream).to.exist + check() + pull( + stream, + pull.through((chunk) => { + expect(chunk).to.exist + check() + }), + pull.onEnd((err) => { + expect(err).to.not.exist + check() + pull(pull.empty(), stream) + }) + ) + }) + + const numbers = [] + for (let i = 0; i < nStreams; i++) { + numbers.push(i) + } + + const spawnStream = (n, cb) => { + const stream = dialer.newStream((err) => { + expect(err).to.not.exist + check() + expect(stream).to.exist + check() + pull( + generate(0, (s, cb) => { + cb(s === nMsg ? true : null, msg, s + 1) + }), + stream, + pull.collect((err, res) => { + expect(err).to.not.exist + check() + expect(res).to.be.eql([]) + check() + cb() + }) + ) + }) + } + + if (limit) { + eachLimit(numbers, limit, spawnStream, () => {}) + } else { + each(numbers, spawnStream, () => {}) + } +} + +function marker (n, done) { + let i = 0 + return (err) => { + i++ + + if (err) { + console.error('Failed after %s iterations', i) + return done(err) + } + + if (i === n) { + done() + } + } +} diff --git a/src/stress-test.js b/src/stress-test.js new file mode 100644 index 0000000..c42cc2d --- /dev/null +++ b/src/stress-test.js @@ -0,0 +1,66 @@ +/* eslint-env mocha */ +'use strict' + +const spawn = require('./spawner') + +module.exports = (common) => { + describe('stress test', () => { + let muxer + + beforeEach((done) => { + common.setup((err, _muxer) => { + if (err) return done(err) + muxer = _muxer + done() + }) + }) + + it('1 stream with 1 msg', (done) => { + spawn(muxer, 1, 1, done) + }) + + it('1 stream with 10 msg', (done) => { + spawn(muxer, 1, 10, done) + }) + + it('1 stream with 100 msg', (done) => { + spawn(muxer, 1, 100, done) + }) + + it('10 streams with 1 msg', (done) => { + spawn(muxer, 10, 1, done) + }) + + it('10 streams with 10 msg', (done) => { + spawn(muxer, 10, 10, done) + }) + + it('10 streams with 100 msg', (done) => { + spawn(muxer, 10, 100, done) + }) + + it('100 streams with 1 msg', (done) => { + spawn(muxer, 100, 1, done) + }) + + it('100 streams with 10 msg', (done) => { + spawn(muxer, 100, 10, done) + }) + + it('100 streams with 100 msg', (done) => { + spawn(muxer, 100, 100, done) + }) + + it('1000 streams with 1 msg', (done) => { + spawn(muxer, 1000, 1, done) + }) + + it('1000 streams with 10 msg', (done) => { + spawn(muxer, 1000, 10, done) + }) + + it('1000 streams with 100 msg', (done) => { + spawn(muxer, 1000, 100, done) + }) + }) +} diff --git a/tests/.DS_Store b/tests/.DS_Store deleted file mode 100644 index 5008ddf..0000000 Binary files a/tests/.DS_Store and /dev/null differ diff --git a/tests/base-test.js b/tests/base-test.js deleted file mode 100644 index a36373b..0000000 --- a/tests/base-test.js +++ /dev/null @@ -1,167 +0,0 @@ -var streamPair = require('stream-pair') - -module.exports.all = function (test, common) { - test('Open a stream from the dialer', function (t) { - common.setup(test, function (err, muxer) { - t.plan(4) - t.ifError(err, 'Should not throw') - - var pair = streamPair.create() - var dialer = muxer(pair, false) - var listener = muxer(pair.other, true) - - listener.on('stream', (stream) => { - t.pass('got stream') - }) - - dialer.newStream((err, stream) => { - t.ifError(err, 'Should not throw') - t.pass('dialed stream') - }) - }) - }) - test('Open a stream from the listener', function (t) { - common.setup(test, function (err, muxer) { - t.plan(4) - t.ifError(err, 'Should not throw') - - var pair = streamPair.create() - var dialer = muxer(pair, false) - var listener = muxer(pair.other, true) - - dialer.on('stream', (stream) => { - t.pass('got stream') - }) - - listener.newStream((err, stream) => { - t.ifError(err, 'Should not throw') - t.pass('dialed stream') - }) - }) - }) - - test('Open a stream on both sides', function (t) { - common.setup(test, function (err, muxer) { - t.plan(7) - t.ifError(err, 'Should not throw') - - var pair = streamPair.create() - var dialer = muxer(pair, false) - var listener = muxer(pair.other, true) - - dialer.on('stream', (stream) => { - t.pass('got stream') - }) - - listener.newStream((err, stream) => { - t.ifError(err, 'Should not throw') - t.pass('dialed stream') - }) - - listener.on('stream', (stream) => { - t.pass('got stream') - }) - - dialer.newStream((err, stream) => { - t.ifError(err, 'Should not throw') - t.pass('dialed stream') - }) - }) - }) - - test('Open a stream on one side, write, open a stream in the other side', function (t) { - common.setup(test, function (err, muxer) { - t.plan(9) - t.ifError(err, 'Should not throw') - - var pair = streamPair.create() - var dialer = muxer(pair, false) - var listener = muxer(pair.other, true) - - dialer.newStream(function (err, stream) { - t.ifError(err, 'Should not throw') - t.pass('dialed stream from dialer') - stream.write('hey') - }) - - listener.on('stream', function (stream) { - t.pass('listener got stream') - - stream.on('data', function (chunk) { - t.equal(chunk.toString(), 'hey') - }) - - listener.newStream(function (err, stream) { - t.ifError(err, 'Should not throw') - t.pass('dialed stream from listener') - - stream.write('hello') - }) - }) - - dialer.on('stream', function (stream) { - t.pass('dialer got stream') - - stream.on('data', function (chunk) { - t.equal(chunk.toString(), 'hello') - }) - }) - }) - }) - - test('Open a stream using the net.connect pattern', function (t) { - common.setup(test, function (err, muxer) { - t.plan(2) - t.ifError(err, 'Should not throw') - - var pair = streamPair.create() - var dialer = muxer(pair, false) - var listener = muxer(pair.other, true) - - var stream = dialer.newStream() - - stream.on('ready', function () { - t.pass('dialed stream') - }) - - stream.on('error', function (err) { - t.ifError(err, 'Should not throw') - }) - - listener.on('stream', function (stream) { - t.pass('got stream') - }) - }) - }) - - test('Buffer writes Open a stream using the net.connect pattern', function (t) { - common.setup(test, function (err, muxer) { - t.plan(3) - t.ifError(err, 'Should not throw') - - var pair = streamPair.create() - var dialer = muxer(pair, false) - var listener = muxer(pair.other, true) - - var stream = dialer.newStream() - - stream.write('buffer this') - - stream.on('ready', function () { - t.pass('dialed stream') - }) - - stream.on('error', function (err) { - t.ifError(err, 'Should not throw') - }) - - listener.on('stream', function (stream) { - t.pass('got stream') - - stream.on('data', function (chunk) { - t.equal(chunk.toString(), 'buffer this') - }) - }) - }) - }) -} diff --git a/tests/index.js b/tests/index.js deleted file mode 100644 index dddff1c..0000000 --- a/tests/index.js +++ /dev/null @@ -1,8 +0,0 @@ -var timed = require('timed-tape') - -module.exports = function (test, common, mega) { - test = timed(test) - require('./base-test.js').all(test, common) - require('./stress-test.js').all(test, common) - // require('./mega-stress-test.js').all(test, common) -} diff --git a/tests/mega-stress-test.js b/tests/mega-stress-test.js deleted file mode 100644 index a073985..0000000 --- a/tests/mega-stress-test.js +++ /dev/null @@ -1,55 +0,0 @@ -var streamPair = require('stream-pair') - -module.exports.all = function (test, common) { - test('10000 messages of 10000 streams', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 10000, 10000) - }) - }) -} - -function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { - t.plan(1 + (5 * nStreams) + (nStreams * nMsg)) - - var msg = !size ? 'simple msg' : 'make the msg bigger' - - var listener = muxer(listenerSocket, true) - var dialer = muxer(dialerSocket, false) - - listener.on('stream', function (stream) { - t.pass('Incoming stream') - - stream.on('data', function (chunk) { - t.pass('Received message') - }) - - stream.on('end', function () { - t.pass('Stream ended on Listener') - stream.end() - }) - }) - - for (var i = 0; i < nStreams; i++) { - dialer.newStream(function (err, stream) { - t.ifError(err, 'Should not throw') - t.pass('Dialed stream') - - for (var j = 0; j < nMsg; j++) { - stream.write(msg) - } - - stream.on('data', function (chunk) { - t.fail('Should not happen') - }) - - stream.on('end', function () { - t.pass('Stream ended on Dialer') - }) - - stream.end() - }) - } -} diff --git a/tests/stress-test.js b/tests/stress-test.js deleted file mode 100644 index c96e57d..0000000 --- a/tests/stress-test.js +++ /dev/null @@ -1,153 +0,0 @@ -var streamPair = require('stream-pair') - -module.exports.all = function (test, common) { - test('1 stream with 1 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 1, 1) - }) - }) - - test('1 stream with 10 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 1, 10) - }) - }) - - test('1 stream with 100 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 1, 100) - }) - }) - - test('10 stream with 1 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 10, 1) - }) - }) - - test('10 stream with 10 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 10, 10) - }) - }) - - test('10 stream with 100 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 10, 10) - }) - }) - - test('100 stream with 1 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 100, 1) - }) - }) - - test('100 stream with 10 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 100, 10) - }) - }) - - test('100 stream with 100 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 100, 10) - }) - }) - - test('1000 stream with 1 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 1000, 1) - }) - }) - - test('1000 stream with 10 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 1000, 10) - }) - }) - - test('1000 stream with 100 msg', function (t) { - common.setup(test, function (err, muxer) { - t.ifError(err, 'should not throw') - var pair = streamPair.create() - - spawnGeneration(t, muxer, pair, pair.other, 1000, 100) - }) - }) -} - -function spawnGeneration (t, muxer, dialerSocket, listenerSocket, nStreams, nMsg, size) { - t.plan(1 + (5 * nStreams) + (nStreams * nMsg)) - - var msg = !size ? 'simple msg' : 'make the msg bigger' - - var listener = muxer(listenerSocket, true) - var dialer = muxer(dialerSocket, false) - - listener.on('stream', function (stream) { - t.pass('Incoming stream') - stream.on('data', function (chunk) { - t.pass('Received message') - }) - - stream.on('end', function () { - t.pass('Stream ended on Listener') - stream.end() - }) - }) - - for (var i = 0; i < nStreams; i++) { - dialer.newStream(function (err, stream) { - t.ifError(err, 'Should not throw') - t.pass('Dialed stream') - - for (var j = 0; j < nMsg; j++) { - stream.write(msg) - } - - stream.on('data', function (chunk) { - t.fail('Should not happen') - }) - - stream.on('end', function () { - t.pass('Stream ended on Dialer') - }) - - stream.end() - }) - } -}