Skip to content

Commit

Permalink
feat: integrate gossipsub by default (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos authored and jacobheun committed Aug 12, 2019
1 parent fc46dbb commit 2959fc8
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 324 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
"cids": "~0.6.0",
"debug": "^4.1.1",
"length-prefixed-stream": "github:jacobheun/length-prefixed-stream#v2.0.0-rc.1",
"libp2p": "~0.25.2",
"libp2p": "~0.26.0",
"libp2p-bootstrap": "~0.9.7",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "~0.0.4",
"libp2p-kad-dht": "~0.15.2",
"libp2p-secio": "~0.11.1",
"libp2p-tcp": "~0.13.0",
Expand Down
5 changes: 5 additions & 0 deletions src/cli/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ const main = async (processArgs) => {
type: 'boolean',
default: false
})
.option('pubsubRouter', {
desc: 'Specifies the pubsub router implementation',
type: 'string',
default: 'gossipsub'
})
.fail((msg, err, yargs) => {
if (err) {
throw err // preserve stack
Expand Down
6 changes: 3 additions & 3 deletions src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class Daemon {
async handlePubsubRequest ({ pubsub }, enc) {
switch (pubsub.type) {
case PSRequest.Type.GET_TOPICS: {
const topics = await this.libp2p.pubsub.getTopics()
const topics = await this.libp2p.pubsub.ls()

await new Promise((resolve) => {
enc.write(
Expand All @@ -283,7 +283,7 @@ class Daemon {
case PSRequest.Type.SUBSCRIBE: {
const topic = pubsub.topic

await this.libp2p.pubsub.subscribe(topic, {}, async (msg) => {
await this.libp2p.pubsub.subscribe(topic, async (msg) => {
await new Promise((resolve) => {
enc.write(PSMessage.encode({
from: msg.from && Buffer.from(msg.from),
Expand All @@ -294,7 +294,7 @@ class Daemon {
key: msg.key
}), resolve)
})
})
}, {})

await new Promise((resolve) => {
enc.write(OkResponse(), resolve)
Expand Down
157 changes: 46 additions & 111 deletions src/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const Bootstrap = require('libp2p-bootstrap')
const MPLEX = require('pull-mplex')
const SECIO = require('libp2p-secio')
const KadDHT = require('libp2p-kad-dht')
const FloodSub = require('libp2p-floodsub')
const GossipSub = require('libp2p-gossipsub')
const pullToStream = require('pull-stream-to-stream')
const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
Expand Down Expand Up @@ -105,59 +107,6 @@ class ContentRouting {
}
}

class Pubsub {
/**
* @param {Libp2p} libp2p The libp2p instance to use
*/
constructor (libp2p) {
this.libp2p = libp2p
}

/**
* Subscribe to a pubsub topic
* @param {string} topic
* @param {Object} options
* @param {function(msg)} handler handle received messages
* @returns { Promise<void>}
*/
subscribe (topic, options, handler) {
return new Promise((resolve, reject) => {
this.libp2p._pubsub.subscribe(topic, options, handler, (err) => {
if (err) return reject(err)
resolve()
})
})
}

/**
* Publish data in the context of a topic
* @param {string} topic
* @param {Buffer} data
* @returns {Promise<void>}
*/
publish (topic, data) {
return new Promise((resolve, reject) => {
this.libp2p._pubsub.publish(topic, data, (err) => {
if (err) return reject(err)
resolve()
})
})
}

/**
* Get the list of subscriptions the peer is subscribed to.
* @returns {Promise<Array<string>>}
*/
getTopics () {
return new Promise((resolve, reject) => {
this.libp2p._pubsub.ls((err, topics) => {
if (err) return reject(err)
resolve(topics)
})
})
}
}

class DHT {
/**
* @param {Libp2p} libp2p The libp2p instance to use
Expand Down Expand Up @@ -253,9 +202,7 @@ class DaemonLibp2p extends Libp2p {
constructor (libp2pOpts, { announceAddrs }) {
super(libp2pOpts)
this.announceAddrs = announceAddrs
this.needsPullStream = libp2pOpts.config.EXPERIMENTAL.pubsub
this._pubsub = this.pubsub
this.pubsub = new Pubsub(this)
this.needsPullStream = libp2pOpts.config.pubsub.enabled
}
get contentRouting () {
return this._contentRouting
Expand All @@ -278,76 +225,60 @@ class DaemonLibp2p extends Libp2p {

/**
* Starts the libp2p node
* NOTE: This is currently promisified internally by libp2p
*
* @returns {Promise<void>}
* @param {function(Error)} callback
*/
start () {
return new Promise((resolve, reject) => {
super.start((err) => {
if (err) return reject(err)
start (callback) {
super.start((err) => {
if (err) return callback(err)

// replace with announce addrs until libp2p supports this directly
if (this.announceAddrs.length > 0) {
this.peerInfo.multiaddrs.clear()
this.announceAddrs.forEach(addr => {
this.peerInfo.multiaddrs.add(addr)
})
}

// replace with announce addrs until libp2p supports this directly
if (this.announceAddrs.length > 0) {
this.peerInfo.multiaddrs.clear()
this.announceAddrs.forEach(addr => {
this.peerInfo.multiaddrs.add(addr)
})
// temporary removal of "/ipfs/..." from multiaddrs
// this will be solved in: https://github.com/libp2p/js-libp2p/issues/323
this.peerInfo.multiaddrs.toArray().forEach(m => {
let ma
try {
ma = m.decapsulate('ipfs')
} catch (_) {
ma = m
}

// temporary removal of "/ipfs/..." from multiaddrs
// this will be solved in: https://github.com/libp2p/js-libp2p/issues/323
this.peerInfo.multiaddrs.toArray().forEach(m => {
let ma
try {
ma = m.decapsulate('ipfs')
} catch (_) {
ma = m
}

this.peerInfo.multiaddrs.replace(m, ma)
})

resolve()
this.peerInfo.multiaddrs.replace(m, ma)
})
})
}

/**
* Stops the libp2p node
*
* @returns {Promise<void>}
*/
stop () {
return new Promise((resolve, reject) => {
super.stop((err) => {
if (err) return reject(err)
resolve()
})
callback()
})
}

/**
* Dials the given peer on protocol. The promise will resolve with the connection
* NOTE: This is currently promisified internally by libp2p
*
* @param {PeerInfo} peerInfo
* @param {string} protocol
* @returns {Promise<Connection>}
* @param {function(Error, Connection)} callback
*/
dial (peerInfo, protocol) {
return new Promise((resolve, reject) => {
this.dialProtocol(peerInfo, protocol, (err, conn) => {
if (err) return reject(err)
if (!conn) return resolve()
dial (peerInfo, protocol, callback) {
this.dialProtocol(peerInfo, protocol, (err, conn) => {
if (err) return callback(err)
if (!conn) return callback()

conn.getPeerInfo((err, peerInfo) => {
if (err) return reject(err)
conn.getPeerInfo((err, peerInfo) => {
if (err) return callback(err)

// Convert the pull stream to an iterable node stream
const connection = pullToStream(conn)
connection.peerInfo = peerInfo
// Convert the pull stream to an iterable node stream
const connection = pullToStream(conn)
connection.peerInfo = peerInfo

resolve(connection)
})
callback(null, connection)
})
})
}
Expand Down Expand Up @@ -384,6 +315,8 @@ class DaemonLibp2p extends Libp2p {
* @param {string} opts.id
* @param {string} opts.bootstrapPeers
* @param {string} opts.hostAddrs
* @param {boolean} opts.pubsub
* @param {string} opts.pubsubRouter
* @returns {Libp2p}
*/
const createLibp2p = async ({
Expand All @@ -395,7 +328,8 @@ const createLibp2p = async ({
connMgrLo,
connMgrHi,
id,
pubsub
pubsub,
pubsubRouter
} = {}) => {
const peerInfo = await getPeerInfo(id)
const peerBook = new PeerBook()
Expand Down Expand Up @@ -430,7 +364,8 @@ const createLibp2p = async ({
peerDiscovery: [
Bootstrap
],
dht: KadDHT
dht: KadDHT,
pubsub: pubsubRouter === 'floodsub' ? GossipSub : FloodSub
},
config: {
peerDiscovery: {
Expand All @@ -451,8 +386,8 @@ const createLibp2p = async ({
enabled: dht,
kBucketSize: 20
},
EXPERIMENTAL: {
pubsub: pubsub
pubsub: {
enabled: Boolean(pubsub)
}
}
}, {
Expand Down
1 change: 1 addition & 0 deletions test/daemon/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ describe('configuration', () => {
})

await daemon.start()

const addrs = daemon.libp2p.peerInfo.multiaddrs.toArray()
expect(addrs).to.eql([
ma('/dns/ipfs.io')
Expand Down
Loading

0 comments on commit 2959fc8

Please sign in to comment.