Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
Feature: PubSub 🌟 (#644)
Browse files Browse the repository at this point in the history
feat: pubsub
  • Loading branch information
daviddias authored Jan 16, 2017
1 parent 6a5afdd commit 5078ddc
Show file tree
Hide file tree
Showing 24 changed files with 804 additions and 151 deletions.
17 changes: 10 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,18 @@
"aegir": "^9.3.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"delay": "^1.3.1",
"detect-node": "^2.0.3",
"eslint-plugin-react": "^6.9.0",
"execa": "^0.6.0",
"expose-loader": "^0.7.1",
"form-data": "^2.1.2",
"fs-pull-blob-store": "^0.4.1",
"gulp": "^3.9.1",
"interface-ipfs-core": "^0.23.0",
"interface-ipfs-core": "^0.23.3",
"ipfsd-ctl": "^0.18.1",
"left-pad": "^1.1.3",
"lodash": "^4.17.2",
"lodash": "^4.17.4",
"mocha": "^3.2.0",
"ncp": "^2.0.0",
"nexpect": "^0.5.0",
Expand All @@ -85,11 +86,12 @@
"async": "^2.1.4",
"bl": "^1.2.0",
"boom": "^4.2.0",
"debug": "^2.5.1",
"debug": "^2.6.0",
"fs-pull-blob-store": "^0.3.0",
"glob": "^7.1.1",
"hapi": "^16.1.0",
"hapi-set-header": "^1.0.2",
"hoek": "^4.1.0",
"idb-pull-blob-store": "^0.5.1",
"ipfs-api": "^12.1.4",
"ipfs-bitswap": "^0.9.0",
Expand All @@ -101,9 +103,10 @@
"ipfs-unixfs-engine": "^0.15.0",
"ipld-resolver": "^0.4.1",
"isstream": "^0.1.2",
"joi": "^10.0.6",
"libp2p-ipfs-nodejs": "^0.17.1",
"libp2p-ipfs-browser": "^0.17.3",
"libp2p-floodsub": "0.7.1",
"joi": "^10.1.0",
"libp2p-ipfs-nodejs": "^0.17.3",
"libp2p-ipfs-browser": "^0.17.4",
"lodash.flatmap": "^4.5.0",
"lodash.get": "^4.4.2",
"lodash.has": "^4.5.2",
Expand Down Expand Up @@ -132,7 +135,7 @@
"temp": "^0.8.3",
"through2": "^2.0.3",
"update-notifier": "^1.0.3",
"yargs": "^6.5.0"
"yargs": "^6.6.0"
},
"contributors": [
"Andrew de Andrade <andrew@deandrade.com.br>",
Expand Down
14 changes: 14 additions & 0 deletions src/cli/commands/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

module.exports = {
command: 'pubsub',

description: 'pubsub commands',

builder (yargs) {
return yargs
.commandDir('pubsub')
},

handler (argv) {}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'ls',

describe: 'Get your list of subscriptions',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.ls((err, subscriptions) => {
if (err) {
throw err
}

subscriptions.forEach((sub) => {
console.log(sub)
})
})
})
}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'peers <topic>',

describe: 'Get all peers subscribed to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.peers(argv.topic, (err, peers) => {
if (err) {
throw err
}

peers.forEach((peer) => {
console.log(peer)
})
})
})
}
}
30 changes: 30 additions & 0 deletions src/cli/commands/pubsub/pub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'pub <topic> <data>',

describe: 'Publish data to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

const data = new Buffer(String(argv.data))

ipfs.pubsub.publish(argv.topic, data, (err) => {
if (err) {
throw err
}
})
})
}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'sub <topic>',

describe: 'Subscribe to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

const handler = (msg) => {
console.log(msg.data.toString())
}

ipfs.pubsub.subscribe(argv.topic, handler, (err) => {
if (err) {
throw err
}
})
})
}
}
11 changes: 8 additions & 3 deletions src/core/components/go-offline.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
'use strict'

module.exports = function goOffline (self) {
return (cb) => {
module.exports = (self) => {
return (callback) => {
self._blockService.goOffline()
self._bitswap.stop()
self.libp2p.stop(cb)
self._pubsub.stop((err) => {
if (err) {
return callback(err)
}
self.libp2p.stop(callback)
})
}
}
30 changes: 22 additions & 8 deletions src/core/components/go-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,39 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')

module.exports = function goOnline (self) {
return (cb) => {
module.exports = (self) => {
return (callback) => {
series([
self.load,
self.libp2p.start
(cb) => self.load(cb),
(cb) => self.libp2p.start(cb)
], (err) => {
if (err) {
return cb(err)
return callback(err)
}

self._bitswap = new Bitswap(
self._libp2pNode,
self._repo.blockstore,
self._libp2pNode.peerBook
)
self._bitswap.start()
self._blockService.goOnline(self._bitswap)
cb()

self._pubsub = new FloodSub(self._libp2pNode)

series([
(cb) => {
self._bitswap.start()
cb()
},
(cb) => {
self._blockService.goOnline(self._bitswap)
cb()
},
(cb) => self._pubsub.start(cb) // ,
// For all of the protocols to handshake with each other
// (cb) => setTimeout(cb, 1000) // Still not decided if we want this
], callback)
})
}
}
97 changes: 97 additions & 0 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
'use strict'

const promisify = require('promisify-es6')
const setImmediate = require('async/setImmediate')

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

module.exports = function pubsub (self) {
return {
subscribe: (topic, options, handler, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}

if (!callback) {
return new Promise((resolve, reject) => {
subscribe(topic, options, handler, (err) => {
if (err) {
return reject(err)
}
resolve()
})
})
}

subscribe(topic, options, handler, callback)
},

unsubscribe: (topic, handler) => {
const ps = self._pubsub

ps.removeListener(topic, handler)

if (ps.listenerCount(topic) === 0) {
ps.unsubscribe(topic)
}
},

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(OFFLINE_ERROR))
}

if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
}

self._pubsub.publish(topic, data)
setImmediate(() => callback())
}),

ls: promisify((callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(OFFLINE_ERROR))
}

const subscriptions = Array.from(
self._pubsub.subscriptions
)

setImmediate(() => callback(null, subscriptions))
}),

peers: promisify((topic, callback) => {
if (!self.isOnline()) {
return setImmediate(() => callback(OFFLINE_ERROR))
}

const peers = Array.from(self._pubsub.peers.values())
.filter((peer) => peer.topics.has(topic))
.map((peer) => peer.info.id.toB58String())

setImmediate(() => callback(null, peers))
}),

setMaxListeners (n) {
return self._pubsub.setMaxListeners(n)
}
}

function subscribe (topic, options, handler, callback) {
const ps = self._pubsub

if (ps.listenerCount(topic) === 0) {
ps.subscribe(topic)
}

ps.on(topic, handler)
setImmediate(() => callback())
}
}
3 changes: 3 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const swarm = require('./components/swarm')
const ping = require('./components/ping')
const files = require('./components/files')
const bitswap = require('./components/bitswap')
const pubsub = require('./components/pubsub')

exports = module.exports = IPFS

Expand All @@ -44,6 +45,7 @@ function IPFS (repoInstance) {
this._bitswap = null
this._blockService = new BlockService(this._repo)
this._ipldResolver = new IPLDResolver(this._blockService)
this._pubsub = null

// IPFS Core exposed components

Expand All @@ -67,4 +69,5 @@ function IPFS (repoInstance) {
this.files = files(this)
this.bitswap = bitswap(this)
this.ping = ping(this)
this.pubsub = pubsub(this)
}
Loading

0 comments on commit 5078ddc

Please sign in to comment.