Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

feat(pubsub): Add pubsub api #493

Merged
merged 2 commits into from
Mar 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,30 @@ $ ipfs config --json API.HTTPHeaders.Access-Control-Allow-Methods "[\"PUT\", \"P

> `js-ipfs-api` follows the spec defined by [`interface-ipfs-core`](https://github.com/ipfs/interface-ipfs-core), which concerns the interface to expect from IPFS implementations. This interface is a currently active endeavor. You can use it today to consult the methods available.

#### Caveats

##### Pubsub

**Currently, the [PubSub API only works in Node.js envinroment](https://github.com/ipfs/js-ipfs-api/issues/518)**

We currently don't support pubsub when run in the browser, and we test it with separate set of tests to make sure if it's being used in the browser, pubsub errors.

More info: https://github.com/ipfs/js-ipfs-api/issues/518

This means:
- You can use pubsub from js-ipfs-api in Node.js
- You can use pubsub from js-ipfs-api in Electron
(when js-ipfs-api is ran in the main process of Electron)
- You can't use pubsub from js-ipfs-api in the browser
- You can't use pubsub from js-ipfs-api in Electron's
renderer process
- You can use pubsub from js-ipfs in the browsers
- You can use pubsub from js-ipfs in Node.js
- You can use pubsub from js-ipfs in Electron
(in both the main process and the renderer process)
- See https://github.com/ipfs/js-ipfs for details on
pubsub in js-ipfs

##### [bitswap]()

- [`ipfs.bitswap.wantlist()`]()
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"eslint-plugin-react": "^6.10.3",
"gulp": "^3.9.1",
"hapi": "^16.1.0",
"interface-ipfs-core": "~0.26.1",
"interface-ipfs-core": "~0.26.2",
"ipfsd-ctl": "~0.20.0",
"pre-commit": "^1.2.2",
"socket.io": "^1.7.3",
Expand Down
162 changes: 162 additions & 0 deletions src/api/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
'use strict'

const promisify = require('promisify-es6')
const EventEmitter = require('events')
const eos = require('end-of-stream')
const isNode = require('detect-node')
const PubsubMessageStream = require('../pubsub-message-stream')
const stringlistToArray = require('../stringlist-to-array')

const NotSupportedError = () => new Error('pubsub is currently not supported when run in the browser')

/* Public API */
module.exports = (send) => {
/* Internal subscriptions state and functions */
const ps = new EventEmitter()
const subscriptions = {}
ps.id = Math.random()
return {
subscribe: (topic, options, handler, callback) => {
const defaultOptions = {
discover: false
}

if (typeof options === 'function') {
callback = handler
handler = options
options = defaultOptions
}

if (!options) {
options = defaultOptions
}

// Throw an error if ran in the browsers
if (!isNode) {
if (!callback) {
return Promise.reject(NotSupportedError())
}
return callback(NotSupportedError())
}

// promisify doesn't work as we always pass a
// function as last argument (`handler`)
if (!callback) {
return new Promise((resolve, reject) => {
subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
resolve()
})
})
}

subscribe(topic, options, handler, callback)
},
unsubscribe: (topic, handler) => {
if (!isNode) {
throw NotSupportedError()
}

if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
throw new Error(`Not subscribed to '${topic}'`)
}

ps.removeListener(topic, handler)

// Drop the request once we are actualy done
if (ps.listenerCount(topic) === 0) {
subscriptions[topic].abort()
subscriptions[topic] = null
}
},
publish: promisify((topic, data, callback) => {
if (!isNode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this not be supported in browser? It just sends data to a HTTP endpoint?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Publishing should be fine, it is subscribing and being 'able to cancel a subscription' that is not.

return callback(NotSupportedError())
}

if (!Buffer.isBuffer(data)) {
return callback(new Error('data must be a Buffer'))
}

const request = {
path: 'pubsub/pub',
args: [topic, data]
}

send(request, callback)
}),
ls: promisify((callback) => {
if (!isNode) {
return callback(NotSupportedError())
}

const request = {
path: 'pubsub/ls'
}

send.andTransform(request, stringlistToArray, callback)
}),
peers: promisify((topic, callback) => {
if (!isNode) {
return callback(NotSupportedError())
}

const request = {
path: 'pubsub/peers',
args: [topic]
}

send.andTransform(request, stringlistToArray, callback)
}),
setMaxListeners (n) {
return ps.setMaxListeners(n)
}
}

function subscribe (topic, options, handler, callback) {
ps.on(topic, handler)
if (subscriptions[topic]) {
return callback()
}

// Request params
const request = {
path: 'pubsub/sub',
args: [topic],
qs: {
discover: options.discover
}
}

// Start the request and transform the response
// stream to Pubsub messages stream
subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
if (err) {
subscriptions[topic] = null
ps.removeListener(topic, handler)
return callback(err)
}

stream.on('data', (msg) => {
ps.emit(topic, msg)
})

stream.on('error', (err) => {
ps.emit('error', err)
})

eos(stream, (err) => {
if (err) {
ps.emit('error', err)
}

subscriptions[topic] = null
ps.removeListener(topic, handler)
})

callback()
})
}
}
1 change: 1 addition & 0 deletions src/load-commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ function requireCommands () {
refs: require('./api/refs'),
repo: require('./api/repo'),
swarm: require('./api/swarm'),
pubsub: require('./api/pubsub'),
update: require('./api/update'),
version: require('./api/version')
}
Expand Down
33 changes: 33 additions & 0 deletions src/pubsub-message-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const TransformStream = require('readable-stream').Transform
const PubsubMessage = require('./pubsub-message-utils')

class PubsubMessageStream extends TransformStream {
constructor (options) {
const opts = Object.assign(options || {}, { objectMode: true })
super(opts)
}

static from (inputStream, callback) {
let outputStream = inputStream.pipe(new PubsubMessageStream())
inputStream.on('end', () => outputStream.emit('end'))
callback(null, outputStream)
}

_transform (obj, enc, callback) {
let msg
try {
msg = PubsubMessage.deserialize(obj, 'base64')
} catch (err) {
// Not a valid pubsub message
// go-ipfs returns '{}' as the very first object atm, we skip that
return callback()
}

this.push(msg)
callback()
}
}

module.exports = PubsubMessageStream
39 changes: 39 additions & 0 deletions src/pubsub-message-utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

const bs58 = require('bs58')

module.exports = {
deserialize (data, enc) {
enc = enc ? enc.toLowerCase() : 'json'

if (enc === 'json') {
return deserializeFromJson(data)
} else if (enc === 'base64') {
return deserializeFromBase64(data)
}

throw new Error(`Unsupported encoding: '${enc}'`)
}
}

function deserializeFromJson (data) {
const json = JSON.parse(data)
return deserializeFromBase64(json)
}

function deserializeFromBase64 (obj) {
if (!isPubsubMessage(obj)) {
throw new Error(`Not a pubsub message`)
}

return {
from: bs58.encode(new Buffer(obj.from, 'base64')).toString(),
seqno: new Buffer(obj.seqno, 'base64'),
data: new Buffer(obj.data, 'base64'),
topicCIDs: obj.topicIDs || obj.topicCIDs
}
}

function isPubsubMessage (obj) {
return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
}
23 changes: 23 additions & 0 deletions test/interface/pubsub.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* eslint-env mocha */

'use strict'

const test = require('interface-ipfs-core')
const FactoryClient = require('../ipfs-factory/client')
const isNode = require('detect-node')

if (isNode) {
let fc

const common = {
setup: function (callback) {
fc = new FactoryClient()
callback(null, fc)
},
teardown: function (callback) {
fc.dismantle(callback)
}
}

test.pubsub(common)
}
2 changes: 1 addition & 1 deletion test/ipfs-factory/daemon-spawner.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ function spawnEphemeralNode (callback) {
node.setConfig(configKey, configVal, cb)
}, cb)
},
(cb) => node.startDaemon(cb)
(cb) => node.startDaemon(['--enable-pubsub-experiment'], cb)
], (err) => callback(err, node))
})
}
Loading