Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit 7d7e4c1

Browse files
haadcodevictorb
authored andcommitted
feat(pubsub): Add publish/subscribe
This commit adds publish/subscribe methods to js-ipfs-api. These changes requires special version of go-ipfs currently, with pub/sub activated.
1 parent 4e7b8e4 commit 7d7e4c1

File tree

6 files changed

+271
-5
lines changed

6 files changed

+271
-5
lines changed

README.md

+27
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,33 @@ ipfs.id()
187187
This relies on a global `Promise` object. If you are in an environment where that is not
188188
yet available you need to bring your own polyfill.
189189

190+
### Publish/Subscribe (experimental)
191+
192+
js-ipfs-api supports the up-and-coming feature publish/subscribe. This requires you
193+
to use a development build of go-ipfs from the `feat/floodsub` branch (issue for tracking here: https://github.com/ipfs/go-ipfs/pull/3202).
194+
195+
Usage:
196+
197+
```js
198+
const subscription = ipfsApi.pubsub.subscribe('my-topic')
199+
subscription.on('data', (msg) => {
200+
console.log('message', msg.data)
201+
// => 'Hello there!'
202+
})
203+
setTimeout(() => {
204+
// Stop subscription after 10 seconds
205+
subscription.cancel()
206+
}, 1000 * 10)
207+
208+
ipfsApi.pubsub.publish('my-topic', 'Hello there!', (err, successful) => {
209+
if (err) {
210+
console.log('Something went wrong publishing a message')
211+
throw err
212+
}
213+
// successful = true/false
214+
})
215+
```
216+
190217
## Development
191218

192219
### Testing

package.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,12 @@
4949
"devDependencies": {
5050
"aegir": "^8.0.0",
5151
"chai": "^3.5.0",
52+
"go-ipfs-dep": "https://github.com/haadcode/go-ipfs-dep.git#0a5229816b3a41f17876361314a90c1d4dec79b3",
5253
"gulp": "^3.9.1",
5354
"hapi": "^15.0.2",
5455
"interface-ipfs-core": "^0.15.0",
55-
"ipfsd-ctl": "^0.14.0",
56+
"ipfsd-ctl": "https://github.com/haadcode/js-ipfsd-ctl.git#257712108919a05c625b02399b82d873675bc559",
57+
"js-base64": "^2.1.9",
5658
"pre-commit": "^1.1.3",
5759
"socket.io": "^1.4.8",
5860
"socket.io-client": "^1.4.8",
@@ -103,4 +105,4 @@
103105
"url": "https://github.com/ipfs/js-ipfs-api/issues"
104106
},
105107
"homepage": "https://github.com/ipfs/js-ipfs-api"
106-
}
108+
}

src/api/pubsub.js

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const bs58 = require('bs58')
5+
const Base64 = require('js-base64').Base64
6+
const Stream = require('stream')
7+
const Readable = Stream.Readable
8+
const http = require('http')
9+
10+
let activeSubscriptions = []
11+
12+
const subscriptionExists = (subscriptions, topic) => {
13+
return subscriptions.indexOf(topic) !== -1
14+
}
15+
const removeSubscription = (subscriptions, topic) => {
16+
const indexToRemove = subscriptions.indexOf(topic)
17+
return subscriptions.filter((el, index) => {
18+
return index !== indexToRemove
19+
})
20+
}
21+
const addSubscription = (subscriptions, topic) => {
22+
return subscriptions.concat([topic])
23+
}
24+
const parseMessage = (message) => {
25+
return Object.assign({}, message, {
26+
from: bs58.encode(message.from),
27+
data: Base64.decode(message.data),
28+
seqno: Base64.decode(message.seqno)
29+
})
30+
}
31+
32+
module.exports = (send, config) => {
33+
return {
34+
subscribe: (topic, options) => {
35+
if (!options) {
36+
options = {}
37+
}
38+
39+
var rs = new Readable({objectMode: true})
40+
rs._read = () => {}
41+
42+
if (!subscriptionExists(activeSubscriptions, topic)) {
43+
activeSubscriptions = addSubscription(activeSubscriptions, topic)
44+
} else {
45+
throw new Error('Already subscribed to ' + topic)
46+
}
47+
48+
let url = '/api/v0/pubsub/sub/' + topic
49+
if (options.discover) {
50+
url = url + '?discover=true'
51+
}
52+
// we're using http.get here to have more control over the request
53+
// and avoid refactoring of the request-api where wreck is gonna be
54+
// replaced by fetch (https://github.com/ipfs/js-ipfs-api/pull/355)
55+
const request = http.get({
56+
host: config.host,
57+
port: config.port,
58+
path: url
59+
}, (response) => {
60+
response.on('data', function (d) {
61+
let data
62+
try {
63+
data = JSON.parse(d)
64+
} catch (err) {
65+
return rs.emit('error', err)
66+
}
67+
68+
// skip "double subscription" error
69+
if (!data.Message) {
70+
rs.emit('data', parseMessage(data))
71+
}
72+
})
73+
response.on('end', function () {
74+
rs.emit('end')
75+
})
76+
})
77+
rs.cancel = () => {
78+
request.abort()
79+
activeSubscriptions = removeSubscription(activeSubscriptions, topic)
80+
}
81+
return rs
82+
},
83+
publish: promisify((topic, data, options, callback) => {
84+
if (typeof options === 'function') {
85+
callback = options
86+
options = {}
87+
}
88+
if (!options) {
89+
options = {}
90+
}
91+
92+
const isBuffer = Buffer.isBuffer(data)
93+
const buf = isBuffer ? data : new Buffer(data)
94+
95+
send({
96+
path: 'pubsub/pub',
97+
args: [topic, buf]
98+
}, (err, result) => {
99+
if (err) {
100+
return callback(err)
101+
}
102+
callback(null, true)
103+
})
104+
})
105+
}
106+
}

src/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) {
3636
}
3737

3838
const requestAPI = getRequestAPI(config)
39-
const cmds = loadCommands(requestAPI)
39+
const cmds = loadCommands(requestAPI, config)
4040
cmds.send = requestAPI
4141
cmds.Buffer = Buffer
4242

src/load-commands.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ function requireCommands () {
2222
object: require('./api/object'),
2323
pin: require('./api/pin'),
2424
ping: require('./api/ping'),
25+
pubsub: require('./api/pubsub'),
2526
refs: require('./api/refs'),
2627
repo: require('./api/repo'),
2728
swarm: require('./api/swarm'),
@@ -53,12 +54,12 @@ function requireCommands () {
5354
return cmds
5455
}
5556

56-
function loadCommands (send) {
57+
function loadCommands (send, config) {
5758
const files = requireCommands()
5859
const cmds = {}
5960

6061
Object.keys(files).forEach((file) => {
61-
cmds[file] = files[file](send)
62+
cmds[file] = files[file](send, config)
6263
})
6364

6465
return cmds

test/ipfs-api/pubsub.spec.js

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/* eslint-env mocha */
2+
/* eslint max-nested-callbacks: ['error', 8] */
3+
'use strict'
4+
5+
const expect = require('chai').expect
6+
const isNode = require('detect-node')
7+
const FactoryClient = require('../factory/factory-client')
8+
const map = require('async/map')
9+
10+
const topicName = 'js-ipfs-api-tests'
11+
12+
const publish = (ipfs, data, callback) => {
13+
ipfs.pubsub.publish(topicName, data, (err, successful) => {
14+
expect(err).to.not.exist
15+
expect(successful).to.equal(true)
16+
callback()
17+
})
18+
}
19+
20+
describe('.pubsub', () => {
21+
if (!isNode) {
22+
return
23+
}
24+
25+
let ipfs
26+
let fc
27+
28+
before(function (done) {
29+
fc = new FactoryClient()
30+
fc.spawnNode((err, node) => {
31+
expect(err).to.not.exist
32+
if (err) done(err)
33+
ipfs = node
34+
done()
35+
})
36+
})
37+
38+
after((done) => {
39+
fc.dismantle(done)
40+
})
41+
42+
describe('.publish', () => {
43+
it('message from string', (done) => {
44+
publish(ipfs, 'hello friend', done)
45+
})
46+
it('message from buffer', (done) => {
47+
publish(ipfs, new Buffer('hello friend'), done)
48+
})
49+
})
50+
51+
describe('.subscribe', () => {
52+
it('one topic', (done) => {
53+
const subscription = ipfs.pubsub.subscribe(topicName)
54+
subscription.on('data', (d) => {
55+
expect(d.data).to.equal('hi')
56+
subscription.cancel()
57+
})
58+
subscription.on('end', () => {
59+
done()
60+
})
61+
setTimeout(publish.bind(null, ipfs, 'hi', () => {}), 0)
62+
})
63+
it('fails when already subscribed', () => {
64+
const firstSub = ipfs.pubsub.subscribe(topicName)
65+
let caughtErr = null
66+
try {
67+
ipfs.pubsub.subscribe(topicName)
68+
} catch (err) {
69+
caughtErr = err
70+
}
71+
expect(caughtErr.toString()).to.equal('Error: Already subscribed to ' + topicName)
72+
firstSub.cancel()
73+
})
74+
it('receive multiple messages', (done) => {
75+
let receivedMessages = []
76+
let interval = null
77+
const expectedMessages = 2
78+
const subscription = ipfs.pubsub.subscribe(topicName)
79+
subscription.on('data', (d) => {
80+
receivedMessages.push(d.data)
81+
if (receivedMessages.length === expectedMessages) {
82+
receivedMessages.forEach((msg) => {
83+
expect(msg).to.be.equal('hi')
84+
})
85+
clearInterval(interval)
86+
subscription.cancel()
87+
done()
88+
}
89+
})
90+
91+
setTimeout(() => {
92+
interval = setInterval(publish.bind(null, ipfs, 'hi', () => {}), 10)
93+
}, 10)
94+
})
95+
})
96+
describe('multiple nodes pub/sub', () => {
97+
let clients = {}
98+
before(function (done) {
99+
const keys = ['a', 'b']
100+
fc = new FactoryClient()
101+
map(['a', 'b'], (_, cb) => {
102+
return fc.spawnNode(cb)
103+
}, (err, nodes) => {
104+
if (err) return done(err)
105+
keys.forEach((key, i) => {
106+
clients[key] = nodes[i]
107+
})
108+
done()
109+
})
110+
})
111+
after((done) => {
112+
fc.dismantle(done)
113+
})
114+
it('receive messages from different node', (done) => {
115+
const expectedString = 'hello from the other side'
116+
const subscription = clients.a.pubsub.subscribe(topicName)
117+
subscription.on('data', (d) => {
118+
expect(d.data).to.be.equal(expectedString)
119+
subscription.cancel()
120+
done()
121+
})
122+
setTimeout(() => {
123+
clients.b.pubsub.publish(topicName, expectedString, (err, result) => {
124+
expect(err).to.not.exist
125+
expect(result).to.equal(true)
126+
})
127+
}, 100)
128+
})
129+
})
130+
})

0 commit comments

Comments
 (0)