Skip to content
This repository has been archived by the owner on Oct 19, 2022. It is now read-only.

Commit

Permalink
feat: refactor, simplify, make handling of logs nicer
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Nov 3, 2016
1 parent 83e2f70 commit 6699023
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 162 deletions.
101 changes: 0 additions & 101 deletions src/agreement.js

This file was deleted.

5 changes: 2 additions & 3 deletions src/constants.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
'use strict'

module.exports = {
PROTOCOL_ID: '/multistream/1.0.0'
}
exports = module.exports
exports.PROTOCOL_ID = '/multistream/1.0.0'
39 changes: 17 additions & 22 deletions src/dialer.js → src/dialer/index.js
Original file line number Diff line number Diff line change
@@ -1,76 +1,71 @@
'use strict'

const lp = require('pull-length-prefixed')
const varint = require('varint')
const pull = require('pull-stream')
const pullLP = require('pull-length-prefixed')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('multistream:dialer')
const util = require('../util')
const select = require('../select')

const PROTOCOL_ID = require('./constants').PROTOCOL_ID
const agrmt = require('./agreement')

function getRandomId () {
return ((~~(Math.random() * 1e9)).toString(36))
}
const PROTOCOL_ID = require('./../constants').PROTOCOL_ID

module.exports = class Dialer {
constructor () {
this.conn = null
this.msThreadId = getRandomId()
this.log = util.log.dialer()
}

// perform the multistream handshake
handle (rawConn, callback) {
log('(%s) dialer handle conn', this.msThreadId)
const ms = agrmt.select(PROTOCOL_ID, (err, conn) => {
this.log('dialer handle conn')
const s = select(PROTOCOL_ID, (err, conn) => {
if (err) {
return callback(err)
}
log('(%s) handshake success', this.msThreadId)
this.log('handshake success')

this.conn = new Connection(conn, rawConn)

callback()
}, this.msThreadId)
}, this.log)

pull(
rawConn,
ms,
s,
rawConn
)
}

select (protocol, callback) {
log('(%s) dialer select %s', this.msThreadId, protocol)
this.log('dialer select ' + protocol)
if (!this.conn) {
return callback(new Error('multistream handshake has not finalized yet'))
}

const selectStream = agrmt.select(protocol, (err, conn) => {
const s = select(protocol, (err, conn) => {
if (err) {
this.conn = new Connection(conn, this.conn)
return callback(err)
}
callback(null, new Connection(conn, this.conn))
}, this.msThreadId)
}, this.log)

pull(
this.conn,
selectStream,
s,
this.conn
)
}

ls (callback) {
const lsStream = agrmt.select('ls', (err, conn) => {
const lsStream = select('ls', (err, conn) => {
if (err) {
return callback(err)
}

pull(
conn,
lp.decode(),
pullLP.decode(),
collectLs(conn),
pull.map(stringify),
pull.collect((err, list) => {
Expand All @@ -80,7 +75,7 @@ module.exports = class Dialer {
callback(null, list.slice(1))
})
)
})
}, this.log)

pull(
this.conn,
Expand Down
40 changes: 17 additions & 23 deletions src/listener.js → src/listener/index.js
Original file line number Diff line number Diff line change
@@ -1,47 +1,46 @@
'use strict'

const lp = require('pull-length-prefixed')
const pull = require('pull-stream')
const pullLP = require('pull-length-prefixed')
const varint = require('varint')
const isFunction = require('lodash.isfunction')
const assert = require('assert')
const debug = require('debug')
const log = debug('multistream:listener')
const select = require('../select')
const selectHandler = require('./selectHandler')
const util = require('./../util')
const Connection = require('interface-connection').Connection

const PROTOCOL_ID = require('./constants').PROTOCOL_ID
const agrmt = require('./agreement')
const PROTOCOL_ID = require('./../constants').PROTOCOL_ID

module.exports = class Listener {
constructor () {
this.handlers = {
ls: (conn) => this._ls(conn)
}
this.log = util.log.listener()
}

// perform the multistream handshake
handle (rawConn, callback) {
const msThreadId = getRandomId()
log('(%s) listener handle conn', msThreadId)
this.log('listener handle conn')

const selectStream = agrmt.select(PROTOCOL_ID, (err, conn) => {
const selectStream = select(PROTOCOL_ID, (err, conn) => {
if (err) {
return callback(err)
}

const hsConn = new Connection(conn, rawConn)
const shConn = new Connection(conn, rawConn)

const handlerSelector =
agrmt.handlerSelector(hsConn, this.handlers, msThreadId)
const sh = selectHandler(shConn, this.handlers, this.log)

pull(
hsConn,
handlerSelector,
hsConn
shConn,
sh,
shConn
)

callback()
}, msThreadId)
}, this.log)

pull(
rawConn,
Expand All @@ -52,12 +51,11 @@ module.exports = class Listener {

// be ready for a given `protocol`
addHandler (protocol, handler) {
log('adding handler: %s', protocol)

this.log('adding handler: ' + protocol)
assert(isFunction(handler), 'handler must be a function')

if (this.handlers[protocol]) {
log('overwriting handler for %s', protocol)
this.log('overwriting handler for ' + protocol)
}

this.handlers[protocol] = handler
Expand Down Expand Up @@ -88,12 +86,8 @@ module.exports = class Listener {

pull(
pull.values(values),
lp.encode(),
pullLP.encode(),
conn
)
}
}

function getRandomId () {
return ((~~(Math.random() * 1e9)).toString(36))
}
43 changes: 43 additions & 0 deletions src/listener/selectHandler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict'

const handshake = require('pull-handshake')
const lp = require('pull-length-prefixed')
const Connection = require('interface-connection').Connection
const writeEncoded = require('../util.js').writeEncoded

function selectHandler (rawConn, handlersMap, log) {
const cb = (err) => {
// incoming errors are irrelevant for the app
log.error(err)
}

const stream = handshake({ timeout: 60 * 1000 }, cb)
const shake = stream.handshake

next()
return stream

function next () {
lp.decodeFromReader(shake, (err, data) => {
if (err) {
return cb(err)
}
log('received:', data.toString())
const protocol = data.toString().slice(0, -1)
const result = Object.keys(handlersMap).filter((id) => id === protocol)
const key = result && result[0]

if (key) {
log('send ack back of: ' + protocol)
writeEncoded(shake, data, cb)
handlersMap[key](new Connection(shake.rest(), rawConn))
} else {
log('not supported protocol: ' + protocol)
writeEncoded(shake, new Buffer('na\n'))
next()
}
})
}
}

module.exports = selectHandler
35 changes: 35 additions & 0 deletions src/select.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict'

const handshake = require('pull-handshake')
const pullLP = require('pull-length-prefixed')
const util = require('./util')
const writeEncoded = util.writeEncoded

function select (multicodec, callback, log) {
const stream = handshake({
timeout: 60 * 1000
}, callback)

const shake = stream.handshake

log('writing multicodec: ' + multicodec)
writeEncoded(shake, new Buffer(multicodec + '\n'), callback)

pullLP.decodeFromReader(shake, (err, data) => {
if (err) {
return callback(err)
}
const protocol = data.toString().slice(0, -1)

if (protocol !== multicodec) {
return callback(new Error(`"${multicodec}" not supported`), shake.rest())
}

log('received ack: ' + protocol)
callback(null, shake.rest())
})

return stream
}

module.exports = select
Loading

0 comments on commit 6699023

Please sign in to comment.