diff --git a/.gitignore b/.gitignore index fb8d1c9..aadc853 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ build node_modules dist +package-lock.json diff --git a/.travis.yml b/.travis.yml index bec3772..f38be03 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,6 @@ jobs: include: - stage: check script: - - npx aegir commitlint --travis - npx aegir dep-check - npm run lint diff --git a/README.md b/README.md index e01d9b0..59bd9e0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -interface-stream-muxer -===================== +# interface-stream-muxer [![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai) [![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) @@ -15,7 +14,7 @@ The primary goal of this module is to enable developers to pick and swap their s Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite. -The API is presented with both Node.js and Go primitives, however, there is not actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks. +The API is presented with both Node.js and Go primitives, however, there is no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through different stacks. ## Lead Maintainer @@ -37,7 +36,7 @@ Include this badge in your readme if you make a new module that uses interface-s ## Usage -### Node.js +### JS Install `interface-stream-muxer` as one of the dependencies of your project and as a test file. Then, using `mocha` (for JavaScript) or a test runner with compatible API, do: @@ -45,11 +44,11 @@ Install `interface-stream-muxer` as one of the dependencies of your project and const test = require('interface-stream-muxer') const common = { - setup (cb) { - cb(null, yourMuxer) + async setup () { + return yourMuxer }, - teardown (cb) { - cb() + async teardown () { + // cleanup } } @@ -63,12 +62,91 @@ test(common) ## API -A valid (read: that follows this abstraction) stream muxer, must implement the following API. +### JS -### Attach muxer to a Connection +A valid (one that follows this abstraction) stream muxer, must implement the following API: -- `JavaScript` muxedConn = muxer(conn, isListener) -- `Go` muxedConn, err := muxer.Attach(conn, isListener) +#### `const muxer = new Muxer([options])` + +Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications. + +e.g. + +```js +const Muxer = require('your-muxer-module') +const pipe = require('it-pipe') + +// Create a duplex muxer +const muxer = new Muxer() + +// Use the muxer in a pipeline +pipe(conn, muxer, conn) // conn is duplex connection to another peer +``` + +`options` is an optional `Object` that may have the following properties: + +* `onStream` - A function called when receiving a new stream from the remote. e.g. + ```js + // Receive a new stream on the muxed connection + const onStream = stream => { + // Read from this stream and write back to it (echo server) + pipe( + stream, + source => (async function * () { + for await (const data of source) yield data + })() + stream + ) + } + const muxer = new Muxer({ onStream }) + // ... + ``` + **Note:** The `onStream` function can be passed in place of the `options` object. i.e. + ```js + new Mplex(stream => { /* ... */ }) + ``` +* `signal` - An [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) which can be used to abort the muxer, _including_ all of it's multiplexed connections. e.g. + ```js + const controller = new AbortController() + const muxer = new Muxer({ signal: controller.signal }) + + pipe(conn, muxer, conn) + + controller.abort() + ``` +* `maxMsgSize` - The maximum size in bytes the data field of multiplexed messages may contain (default 1MB) + +#### `muxer.onStream` + +Use this property as an alternative to passing `onStream` as an option to the `Muxer` constructor. + +```js +const muxer = new Muxer() +// ...later +muxer.onStream = stream => { /* ... */ } +``` + +#### `const stream = muxer.newStream([options])` + +Initiate a new stream with the remote. Returns a [duplex stream](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#duplex-it). + +e.g. + +```js +// Create a new stream on the muxed connection +const stream = muxer.newStream() + +// Use this new stream like any other duplex stream: +pipe([1, 2, 3], stream, consume) +``` + +### Go + +#### Attach muxer to a Connection + +```go +muxedConn, err := muxer.Attach(conn, isListener) +``` This method attaches our stream muxer to an instance of [Connection](https://github.com/libp2p/interface-connection/blob/master/src/connection.js) defined by [interface-connection](https://github.com/libp2p/interface-connection). @@ -78,22 +156,22 @@ If `err` is passed, no operation should be made in `conn`. `muxedConn` interfaces our established Connection with the other endpoint, it must offer an interface to open a stream inside this connection and to receive incomming stream requests. -### Dial(open/create) a new stream +#### Dial(open/create) a new stream -- `JavaScript` stream = muxedConn.newStream([function (err, stream)]) -- `Go` stream, err := muxedConn.newStream() +```go +stream, err := muxedConn.newStream() +``` This method negotiates and opens a new stream with the other endpoint. If `err` is passed, no operation should be made in `stream`. -`stream` interface our established Stream with the other endpoint, it must implement the [Duplex pull-stream interface](https://pull-stream.github.io) in JavaScript or the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser) in Go. +`stream` interface our established Stream with the other endpoint, it must implement the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser). -### Listen(wait/accept) a new incoming stream +#### Listen(wait/accept) a new incoming stream -- `JavaScript` muxedConn.on('stream', function (stream) {}) -- `Go` stream := muxedConn.Accept() +```go +stream := muxedConn.Accept() +``` Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side. - -In JavaScript, the Event Emitter pattern is expected to be used in order to receive new incoming streams, while in Go, it expects to wait when Accept is called. diff --git a/package.json b/package.json index b9b69f3..2d932ac 100644 --- a/package.json +++ b/package.json @@ -33,18 +33,20 @@ }, "homepage": "https://github.com/libp2p/interface-stream-muxer", "dependencies": { - "async": "^2.6.2", + "abort-controller": "^3.0.0", + "abortable-iterator": "^2.1.0", "chai": "^4.2.0", "chai-checkmark": "^1.0.1", "detect-node": "^2.0.4", - "libp2p-tcp": "~0.13.0", - "multiaddr": "^6.0.6", - "pull-generate": "^2.2.0", - "pull-pair": "^1.1.0", - "pull-stream": "^3.6.9" + "it-pair": "^1.0.0", + "it-pipe": "^1.0.1", + "libp2p-tcp": "^0.14.0", + "multiaddr": "^7.1.0", + "p-limit": "^2.2.0", + "streaming-iterables": "^4.1.0" }, "devDependencies": { - "aegir": "^18.2.2" + "aegir": "^20.0.0" }, "contributors": [ "David Dias ", diff --git a/src/base-test.js b/src/base-test.js index 7f50144..01a0e02 100644 --- a/src/base-test.js +++ b/src/base-test.js @@ -3,145 +3,113 @@ const chai = require('chai') chai.use(require('chai-checkmark')) -const expect = chai.expect -const pair = require('pull-pair/duplex') -const pull = require('pull-stream') - -function closeAndWait (stream) { - pull( - pull.empty(), - stream, - pull.onEnd((err) => { - expect(err).to.not.exist.mark() - }) - ) +const { expect } = chai +const pair = require('it-pair/duplex') +const pipe = require('it-pipe') +const { collect, map, consume } = require('streaming-iterables') + +async function closeAndWait (stream) { + await pipe([], stream, consume) + expect(true).to.be.true.mark() } module.exports = (common) => { describe('base', () => { - let muxer + let Muxer - beforeEach((done) => { - common.setup((err, _muxer) => { - if (err) return done(err) - muxer = _muxer - done() - }) + beforeEach(async () => { + Muxer = await common.setup() }) it('Open a stream from the dialer', (done) => { const p = pair() - const dialer = muxer.dialer(p[0]) - const listener = muxer.listener(p[1]) + const dialer = new Muxer() - expect(4).checks(done) - - listener.on('stream', (stream) => { + const listener = new Muxer(stream => { expect(stream).to.exist.mark() closeAndWait(stream) }) - const conn = dialer.newStream((err) => { - expect(err).to.not.exist.mark() - }) + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + expect(3).checks(done) + + const conn = dialer.newStream() closeAndWait(conn) }) it('Open a stream from the listener', (done) => { const p = pair() - const dialer = muxer.dialer(p[0]) - const listener = muxer.listener(p[1]) - - expect(4).check(done) - - dialer.on('stream', (stream) => { + const dialer = new Muxer(stream => { expect(stream).to.exist.mark() closeAndWait(stream) }) + const listener = new Muxer() - const conn = listener.newStream((err) => { - expect(err).to.not.exist.mark() - }) + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + expect(3).check(done) + + const conn = listener.newStream() closeAndWait(conn) }) it('Open a stream on both sides', (done) => { const p = pair() - const dialer = muxer.dialer(p[0]) - const listener = muxer.listener(p[1]) - - expect(8).check(done) - - dialer.on('stream', (stream) => { + const dialer = new Muxer(stream => { expect(stream).to.exist.mark() closeAndWait(stream) }) - - const listenerConn = listener.newStream((err) => { - expect(err).to.not.exist.mark() - }) - - listener.on('stream', (stream) => { + const listener = new Muxer(stream => { expect(stream).to.exist.mark() closeAndWait(stream) }) - const dialerConn = dialer.newStream((err) => { - expect(err).to.not.exist.mark() - }) + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + expect(6).check(done) + + const listenerConn = listener.newStream() + const dialerConn = dialer.newStream() closeAndWait(dialerConn) closeAndWait(listenerConn) }) - it('Open a stream on one side, write, open a stream in the other side', (done) => { + it('Open a stream on one side, write, open a stream on the other side', (done) => { + const toString = map(c => c.slice().toString()) const p = pair() - const dialer = muxer.dialer(p[0]) - const listener = muxer.listener(p[1]) - - expect(6).check(done) - - const dialerConn = dialer.newStream((err) => { - expect(err).to.not.exist.mark() - }) + const dialer = new Muxer() + const listener = new Muxer(stream => { + pipe(stream, toString, collect).then(chunks => { + expect(chunks).to.be.eql(['hey']).mark() + }) - listener.on('stream', (stream) => { - pull( - stream, - pull.collect((err, chunks) => { - expect(err).to.not.exist.mark() - expect(chunks).to.be.eql([Buffer.from('hey')]).mark() - }) - ) + dialer.onStream = onDialerStream - dialer.on('stream', onDialerStream) + const listenerConn = listener.newStream() - const listenerConn = listener.newStream((err) => { - expect(err).to.not.exist.mark() - }) + pipe(['hello'], listenerConn) - pull( - pull.values(['hello']), - listenerConn - ) - - function onDialerStream (stream) { - pull( - stream, - pull.collect((err, chunks) => { - expect(err).to.not.exist.mark() - expect(chunks).to.be.eql([Buffer.from('hello')]).mark() - }) - ) + async function onDialerStream (stream) { + const chunks = await pipe(stream, toString, collect) + expect(chunks).to.be.eql(['hello']).mark() } }) - pull( - pull.values(['hey']), - dialerConn - ) + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + expect(2).check(done) + + const dialerConn = dialer.newStream() + + pipe(['hey'], dialerConn) }) }) } diff --git a/src/close-test.js b/src/close-test.js index 2a64159..9068aca 100644 --- a/src/close-test.js +++ b/src/close-test.js @@ -2,157 +2,117 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const chai = require('chai') -chai.use(require('chai-checkmark')) -const expect = chai.expect -const pair = require('pull-pair/duplex') -const pull = require('pull-stream') -const parallel = require('async/parallel') -const series = require('async/series') +const pair = require('it-pair/duplex') +const pipe = require('it-pipe') +const { consume } = require('streaming-iterables') const Tcp = require('libp2p-tcp') const multiaddr = require('multiaddr') +const abortable = require('abortable-iterator') +const AbortController = require('abort-controller') -const mh = multiaddr('/ip4/127.0.0.1/tcp/10000') +const mh = multiaddr('/ip4/127.0.0.1/tcp/0') -function closeAndWait (stream, callback) { - pull( - pull.empty(), - stream, - pull.onEnd(callback) - ) +function pause (ms) { + return new Promise(resolve => setTimeout(resolve, ms)) +} + +function randomBuffer () { + return Buffer.from(Math.random().toString()) +} + +const infiniteRandom = { + [Symbol.asyncIterator]: async function * () { + while (true) { + yield randomBuffer() + await pause(10) + } + } } module.exports = (common) => { describe('close', () => { - let muxer + let Muxer - beforeEach((done) => { - common.setup((err, _muxer) => { - if (err) return done(err) - muxer = _muxer - done() - }) + beforeEach(async () => { + Muxer = await common.setup() }) - it('closing underlying socket closes streams (tcp)', (done) => { - const tcp = new Tcp() - const tcpListener = tcp.createListener((conn) => { - const listener = muxer.listener(conn) - listener.on('stream', (stream) => { - pull(stream, stream) - }) + it('closing underlying socket closes streams (tcp)', async () => { + const mockConn = muxer => ({ + newStream: (...args) => muxer.newStream(...args) }) - // Wait for the streams to open - expect(2).checks(() => { - // Once everything is closed, we're done - expect(3).checks(done) - tcpListener.close((err) => { - expect(err).to.not.exist.mark() - }) - }) + const mockUpgrade = () => maConn => { + const muxer = new Muxer(stream => pipe(stream, stream)) + pipe(maConn, muxer, maConn) + return mockConn(muxer) + } - tcpListener.listen(mh, () => { - const dialerConn = tcp.dial(mh, () => { - const dialerMuxer = muxer.dialer(dialerConn) - const s1 = dialerMuxer.newStream((err) => { - expect(err).to.not.exist.mark() - pull( - s1, - pull.onEnd((err) => { - expect(err).to.exist.mark() - }) - ) - }) - - const s2 = dialerMuxer.newStream((err) => { - expect(err).to.not.exist.mark() - pull( - s2, - pull.onEnd((err) => { - expect(err).to.exist.mark() - }) - ) - }) - }) + const mockUpgrader = () => ({ + upgradeInbound: mockUpgrade(), + upgradeOutbound: mockUpgrade() }) - }) - it('closing one of the muxed streams doesn\'t close others', (done) => { - const p = pair() - const dialer = muxer.dialer(p[0]) - const listener = muxer.listener(p[1]) + const tcp = new Tcp({ upgrader: mockUpgrader() }) + const tcpListener = tcp.createListener() - expect(6).checks(done) + await tcpListener.listen(mh) + const dialerConn = await tcp.dial(tcpListener.getAddrs()[0]) - const conns = [] + const s1 = await dialerConn.newStream() + const s2 = await dialerConn.newStream() - listener.on('stream', (stream) => { - expect(stream).to.exist.mark() - pull(stream, stream) - }) + // close the listener in a bit + setTimeout(() => tcpListener.close(), 50) - for (let i = 0; i < 5; i++) { - conns.push(dialer.newStream()) - } + const s1Result = pipe(infiniteRandom, s1, consume) + const s2Result = pipe(infiniteRandom, s2, consume) - conns.forEach((conn, i) => { - if (i === 1) { - closeAndWait(conn, (err) => { - expect(err).to.not.exist.mark() - }) - } else { - pull( - conn, - pull.onEnd(() => { - throw new Error('should not end') - }) - ) - } - }) + // test is complete when all muxed streams have closed + await s1Result + await s2Result }) - it.skip('closing on spdy doesn\'t close until all the streams that are being muxed are closed', (done) => { + it('closing one of the muxed streams doesn\'t close others', async () => { const p = pair() - const dialer = muxer.dial(p[0]) - const listener = muxer.listen(p[1]) + const dialer = new Muxer() - expect(15).checks(done) + // Listener is echo server :) + const listener = new Muxer(stream => pipe(stream, stream)) - const conns = [] - const count = [] - for (let i = 0; i < 5; i++) { - count.push(i) - } + pipe(p[0], dialer, p[0]) + pipe(p[1], listener, p[1]) + + const stream = dialer.newStream() + const streams = Array.from(Array(5), () => dialer.newStream()) + let closed = false + const controllers = [] + + const streamResults = streams.map(async stream => { + const controller = new AbortController() + controllers.push(controller) - series(count.map((i) => (cb) => { - parallel([ - (cb) => listener.once('stream', (stream) => { - expect(stream).to.exist.mark() - pull(stream, stream) - cb() - }), - (cb) => conns.push(dialer.newStream(cb)) - ], cb) - }), (err) => { - if (err) return done(err) - - conns.forEach((conn, i) => { - pull( - pull.values([Buffer.from('hello')]), - pull.asyncMap((val, cb) => { - setTimeout(() => { - cb(null, val) - }, i * 10) - }), - conn, - pull.collect((err, data) => { - expect(err).to.not.exist.mark() - expect(data).to.be.eql([Buffer.from('hello')]).mark() - }) - ) - }) + try { + const abortableRand = abortable(infiniteRandom, controller.signal, { abortCode: 'ERR_TEST_ABORT' }) + await pipe(abortableRand, stream, consume) + } catch (err) { + if (err.code !== 'ERR_TEST_ABORT') throw err + } + + if (!closed) throw new Error('stream should not have ended yet!') }) + + // Pause, and then send some data and close the first stream + await pause(50) + await pipe([randomBuffer()], stream, consume) + closed = true + + // Abort all the other streams later + await pause(50) + controllers.forEach(c => c.abort()) + + // These should now all resolve without error + await Promise.all(streamResults) }) }) } diff --git a/src/mega-stress-test.js b/src/mega-stress-test.js index be5a74e..b012ea1 100644 --- a/src/mega-stress-test.js +++ b/src/mega-stress-test.js @@ -6,18 +6,12 @@ const spawn = require('./spawner') module.exports = (common) => { describe.skip('mega stress test', function () { this.timeout(100 * 200 * 1000) - let muxer + let Muxer - beforeEach((done) => { - common.setup((err, _muxer) => { - if (err) return done(err) - muxer = _muxer - done() - }) + beforeEach(async () => { + Muxer = await common.setup() }) - it('10000 messages of 10000 streams', (done) => { - spawn(muxer, 10000, 10000, done, 5000) - }) + it('10,000 streams with 10,000 msg', () => spawn(Muxer, 10000, 10000, 5000)) }) } diff --git a/src/spawner.js b/src/spawner.js index 577349e..cf8b787 100644 --- a/src/spawner.js +++ b/src/spawner.js @@ -1,92 +1,82 @@ 'use strict' -const expect = require('chai').expect +const { expect } = require('chai') +const pair = require('it-pair/duplex') +const pipe = require('it-pipe') +const pLimit = require('p-limit') +const { collect, tap, consume } = require('streaming-iterables') -const pair = require('pull-pair/duplex') -const pull = require('pull-stream') -const generate = require('pull-generate') -const each = require('async/each') -const eachLimit = require('async/eachLimit') -const setImmediate = require('async/setImmediate') +module.exports = async (Muxer, nStreams, nMsg, limit) => { + const [dialerSocket, listenerSocket] = pair() + const { check, done } = marker((4 * nStreams) + (nStreams * nMsg)) -module.exports = (muxer, nStreams, nMsg, done, limit) => { - const p = pair() - const dialerSocket = p[0] - const listenerSocket = p[1] + const msg = 'simple msg' - const check = marker((6 * nStreams) + (nStreams * nMsg), done) + const listener = new Muxer(async stream => { + expect(stream).to.exist // eslint-disable-line + check() - const msg = 'simple msg' + await pipe( + stream, + tap(chunk => check()), + consume + ) + + check() + pipe([], stream) + }) + + const dialer = new Muxer() - const listener = muxer.listener(listenerSocket) - const dialer = muxer.dialer(dialerSocket) + pipe(listenerSocket, listener, listenerSocket) + pipe(dialerSocket, dialer, dialerSocket) - listener.on('stream', (stream) => { + const spawnStream = async n => { + const stream = dialer.newStream() expect(stream).to.exist // eslint-disable-line check() - pull( + + const res = await pipe( + (function * () { + for (let i = 0; i < nMsg; i++) { + // console.log('n', n, 'msg', i) + yield new Promise(resolve => resolve(msg)) + } + })(), stream, - pull.through((chunk) => { - expect(chunk).to.exist // eslint-disable-line - check() - }), - pull.onEnd((err) => { - expect(err).to.not.exist // eslint-disable-line - check() - pull(pull.empty(), stream) - }) + collect ) - }) - const numbers = [] - for (let i = 0; i < nStreams; i++) { - numbers.push(i) + expect(res).to.be.eql([]) + check() } - const spawnStream = (n, cb) => { - const stream = dialer.newStream((err) => { - expect(err).to.not.exist // eslint-disable-line - check() - expect(stream).to.exist // eslint-disable-line - check() - pull( - generate(0, (s, cb) => { - setImmediate(() => { - cb(s === nMsg ? true : null, msg, s + 1) - }) - }), - stream, - pull.collect((err, res) => { - expect(err).to.not.exist // eslint-disable-line - check() - expect(res).to.be.eql([]) - check() - cb() - }) - ) - }) - } + const limiter = pLimit(limit || Infinity) - if (limit) { - eachLimit(numbers, limit, spawnStream, () => {}) - } else { - each(numbers, spawnStream, () => {}) - } + await Promise.all( + Array.from(Array(nStreams), (_, i) => limiter(() => spawnStream(i))) + ) + + return done } -function marker (n, done) { +function marker (n) { + let check let i = 0 - return (err) => { - i++ + const done = new Promise((resolve, reject) => { + check = err => { + i++ - if (err) { - /* eslint-disable-next-line */ - console.error('Failed after %s iterations', i) - return done(err) - } + if (err) { + /* eslint-disable-next-line */ + console.error('Failed after %s iterations', i) + return reject(err) + } - if (i === n) { - done() + if (i === n) { + resolve() + } } - } + }) + return { check, done } } diff --git a/src/stress-test.js b/src/stress-test.js index 6a1125e..f32307b 100644 --- a/src/stress-test.js +++ b/src/stress-test.js @@ -5,63 +5,26 @@ const spawn = require('./spawner') module.exports = (common) => { describe('stress test', () => { - let muxer - - beforeEach((done) => { - common.setup((err, _muxer) => { - if (err) return done(err) - muxer = _muxer - done() - }) - }) - - it('1 stream with 1 msg', (done) => { - spawn(muxer, 1, 1, done) - }) - - it('1 stream with 10 msg', (done) => { - spawn(muxer, 1, 10, done) - }) - - it('1 stream with 100 msg', (done) => { - spawn(muxer, 1, 100, done) - }) - - it('10 streams with 1 msg', (done) => { - spawn(muxer, 10, 1, done) - }) - - it('10 streams with 10 msg', (done) => { - spawn(muxer, 10, 10, done) - }) - - it('10 streams with 100 msg', (done) => { - spawn(muxer, 10, 100, done) - }) - - it('100 streams with 1 msg', (done) => { - spawn(muxer, 100, 1, done) - }) - - it('100 streams with 10 msg', (done) => { - spawn(muxer, 100, 10, done) - }) - - it('100 streams with 100 msg', (done) => { - spawn(muxer, 100, 100, done) - }) - - it('1000 streams with 1 msg', (done) => { - spawn(muxer, 1000, 1, done) - }) - - it('1000 streams with 10 msg', (done) => { - spawn(muxer, 1000, 10, done) - }) - - it('1000 streams with 100 msg', function (done) { - this.timeout(80 * 1000) - spawn(muxer, 1000, 100, done) + let Muxer + + beforeEach(async () => { + Muxer = await common.setup() + }) + + it('1 stream with 1 msg', () => spawn(Muxer, 1, 1)) + it('1 stream with 10 msg', () => spawn(Muxer, 1, 10)) + it('1 stream with 100 msg', () => spawn(Muxer, 1, 100)) + it('10 streams with 1 msg', () => spawn(Muxer, 10, 1)) + it('10 streams with 10 msg', () => spawn(Muxer, 10, 10)) + it('10 streams with 100 msg', () => spawn(Muxer, 10, 100)) + it('100 streams with 1 msg', () => spawn(Muxer, 100, 1)) + it('100 streams with 10 msg', () => spawn(Muxer, 100, 10)) + it('100 streams with 100 msg', () => spawn(Muxer, 100, 100)) + it('1000 streams with 1 msg', () => spawn(Muxer, 1000, 1)) + it('1000 streams with 10 msg', () => spawn(Muxer, 1000, 10)) + it('1000 streams with 100 msg', function () { + this.timeout(30 * 1000) + return spawn(Muxer, 1000, 100) }) }) }