Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

floodsub api #377

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,33 @@ ipfs.id()
This relies on a global `Promise` object. If you are in an environment where that is not
yet available you need to bring your own polyfill.

### Publish/Subscribe (experimental)

js-ipfs-api supports the up-and-coming feature publish/subscribe. This requires you
to use a development build of go-ipfs from the `feat/floodsub` branch (issue for tracking here: https://github.com/ipfs/go-ipfs/pull/3202).

Usage:

```js
const subscription = ipfsApi.pubsub.subscribe('my-topic')
subscription.on('data', (msg) => {
console.log('message', msg.data)
// => 'Hello there!'
})
setTimeout(() => {
// Stop subscription after 10 seconds
subscription.cancel()
}, 1000 * 10)

ipfsApi.pubsub.publish('my-topic', 'Hello there!', (err, successful) => {
if (err) {
console.log('Something went wrong publishing a message')
throw err
}
// successful = true/false
})
```

## Development

### Testing
Expand Down
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,12 @@
"devDependencies": {
"aegir": "^8.0.0",
"chai": "^3.5.0",
"go-ipfs-dep": "https://github.com/haadcode/go-ipfs-dep.git#0a5229816b3a41f17876361314a90c1d4dec79b3",
"gulp": "^3.9.1",
"hapi": "^15.0.2",
"interface-ipfs-core": "^0.15.0",
"ipfsd-ctl": "^0.14.0",
"ipfsd-ctl": "https://github.com/haadcode/js-ipfsd-ctl.git#257712108919a05c625b02399b82d873675bc559",
"js-base64": "^2.1.9",
"pre-commit": "^1.1.3",
"socket.io": "^1.4.8",
"socket.io-client": "^1.4.8",
Expand Down Expand Up @@ -103,4 +105,4 @@
"url": "https://github.com/ipfs/js-ipfs-api/issues"
},
"homepage": "https://github.com/ipfs/js-ipfs-api"
}
}
106 changes: 106 additions & 0 deletions src/api/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict'

const promisify = require('promisify-es6')
const bs58 = require('bs58')
const Base64 = require('js-base64').Base64
const Stream = require('stream')
const Readable = Stream.Readable
const http = require('http')

let activeSubscriptions = []

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at what activeSubscriptions does, would it make sense to put them in a map?

// add
activeSubscriptions[topic] = {}

// check
Object.keys(activeSubscriptions).include(topic)

// remove
delete activeSubscriptions[topic]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to keep it to be as little mutations as possible, just overwriting the entire variable instead of mutating the existing one.

But if you feel really strongly about this, I can change it

const subscriptionExists = (subscriptions, topic) => {
return subscriptions.indexOf(topic) !== -1
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can use subscriptions.includes(topic) instead of .indexOf(). Perhaps not even needed to wrap in a function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to have been introduced in version 6 of node, what versions are we supporting out of the box?

Copy link
Contributor

@daviddias daviddias Sep 13, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 and up. We have been following the LTS track

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed now, thanks!

const removeSubscription = (subscriptions, topic) => {
const indexToRemove = subscriptions.indexOf(topic)
return subscriptions.filter((el, index) => {
return index !== indexToRemove
})
}
const addSubscription = (subscriptions, topic) => {
return subscriptions.concat([topic])
}
const parseMessage = (message) => {
return Object.assign({}, message, {
from: bs58.encode(message.from),
data: Base64.decode(message.data),
seqno: Base64.decode(message.seqno)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whyrusleeping you are base64 encoding all the messages?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, it is just when it comes through the http-api

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

base64 encoding happens on buffers in json

})
}

module.exports = (send, config) => {
return {
subscribe: (topic, options) => {
if (!options) {
options = {}
}

var rs = new Readable({objectMode: true})
rs._read = () => {}

if (!subscriptionExists(activeSubscriptions, topic)) {
activeSubscriptions = addSubscription(activeSubscriptions, topic)
} else {
throw new Error('Already subscribed to ' + topic)
}

let url = '/api/v0/pubsub/sub/' + topic
if (options.discover) {
url = url + '?discover=true'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already depend on qs, which makes adding query parameters nicer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm just thinking this is such a short-lived piece of code, I'm expecting it to change since the go-ipfs feature haven't even been merged yet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can still be nice code :P and with qs it's even easier to change things around ;)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it can but the value given by it is less :) I would rather refactor everything to use the request-api instead of it's own http.get, but would require refactor of request-api as well and then your ongoing fetch changes will conflict.

I would say we keep this as is now, and change it in the future when needed, once the API stabilized. Keep in mind, pub/sub is not even released in go-ipfs yet, and usage of this will be minimal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
// we're using http.get here to have more control over the request
// and avoid refactoring of the request-api where wreck is gonna be
// replaced by fetch (https://github.com/ipfs/js-ipfs-api/pull/355)
const request = http.get({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't wreck give you the same level of control? I would like to keep consistency in that sense of using it in both places. Also this does not account for connections over https

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but we're removing wreck so I'm not sure it's a good idea to continue to develop with it...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't worry about it, removing wreck will take some more time.

host: config.host,
port: config.port,
path: url
}, (response) => {
response.on('data', function (d) {
let data
try {
data = JSON.parse(d)
} catch (err) {
return rs.emit('error', err)
}

// skip "double subscription" error
if (!data.Message) {
rs.emit('data', parseMessage(data))
}
})
response.on('end', function () {
rs.emit('end')
})
})
rs.cancel = () => {
request.abort()
activeSubscriptions = removeSubscription(activeSubscriptions, topic)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now defined twice

Copy link
Contributor Author

@victorb victorb Sep 13, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my thinking is that if you're canceling the subscription before you got one message, we need to cancel the subscription and update the list of subscriptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, but the outer definition is enough, you are only duplicating the same code in the inner one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, of course. Thanks

return rs
},
publish: promisify((topic, data, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
if (!options) {
options = {}
}

const isBuffer = Buffer.isBuffer(data)
const buf = isBuffer ? data : new Buffer(data)

send({
path: 'pubsub/pub',
args: [topic, buf]
}, (err, result) => {
if (err) {
return callback(err)
}
callback(null, true)
})
})
}
}
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) {
}

const requestAPI = getRequestAPI(config)
const cmds = loadCommands(requestAPI)
const cmds = loadCommands(requestAPI, config)
cmds.send = requestAPI
cmds.Buffer = Buffer

Expand Down
5 changes: 3 additions & 2 deletions src/load-commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ function requireCommands () {
object: require('./api/object'),
pin: require('./api/pin'),
ping: require('./api/ping'),
pubsub: require('./api/pubsub'),
refs: require('./api/refs'),
repo: require('./api/repo'),
swarm: require('./api/swarm'),
Expand Down Expand Up @@ -53,12 +54,12 @@ function requireCommands () {
return cmds
}

function loadCommands (send) {
function loadCommands (send, config) {
const files = requireCommands()
const cmds = {}

Object.keys(files).forEach((file) => {
cmds[file] = files[file](send)
cmds[file] = files[file](send, config)
})

return cmds
Expand Down
130 changes: 130 additions & 0 deletions test/ipfs-api/pubsub.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ['error', 8] */
'use strict'

const expect = require('chai').expect
const isNode = require('detect-node')
const FactoryClient = require('../factory/factory-client')
const map = require('async/map')

const topicName = 'js-ipfs-api-tests'

const publish = (ipfs, data, callback) => {
ipfs.pubsub.publish(topicName, data, (err, successful) => {
expect(err).to.not.exist
expect(successful).to.equal(true)
callback()
})
}

describe('.pubsub', () => {
if (!isNode) {
return
}

let ipfs
let fc

before(function (done) {
fc = new FactoryClient()
fc.spawnNode((err, node) => {
expect(err).to.not.exist
if (err) done(err)
ipfs = node
done()
})
})

after((done) => {
fc.dismantle(done)
})

describe('.publish', () => {
it('message from string', (done) => {
publish(ipfs, 'hello friend', done)
})
it('message from buffer', (done) => {
publish(ipfs, new Buffer('hello friend'), done)
})
})

describe('.subscribe', () => {
it('one topic', (done) => {
const subscription = ipfs.pubsub.subscribe(topicName)
subscription.on('data', (d) => {
expect(d.data).to.equal('hi')
subscription.cancel()
})
subscription.on('end', () => {
done()
})
setTimeout(publish.bind(null, ipfs, 'hi', () => {}), 0)
})
it('fails when already subscribed', () => {
const firstSub = ipfs.pubsub.subscribe(topicName)
let caughtErr = null
try {
ipfs.pubsub.subscribe(topicName)
} catch (err) {
caughtErr = err
}
expect(caughtErr.toString()).to.equal('Error: Already subscribed to ' + topicName)
firstSub.cancel()
})
it('receive multiple messages', (done) => {
let receivedMessages = []
let interval = null
const expectedMessages = 2
const subscription = ipfs.pubsub.subscribe(topicName)
subscription.on('data', (d) => {
receivedMessages.push(d.data)
if (receivedMessages.length === expectedMessages) {
receivedMessages.forEach((msg) => {
expect(msg).to.be.equal('hi')
})
clearInterval(interval)
subscription.cancel()
done()
}
})

setTimeout(() => {
interval = setInterval(publish.bind(null, ipfs, 'hi', () => {}), 10)
}, 10)
})
})
describe('multiple nodes pub/sub', () => {
let clients = {}
before(function (done) {
const keys = ['a', 'b']
fc = new FactoryClient()
map(['a', 'b'], (_, cb) => {
return fc.spawnNode(cb)
}, (err, nodes) => {
if (err) return done(err)
keys.forEach((key, i) => {
clients[key] = nodes[i]
})
done()
})
})
after((done) => {
fc.dismantle(done)
})
it('receive messages from different node', (done) => {
const expectedString = 'hello from the other side'
const subscription = clients.a.pubsub.subscribe(topicName)
subscription.on('data', (d) => {
expect(d.data).to.be.equal(expectedString)
subscription.cancel()
done()
})
setTimeout(() => {
clients.b.pubsub.publish(topicName, expectedString, (err, result) => {
expect(err).to.not.exist
expect(result).to.equal(true)
})
}, 100)
})
})
})