Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

[WIP] Pull-streams #7

Merged
merged 5 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
**/node_modules/
**/*.log
test/repo-tests*

# Logs
logs
*.log

coverage

# Runtime data
pids
*.pid
Expand All @@ -19,9 +25,11 @@ coverage
# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release
build

# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules

lib
dist
34 changes: 34 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
**/node_modules/
**/*.log
test/repo-tests*

# Logs
logs
*.log

coverage

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

build

# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules

test
10 changes: 10 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
sudo: false
language: node_js
node_js:
- "stable"

before_install:
- npm install -g npm

script:
- npm run lint
57 changes: 28 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ interface-stream-muxer
=====================

[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Travis CI](https://travis-ci.org/ipfs/interface-stream-muxer.svg?branch=master)](https://travis-ci.org/ipfs/interface-stream-muxer)
[![Dependency Status](https://david-dm.org/ipfs/interface-stream-muxer.svg?style=flat-square)](https://david-dm.org/ipfs/interface-stream-muxer) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)

> A test suite and interface you can use to implement a stream muxer. "A one stop shop for all your muxing needs"

Expand All @@ -12,57 +15,55 @@ Publishing a test suite as a module lets multiple modules all ensure compatibili

The API is presented with both Node.js and Go primitives, however, there is not actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.

# Modules that implement the interface
## Modules that implement the interface

- [JavaScript libp2p-spdy](https://github.com/diasdavid/js-libp2p-spdy)
- [JavaScript libp2p-multiplex](https://github.com/diasdavid/js-libp2p-multiplex)
- [JavaScript libp2p-spdy](https://github.com/libp2p/js-libp2p-spdy)
- [Go spdy, muxado, yamux and multiplex](https://github.com/jbenet/go-stream-muxer)

Send a PR to add a new one if you happen to find or write one.

# Badge
## Badge

Include this badge in your readme if you make a new module that uses interface-stream-muxer API.

![](/img/badge.png)

# How to use the battery tests
## Usage

## Node.js
### Node.js

Install interface-stream-muxer as one of the dependencies of your project and as a test file, using `tap`, `tape` or a test runner with compatible API, do:
Install `interface-stream-muxer` as one of the dependencies of your project and as a test file. Then, using `mocha` (for JavaScript) or a test runner with compatible API, do:

```
var tape = require('tape')
var tests = require('interface-stream-muxer/tests')
var yourStreamMuxer = require('../src')
```js
const test = require('interface-stream-muxer')

var common = {
setup: function (t, cb) {
cb(null, yourStreamMuxer)
const common = {
setup (cb) {
cb(null, yourMuxer)
},
teardown: function (t, cb) {
teardown (cb) {
cb()
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚲🏚👮🏽


tests(tape, common)
// use all of the test suits
test(common)
```

## Go
### Go

> WIP - being written
> WIP

# API
## API

A valid (read: that follows this abstraction) stream muxer, must implement the following API.

### Attach muxer to a transport
### Attach muxer to a Connection

- `Node.js` muxedConn = muxer(transport, isListener)
- `Go` muxedConn, err := muxer.Attach(transport, isListener)
- `JavaScript` muxedConn = muxer(conn, isListener)
- `Go` muxedConn, err := muxer.Attach(conn, isListener)

This method attaches our stream muxer to the desired transport (UDP, TCP) and returns/callbacks with the `err, conn`(error, connection).
This method attaches our stream muxer to an instance of [Connection](https://github.com/libp2p/interface-connection/blob/master/src/connection.js) defined by [interface-connection](https://github.com/libp2p/interface-connection).

If `err` is passed, no operation should be made in `conn`.

Expand All @@ -73,22 +74,20 @@ If `err` is passed, no operation should be made in `conn`.
### Dial(open/create) a new stream


- `Node.js` stream = muxedConn.newStream([function (err, stream)])
- `JavaScript` stream = muxedConn.newStream([function (err, stream)])
- `Go` stream, err := muxedConn.newStream()

This method negotiates and opens a new stream with the other endpoint.

If `err` is passed, no operation should be made in `stream`.

`stream` interface our established Stream with the other endpoint, it must implement the [Duplex Stream interface](https://nodejs.org/api/stream.html#stream_class_stream_duplex) in Node.js or the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser) in Go.

In the Node.js case, if no callback is passed, stream will emit an 'ready' event when it is prepared or a 'error' event if it fails to establish the connection, until then, it will buffer the 'write' calls.
`stream` interface our established Stream with the other endpoint, it must implement the [Duplex pull-stream interface](https://pull-stream.github.io) in JavaScript or the [ReadWriteCloser](http://golang.org/pkg/io/#ReadWriteCloser) in Go.

### Listen(wait/accept) a new incoming stream

- `Node.js` muxedConn.on('stream', function (stream) {})
- `JavaScript` muxedConn.on('stream', function (stream) {})
- `Go` stream := muxedConn.Accept()

Each time a dialing peer initiates the new stream handshake, a new stream is created on the listening side.

In Node.js, the Event Emitter pattern is expected to be used in order to receive new incoming streams, while in Go, it expects to wait when Accept is called.
In JavaScript, the Event Emitter pattern is expected to be used in order to receive new incoming streams, while in Go, it expects to wait when Accept is called.
38 changes: 27 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,47 @@
"name": "interface-stream-muxer",
"version": "0.3.1",
"description": "A test suite and interface you can use to implement a stream muxer.",
"main": "tests/index.js",
"directories": {
"test": "tests"
},
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
"test": "exit(0)",
"build": "aegir-build --env node",
"lint": "aegir-lint",
"release": "aegir-release --env node",
"release-minor": "aegir-release --env node --type minor",
"release-major": "aegir-release --env node --type major",
"coverage": "exit(0)",
"coverage-publish": "exit(0)"
},
"repository": {
"type": "git",
"url": "https://github.com/diasdavid/interface-stream-muxer.git"
"url": "https://github.com/libp2p/interface-stream-muxer.git"
},
"keywords": [
"Streams",
"Muxer",
"interface",
"Interface"
],
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/interface-stream-muxer/issues"
"url": "https://github.com/libp2p/interface-stream-muxer/issues"
},
"homepage": "https://github.com/diasdavid/interface-stream-muxer",
"homepage": "https://github.com/libp2p/interface-stream-muxer",
"dependencies": {
"stream-pair": "^1.0.3",
"timed-tape": "^0.1.1"
"async": "^2.0.1",
"chai": "^3.5.0",
"chai-checkmark": "^1.0.1",
"detect-node": "^2.0.3",
"libp2p-tcp": "^0.8.1",
"multiaddr": "^2.0.2",
"pull-generate": "^2.2.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.4.3",
"run-parallel": "^1.1.6",
"run-series": "^1.1.4"
},
"devDependencies": {
"aegir": "^8.0.0"
}
}
146 changes: 146 additions & 0 deletions src/base-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
chai.use(require('chai-checkmark'))
const expect = chai.expect
const pair = require('pull-pair/duplex')
const pull = require('pull-stream')

function closeAndWait (stream) {
pull(
pull.empty(),
stream,
pull.onEnd((err) => {
expect(err).to.not.exist.mark()
})
)
}

module.exports = (common) => {
describe('base', () => {
let muxer

beforeEach((done) => {
common.setup((err, _muxer) => {
if (err) return done(err)
muxer = _muxer
done()
})
})

it('Open a stream from the dialer', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])

expect(4).checks(done)

listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})

const conn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})

closeAndWait(conn)
})

it('Open a stream from the listener', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])

expect(4).check(done)

dialer.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})

const conn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})

closeAndWait(conn)
})

it('Open a stream on both sides', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])

expect(8).check(done)

dialer.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})

const listenerConn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})

listener.on('stream', (stream) => {
expect(stream).to.exist.mark()
closeAndWait(stream)
})

const dialerConn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})

closeAndWait(dialerConn)
closeAndWait(listenerConn)
})

it('Open a stream on one side, write, open a stream in the other side', (done) => {
const p = pair()
const dialer = muxer.dialer(p[0])
const listener = muxer.listener(p[1])

expect(6).check(done)

const dialerConn = dialer.newStream((err) => {
expect(err).to.not.exist.mark()
})

pull(
pull.values(['hey']),
dialerConn
)

listener.on('stream', (stream) => {
pull(
stream,
pull.collect((err, chunks) => {
expect(err).to.not.exist.mark()
expect(chunks).to.be.eql([Buffer('hey')]).mark()
})
)

const listenerConn = listener.newStream((err) => {
expect(err).to.not.exist.mark()
})

pull(
pull.values(['hello']),
listenerConn
)

dialer.on('stream', onDialerStream)
function onDialerStream (stream) {
pull(
stream,
pull.collect((err, chunks) => {
expect(err).to.not.exist.mark()
expect(chunks).to.be.eql([Buffer('hello')]).mark()
})
)
}
})
})
})
}
Loading