diff --git a/README.md b/README.md index d09a5b1076..64d6a83fa7 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,18 @@ Floodsub emits two kinds of events: ## API -See https://libp2p.github.io/js-libp2p-floodsub +### Create a floodsub implementation + +```js +const options = {…} +const floodsub = new Floodsub(libp2pNode, options) +``` + +Options is an optional object with the following key-value pairs: + +* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **true**). + +For more, see https://libp2p.github.io/js-libp2p-floodsub ## Contribute diff --git a/src/index.js b/src/index.js index 18ab1e2869..3ff99e9747 100644 --- a/src/index.js +++ b/src/index.js @@ -21,10 +21,12 @@ const noop = () => {} */ class FloodSub extends BaseProtocol { /** - * @param {Object} libp2p + * @param {Object} libp2p an instance of Libp2p + * @param {Object} [options] + * @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to true * @constructor */ - constructor (libp2p) { + constructor (libp2p, options = {}) { super('libp2p:floodsub', multicodec, libp2p) /** @@ -32,6 +34,14 @@ class FloodSub extends BaseProtocol { * @type {Set} */ this.subscriptions = new Set() + + /** + * Pubsub options + */ + this._options = { + emitSelf: true, + ...options + } } /** @@ -203,8 +213,8 @@ class FloodSub extends BaseProtocol { topicIDs: topics } - // Emit to self if I'm interested - this._emitMessages(topics, [message]) + // Emit to self if I'm interested and it is enabled + this._options.emitSelf && this._emitMessages(topics, [message]) this._buildMessage(message, cb) } diff --git a/test/emit-self.spec.js b/test/emit-self.spec.js new file mode 100644 index 0000000000..2907ab6474 --- /dev/null +++ b/test/emit-self.spec.js @@ -0,0 +1,100 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect +const series = require('async/series') + +const FloodSub = require('../src') + +const { + createNode +} = require('./utils') + +const shouldNotHappen = (_) => expect.fail() + +describe('emit self', () => { + const topic = 'Z' + + describe('enabled', () => { + let nodeA + let fsA + + before((done) => { + createNode((err, node) => { + if (err) { + return done(err) + } + nodeA = node + nodeA.start(done) + }) + }) + + before((done) => { + fsA = new FloodSub(nodeA, { emitSelf: true }) + fsA.start(done) + }) + + before(() => { + fsA.subscribe(topic) + }) + + after((done) => { + series([ + (cb) => fsA.stop(cb), + (cb) => nodeA.stop(cb) + ], done) + }) + + it('should emit to self on publish', async () => { + const promise = new Promise((resolve) => fsA.once(topic, resolve)) + + fsA.publish(topic, Buffer.from('hey')) + + await promise + }) + }) + + describe('disabled', () => { + let nodeA + let fsA + + before((done) => { + createNode((err, node) => { + if (err) { + return done(err) + } + nodeA = node + nodeA.start(done) + }) + }) + + before((done) => { + fsA = new FloodSub(nodeA, { emitSelf: false }) + fsA.start(done) + }) + + before(() => { + fsA.subscribe(topic) + }) + + after((done) => { + series([ + (cb) => fsA.stop(cb), + (cb) => nodeA.stop(cb) + ], done) + }) + + it('should emit to self on publish', async () => { + fsA.once(topic, (m) => shouldNotHappen) + + fsA.publish(topic, Buffer.from('hey')) + + // Wait 1 second to guarantee that self is not noticed + await new Promise((resolve) => setTimeout(() => resolve(), 1000)) + }) + }) +})