Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
stream multiplexing done, starting on identify refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Mar 7, 2016
1 parent 9d8ee67 commit f8e14e4
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 411 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"bl": "^1.1.2",
"chai": "^3.5.0",
"istanbul": "^0.4.2",
"libp2p-spdy": "^0.1.0",
"libp2p-spdy": "^0.2.3",
"libp2p-tcp": "^0.2.1",
"mocha": "^2.4.5",
"multiaddr": "^1.1.1",
Expand Down
63 changes: 35 additions & 28 deletions src/identify.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,47 @@
/*
* Identify is one of the protocols swarms speaks in order to broadcast and learn
* about the ip:port pairs a specific peer is available through
* Identify is one of the protocols swarms speaks in order to
* broadcast and learn about the ip:port pairs a specific peer
* is available through
*/

var Interactive = require('multistream-select').Interactive
var protobufs = require('protocol-buffers-stream')
var fs = require('fs')
var path = require('path')
var schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
var Address6 = require('ip-address').Address6
var Id = require('peer-id')
var multiaddr = require('multiaddr')

exports = module.exports = identify

var protoId = '/ipfs/identify/1.0.0'
// var multistream = require('multistream-select')
// var protobufs = require('protocol-buffers-stream')
// var fs = require('fs')
// var path = require('path')
// var protobufs = require('protocol-buffers-stream')
// var schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
// var Address6 = require('ip-address').Address6
// var Id = require('peer-id')
// var multiaddr = require('multiaddr')

exports = module.exports

exports.multicodec = '/ipfs/identify/1.0.0'

exports.exec = (muxedConn, callback) => {
// TODO
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer
// 4. receive what the other peer sees from me
// 4. callback with (err, peerInfo)
}

exports.protoId = protoId
var createProtoStream = protobufs(schema)
exports.handler = (peerInfo) => {
return function (conn) {
// TODO
// 1. receive incoming observed info about me
// 2. send back what I see from the other
}
}

/*
function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
var msi = new Interactive()
msi.handle(conn, function () {
msi.select(protoId, function (err, ds) {
if (err) {
return console.log(err) // TODO Treat error
return console.log(err)
}
var ps = createProtoStream()
Expand All @@ -39,16 +56,6 @@ function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
socket: socket
}
// TODO: Pass the new discovered info about the peer that contacted us
// to something like the Kademlia Router, so the peerInfo for this peer
// is fresh
// - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
// })
})

var mh = getMultiaddr(socket)
ps.identify({
Expand Down Expand Up @@ -156,4 +163,4 @@ function updateSelf (peerSelf, observedAddr) {
peerSelf.multiaddrs.push(omh)
}
}
}
}*/
64 changes: 53 additions & 11 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const multistream = require('multistream-select')
// const async = require('async')
// const identify = require('./identify')
const identify = require('./identify')
const PassThrough = require('stream').PassThrough

exports = module.exports = Swarm
Expand Down Expand Up @@ -130,16 +130,28 @@ function Swarm (peerInfo) {
// { muxerCodec: <muxer> } e.g { '/spdy/0.3.1': spdy }
this.muxers = {}
this.connection.addStreamMuxer = (muxer) => {
// TODO
// .handle(protocol, () => {
// after attaching the stream muxer, check if identify is enabled
// })
// TODO add to the list of muxers available
// for dialing
this.muxers[muxer.multicodec] = muxer

// for listening
this.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)
muxedConn.on('stream', connHandler)

if (this.identify) {
identify.exec(muxedConn, (err, pi) => {
if (err) {}
// TODO muxedConns[pi.id.toB58String()].muxer = muxedConn
})
}
})
}

// enable the Identify protocol
this.identify = false
this.connection.reuse = () => {
// TODO identify
this.identify = true
this.handle(identify.multicodec, identify.handler(peerInfo))
}

const self = this // couldn't get rid of this
Expand Down Expand Up @@ -224,14 +236,40 @@ function Swarm (peerInfo) {
}

function attemptMuxerUpgrade (conn, cb) {
if (Object.keys(self.muxers).length === 0) {
const muxers = Object.keys(self.muxers)
if (muxers.length === 0) {
return cb(new Error('no muxers available'))
}
// TODO add muxer to the muxedConns object for the peerId
// TODO if it succeeds, add incomming open coons to connHandler

// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler

nextMuxer(muxers.shift())

function nextMuxer (key) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
cb(new Error('could not upgrade to stream muxing'))
} else {
nextMuxer(muxers.shift())
}
}

const muxedConn = self.muxers[key](conn, false)
self.muxedConns[b58Id] = {}
self.muxedConns[b58Id].muxer = muxedConn
cb(null, muxedConn)
})
})
}
}
function openConnInMuxedConn (muxer, cb) {
// TODO open a conn in this muxer
cb(muxer.newStream())
}

function protocolHandshake (conn, protocol, cb) {
Expand All @@ -255,6 +293,10 @@ function Swarm (peerInfo) {
this.close = (callback) => {
var count = 0

Object.keys(this.muxedConns).forEach((key) => {
this.muxedConns[key].muxer.end()
})

Object.keys(this.transports).forEach((key) => {
this.transports[key].close(() => {
if (++count === Object.keys(this.transports).length) {
Expand Down
Loading

0 comments on commit f8e14e4

Please sign in to comment.