Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

[WIP] pull-streams #19

Merged
merged 4 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,33 @@ ms.ls(<callback>)

`callback` is a function of type `function (err, protocols)` where `err` is an error object that gets passed if something wrong happend and `protocols` is an array of the supported protocols in the other end.

### This module uses `pull-streams`

We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).

You can learn more about pull-streams at:

- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
- [pull-streams documentation](https://pull-stream.github.io/)

#### Converting `pull-streams` to Node.js Streams

If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:

```js
const pullToStream = require('pull-stream-to-stream')

const nodeStreamInstance = pullToStream(pullStreamInstance)
// nodeStreamInstance is an instance of a Node.js Stream
```

To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.




## Maintainers

Captain: [@diasdavid](https://github.com/diasdavid).
Expand All @@ -161,4 +188,3 @@ Small note: If editing the Readme, please conform to the [standard-readme](https
## License

[MIT](LICENSE) © David Dias

26 changes: 15 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,28 @@
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"dependencies": {
"babel-runtime": "^6.6.1",
"length-prefixed-stream": "^1.5.0",
"lodash.range": "^3.1.5",
"run-series": "^1.1.4",
"varint": "^4.0.0"
"babel-runtime": "^6.11.6",
"debug": "^2.2.0",
"interface-connection": "^0.2.1",
"lodash.isfunction": "^3.0.8",
"lodash.range": "^3.1.7",
"pull-handshake": "^1.1.3",
"pull-length-prefixed": "^1.1.0",
"pull-stream": "^3.4.3",
"varint": "^4.0.1"
},
"devDependencies": {
"aegir": "^3.1.0",
"run-parallel": "^1.1.6",
"bl": "^1.1.2",
"aegir": "^8.0.0",
"chai": "^3.5.0",
"pre-commit": "^1.1.2",
"stream-pair": "^1.0.3"
"pre-commit": "^1.1.3",
"pull-pair": "^1.1.0",
"run-parallel": "^1.1.6",
"run-series": "^1.1.4"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
"David Dias <mail@daviddias.me>",
"Richard Littauer <richard.littauer@gmail.com>",
"dignifiedquire <dignifiedquire@gmail.com>"
]
}
}
100 changes: 100 additions & 0 deletions src/agreement.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
'use strict'

const handshake = require('pull-handshake')
const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('multistream:agreement')
log.error = debug('multistream:agreement:error')

exports.select = (multicodec, callback) => {
const stream = handshake({
timeout: 60 * 1000
}, callback)

const shake = stream.handshake

log('writing multicodec %s', multicodec)
writeEncoded(shake, new Buffer(multicodec + '\n'), callback)

lp.decodeFromReader(shake, (err, data) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a secret method ;)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it documented now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's released but there is still no api docs on pull-length-prefixed

if (err) {
return callback(err)
}
const protocol = data.toString().slice(0, -1)

if (protocol !== multicodec) {
return callback(new Error(`"${multicodec}" not supported`), shake.rest())
}

log('multicodec ack')
callback(null, shake.rest())
})

return stream
}

exports.handlerSelector = (rawConn, handlersMap) => {
const cb = (err) => {
// incoming errors are irrelevant for the app
log.error(err)
}

const stream = handshake({
timeout: 60 * 1000
}, cb)

const shake = stream.handshake

Copy link
Member

@daviddias daviddias Sep 2, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bug 🚨
The listener side is no longer acknowledging with a multistream multicodec, see spec here:

https://github.com/multiformats/multistream

the dialer side does the right thing https://github.com/multiformats/js-multistream/pull/19/files#r77424842

Detected this while testing interop with go

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed this further. It also expects that the dialer writes the multistream header first, while it should just write it away

next()

function next () {
lp.decodeFromReader(shake, (err, data) => {
if (err) {
return cb(err)
}
log('received: %s', data.toString())
const protocol = data.toString().slice(0, -1)
const result = Object.keys(handlersMap).filter((id) => id === protocol)
const key = result && result[0]

if (key) {
log('ack: %s', protocol)
writeEncoded(shake, data, cb)
handlersMap[key](new Connection(shake.rest(), rawConn))
} else {
log('received multicodec of not supported protocol: %s', protocol)
writeEncoded(shake, new Buffer('na\n'))
next()
}
})
}

return stream
}

// prefixes a message with a varint
function encode (msg, cb) {
const values = Buffer.isBuffer(msg) ? [msg] : [new Buffer(msg)]

pull(
pull.values(values),
lp.encode(),
pull.collect((err, encoded) => {
if (err) {
return cb(err)
}
cb(null, encoded[0])
})
)
}

function writeEncoded (writer, msg, cb) {
encode(msg, (err, msg) => {
if (err) {
return cb(err)
}
writer.write(msg)
})
}
5 changes: 5 additions & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'use strict'

module.exports = {
PROTOCOL_ID: '/multistream/1.0.0'
}
128 changes: 75 additions & 53 deletions src/dialer.js
Original file line number Diff line number Diff line change
@@ -1,78 +1,100 @@
'use strict'

const lps = require('length-prefixed-stream')
const PROTOCOL_ID = require('./protocol-id')
const lp = require('pull-length-prefixed')
const varint = require('varint')
const range = require('lodash.range')
const series = require('run-series')
const pull = require('pull-stream')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('multistream:dialer')

exports = module.exports = Dialer
const PROTOCOL_ID = require('./constants').PROTOCOL_ID
const agrmt = require('./agreement')

function Dialer () {
if (!(this instanceof Dialer)) {
return new Dialer()
module.exports = class Dialer {
constructor () {
this.conn = null
}

const encode = lps.encode()
const decode = lps.decode()
let conn

// perform the multistream handshake
this.handle = (_conn, callback) => {
encode.pipe(_conn)
_conn.pipe(decode)

decode.once('data', (buffer) => {
const msg = buffer.toString().slice(0, -1)
if (msg === PROTOCOL_ID) {
encode.write(new Buffer(PROTOCOL_ID + '\n'))
conn = _conn
callback()
} else {
callback(new Error('Incompatible multistream'))
handle (rawConn, callback) {
log('handling connection')
const ms = agrmt.select(PROTOCOL_ID, (err, conn) => {
if (err) {
return callback(err)
}
log('handshake success')

this.conn = new Connection(conn, rawConn)

callback()
})
pull(rawConn, ms, rawConn)
}

this.select = (protocol, callback) => {
if (!conn) {
select (protocol, callback) {
log('dialer select %s', protocol)
if (!this.conn) {
return callback(new Error('multistream handshake has not finalized yet'))
}

encode.write(new Buffer(protocol + '\n'))
decode.once('data', function (msgBuffer) {
const msg = msgBuffer.toString().slice(0, -1)
if (msg === protocol) {
return callback(null, conn)
}
if (msg === 'na') {
return callback(new Error(protocol + ' not supported'))
const selectStream = agrmt.select(protocol, (err, conn) => {
if (err) {
this.conn = new Connection(conn, this.conn)
return callback(err)
}
callback(null, new Connection(conn, this.conn))
})

pull(
this.conn,
selectStream,
this.conn
)
}

this.ls = (callback) => {
encode.write(new Buffer('ls' + '\n'))
let protos = []
decode.once('data', function (msgBuffer) {
const size = varint.decode(msgBuffer) // eslint-disable-line
const nProtos = varint.decode(msgBuffer, varint.decode.bytes)

timesSeries(nProtos, (n, next) => {
decode.once('data', function (msgBuffer) {
protos.push(msgBuffer.toString().slice(0, -1))
next()
ls (callback) {
const lsStream = agrmt.select('ls', (err, conn) => {
if (err) {
return callback(err)
}

pull(
conn,
lp.decode(),
collectLs(conn),
pull.map(stringify),
pull.collect((err, list) => {
if (err) {
return callback(err)
}
callback(null, list.slice(1))
})
}, (err) => {
if (err) {
return callback(err)
}
callback(null, protos)
})
)
})

pull(
this.conn,
lsStream,
this.conn
)
}
}

function timesSeries (i, work, callback) {
series(range(i).map((i) => (callback) => work(i, callback)), callback)
function stringify (buf) {
return buf.toString().slice(0, -1)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a func to stringify a buffer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, got it it is for the pull stuff

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and for dropping the trailing \n at


function collectLs (conn) {
let first = true
let counter = 0

return pull.take((msg) => {
if (first) {
varint.decode(msg)
counter = varint.decode(msg, varint.decode.bytes)
return true
}

return counter-- > 0
})
}
Loading