Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 3cdfcb0

Browse files
authored
Merge pull request #610 from gavinmcdermott/feature-floodsub
WIP: feat/pubsub \o/
2 parents c91abc4 + 8084cb5 commit 3cdfcb0

File tree

19 files changed

+674
-7
lines changed

19 files changed

+674
-7
lines changed

package.json

+6-4
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
"form-data": "^2.1.2",
6161
"fs-pull-blob-store": "^0.4.1",
6262
"gulp": "^3.9.1",
63-
"interface-ipfs-core": "^0.22.0",
63+
"interface-ipfs-core": "git+https://github.com/ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f",
6464
"left-pad": "^1.1.3",
6565
"lodash": "^4.17.2",
6666
"ncp": "^2.0.0",
@@ -80,7 +80,7 @@
8080
"hapi": "^16.0.0",
8181
"hapi-set-header": "^1.0.2",
8282
"idb-pull-blob-store": "^0.5.1",
83-
"ipfs-api": "^12.0.0",
83+
"ipfs-api": "git+https://github.com/ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931",
8484
"ipfs-bitswap": "^0.8.1",
8585
"ipfs-block": "^0.5.0",
8686
"ipfs-block-service": "^0.7.0",
@@ -91,8 +91,9 @@
9191
"ipld-resolver": "^0.4.0",
9292
"isstream": "^0.1.2",
9393
"joi": "^10.0.1",
94-
"libp2p-ipfs-nodejs": "^0.16.1",
94+
"libp2p-floodsub": "0.3.1",
9595
"libp2p-ipfs-browser": "^0.17.0",
96+
"libp2p-ipfs-nodejs": "^0.16.1",
9697
"lodash.flatmap": "^4.5.0",
9798
"lodash.get": "^4.4.2",
9899
"lodash.has": "^4.5.2",
@@ -102,6 +103,7 @@
102103
"mafmt": "^2.1.2",
103104
"multiaddr": "^2.1.1",
104105
"multihashes": "^0.3.0",
106+
"ndjson": "1.5.0",
105107
"path-exists": "^3.0.0",
106108
"peer-book": "^0.3.0",
107109
"peer-id": "^0.8.0",
@@ -149,4 +151,4 @@
149151
"nginnever <ginneversource@gmail.com>",
150152
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
151153
]
152-
}
154+
}

src/cli/commands/pubsub.js

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
'use strict'
2+
3+
// The command count bump from 56 to 60 depends on:
4+
// ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f
5+
// ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931
6+
module.exports = {
7+
command: 'pubsub',
8+
9+
description: 'pubsub commands',
10+
11+
builder (yargs) {
12+
return yargs
13+
.commandDir('pubsub')
14+
},
15+
16+
handler (argv) {}
17+
}

src/cli/commands/pubsub/ls.js

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict'
2+
3+
const utils = require('../../utils')
4+
const debug = require('debug')
5+
const log = debug('cli:pubsub')
6+
log.error = debug('cli:pubsub:error')
7+
8+
module.exports = {
9+
command: 'ls',
10+
11+
describe: 'Get your list of subscriptions',
12+
13+
builder: {},
14+
15+
handler (argv) {
16+
utils.getIPFS((err, ipfs) => {
17+
if (err) {
18+
throw err
19+
}
20+
21+
ipfs.pubsub.ls((err, subscriptions) => {
22+
if (err) {
23+
throw err
24+
}
25+
26+
console.log(JSON.stringify(subscriptions, null, 2))
27+
})
28+
})
29+
}
30+
}

src/cli/commands/pubsub/peers.js

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
'use strict'
2+
3+
const utils = require('../../utils')
4+
const debug = require('debug')
5+
const log = debug('cli:pubsub')
6+
log.error = debug('cli:pubsub:error')
7+
8+
module.exports = {
9+
command: 'peers <topic>',
10+
11+
describe: 'Get all peers subscribed to a topic',
12+
13+
builder: {},
14+
15+
handler (argv) {
16+
utils.getIPFS((err, ipfs) => {
17+
if (err) {
18+
throw err
19+
}
20+
21+
ipfs.pubsub.peers(argv.topic, (err, peers) => {
22+
if (err) {
23+
throw err
24+
}
25+
26+
console.log(JSON.stringify(peers, null, 2))
27+
})
28+
})
29+
}
30+
}

src/cli/commands/pubsub/publish.js

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
'use strict'
2+
3+
const utils = require('../../utils')
4+
const debug = require('debug')
5+
const log = debug('cli:pubsub')
6+
log.error = debug('cli:pubsub:error')
7+
8+
module.exports = {
9+
command: 'publish <topic> <data>',
10+
11+
describe: 'Publish data to a topic',
12+
13+
builder: {},
14+
15+
handler (argv) {
16+
utils.getIPFS((err, ipfs) => {
17+
if (err) {
18+
throw err
19+
}
20+
21+
ipfs.pubsub.publish(argv.topic, argv.data, (err) => {
22+
if (err) {
23+
throw err
24+
}
25+
})
26+
})
27+
}
28+
}

src/cli/commands/pubsub/subscribe.js

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'use strict'
2+
3+
const utils = require('../../utils')
4+
const debug = require('debug')
5+
const log = debug('cli:pubsub')
6+
log.error = debug('cli:pubsub:error')
7+
8+
module.exports = {
9+
command: 'subscribe <topic>',
10+
11+
alias: 'sub',
12+
13+
describe: 'Subscribe to a topic',
14+
15+
builder: {},
16+
17+
handler (argv) {
18+
utils.getIPFS((err, ipfs) => {
19+
if (err) {
20+
throw err
21+
}
22+
23+
ipfs.pubsub.subscribe(argv.topic, (err, stream) => {
24+
if (err) {
25+
throw err
26+
}
27+
28+
console.log(stream.toString())
29+
})
30+
})
31+
}
32+
}

src/core/components/go-online.js

+6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const series = require('async/series')
44
const Bitswap = require('ipfs-bitswap')
5+
const FloodSub = require('libp2p-floodsub')
56

67
module.exports = function goOnline (self) {
78
return (cb) => {
@@ -21,6 +22,11 @@ module.exports = function goOnline (self) {
2122
)
2223
self._bitswap.start()
2324
self._blockService.goOnline(self._bitswap)
25+
26+
//
27+
self._pubsub = new FloodSub(self._libp2pNode)
28+
//
29+
2430
cb()
2531
})
2632
}

src/core/components/pubsub.js

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
'use strict'
2+
3+
const promisify = require('promisify-es6')
4+
const Readable = require('stream').Readable
5+
const _values = require('lodash.values')
6+
7+
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
8+
9+
let subscriptions = {}
10+
11+
const addSubscription = (topic, request, stream) => {
12+
subscriptions[topic] = { request: request, stream: stream }
13+
}
14+
15+
const removeSubscription = promisify((topic, callback) => {
16+
if (!subscriptions[topic]) {
17+
return callback(new Error(`Not subscribed to ${topic}`))
18+
}
19+
20+
subscriptions[topic].stream.emit('end')
21+
delete subscriptions[topic]
22+
23+
if (callback) {
24+
callback(null)
25+
}
26+
})
27+
28+
module.exports = function pubsub (self) {
29+
return {
30+
subscribe: promisify((topic, options, callback) => {
31+
if (!self.isOnline()) {
32+
throw OFFLINE_ERROR
33+
}
34+
35+
if (typeof options === 'function') {
36+
callback = options
37+
options = {}
38+
}
39+
40+
if (subscriptions[topic]) {
41+
return callback(`Error: Already subscribed to '${topic}'`)
42+
}
43+
44+
const stream = new Readable({ objectMode: true })
45+
46+
stream._read = () => {}
47+
48+
// There is no explicit unsubscribe; subscriptions have a cancel event
49+
stream.cancel = promisify((cb) => {
50+
self._pubsub.unsubscribe(topic)
51+
removeSubscription(topic, cb)
52+
})
53+
54+
self._pubsub.on(topic, (data) => {
55+
stream.emit('data', {
56+
data: data.toString(),
57+
topicIDs: [topic]
58+
})
59+
})
60+
61+
try {
62+
self._pubsub.subscribe(topic)
63+
} catch (err) {
64+
return callback(err)
65+
}
66+
67+
// Add the request to the active subscriptions and return the stream
68+
addSubscription(topic, null, stream)
69+
callback(null, stream)
70+
}),
71+
72+
publish: promisify((topic, data, callback) => {
73+
if (!self.isOnline()) {
74+
throw OFFLINE_ERROR
75+
}
76+
77+
const buf = Buffer.isBuffer(data) ? data : new Buffer(data)
78+
79+
try {
80+
self._pubsub.publish(topic, buf)
81+
} catch (err) {
82+
return callback(err)
83+
}
84+
85+
callback(null)
86+
}),
87+
88+
ls: promisify((callback) => {
89+
if (!self.isOnline()) {
90+
throw OFFLINE_ERROR
91+
}
92+
93+
let subscriptions = []
94+
95+
try {
96+
subscriptions = self._pubsub.getSubscriptions()
97+
} catch (err) {
98+
return callback(err)
99+
}
100+
101+
callback(null, subscriptions)
102+
}),
103+
104+
peers: promisify((topic, callback) => {
105+
if (!self.isOnline()) {
106+
throw OFFLINE_ERROR
107+
}
108+
109+
if (!subscriptions[topic]) {
110+
return callback(`Error: Not subscribed to '${topic}'`)
111+
}
112+
113+
let peers = []
114+
115+
try {
116+
const peerSet = self._pubsub.getPeerSet()
117+
_values(peerSet).forEach((peer) => {
118+
const idB58Str = peer.peerInfo.id.toB58String()
119+
const index = peer.topics.indexOf(topic)
120+
if (index > -1) {
121+
peers.push(idB58Str)
122+
}
123+
})
124+
} catch (err) {
125+
return callback(err)
126+
}
127+
128+
callback(null, peers)
129+
})
130+
}
131+
}

src/core/index.js

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const swarm = require('./components/swarm')
2323
const ping = require('./components/ping')
2424
const files = require('./components/files')
2525
const bitswap = require('./components/bitswap')
26+
const pubsub = require('./components/pubsub')
2627

2728
exports = module.exports = IPFS
2829

@@ -44,6 +45,7 @@ function IPFS (repoInstance) {
4445
this._bitswap = null
4546
this._blockService = new BlockService(this._repo)
4647
this._ipldResolver = new IPLDResolver(this._blockService)
48+
this._pubsub = null
4749

4850
// IPFS Core exposed components
4951

@@ -67,4 +69,5 @@ function IPFS (repoInstance) {
6769
this.files = files(this)
6870
this.bitswap = bitswap(this)
6971
this.ping = ping(this)
72+
this.pubsub = pubsub(this)
7073
}

src/http-api/resources/index.js

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ exports.block = require('./block')
1010
exports.swarm = require('./swarm')
1111
exports.bitswap = require('./bitswap')
1212
exports.files = require('./files')
13+
exports.pubsub = require('./pubsub')

0 commit comments

Comments
 (0)