Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

feat: optional self emit #85

Merged
merged 2 commits into from
Jul 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,18 @@ Floodsub emits two kinds of events:

## API

See https://libp2p.github.io/js-libp2p-floodsub
### Create a floodsub implementation

```js
const options = {…}
const floodsub = new Floodsub(libp2pNode, options)
```

Options is an optional object with the following key-value pairs:

* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **true**).

For more, see https://libp2p.github.io/js-libp2p-floodsub

## Contribute

Expand Down
18 changes: 14 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,27 @@ const noop = () => {}
*/
class FloodSub extends BaseProtocol {
/**
* @param {Object} libp2p
* @param {Object} libp2p an instance of Libp2p
* @param {Object} [options]
* @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to true
* @constructor
*/
constructor (libp2p) {
constructor (libp2p, options = {}) {
super('libp2p:floodsub', multicodec, libp2p)

/**
* List of our subscriptions
* @type {Set<string>}
*/
this.subscriptions = new Set()

/**
* Pubsub options
*/
this._options = {
emitSelf: true,
...options
}
}

/**
Expand Down Expand Up @@ -203,8 +213,8 @@ class FloodSub extends BaseProtocol {
topicIDs: topics
}

// Emit to self if I'm interested
this._emitMessages(topics, [message])
// Emit to self if I'm interested and it is enabled
this._options.emitSelf && this._emitMessages(topics, [message])

this._buildMessage(message, cb)
}
Expand Down
100 changes: 100 additions & 0 deletions test/emit-self.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-spies'))
const expect = chai.expect
const series = require('async/series')

const FloodSub = require('../src')

const {
createNode
} = require('./utils')

const shouldNotHappen = (_) => expect.fail()

describe('emit self', () => {
const topic = 'Z'

describe('enabled', () => {
let nodeA
let fsA

before((done) => {
createNode((err, node) => {
if (err) {
return done(err)
}
nodeA = node
nodeA.start(done)
})
})

before((done) => {
fsA = new FloodSub(nodeA, { emitSelf: true })
fsA.start(done)
})

before(() => {
fsA.subscribe(topic)
})

after((done) => {
series([
(cb) => fsA.stop(cb),
(cb) => nodeA.stop(cb)
], done)
})

it('should emit to self on publish', async () => {
const promise = new Promise((resolve) => fsA.once(topic, resolve))

fsA.publish(topic, Buffer.from('hey'))

await promise
})
})

describe('disabled', () => {
let nodeA
let fsA

before((done) => {
createNode((err, node) => {
if (err) {
return done(err)
}
nodeA = node
nodeA.start(done)
})
})

before((done) => {
fsA = new FloodSub(nodeA, { emitSelf: false })
fsA.start(done)
})

before(() => {
fsA.subscribe(topic)
})

after((done) => {
series([
(cb) => fsA.stop(cb),
(cb) => nodeA.stop(cb)
], done)
})

it('should emit to self on publish', async () => {
fsA.once(topic, (m) => shouldNotHappen)

fsA.publish(topic, Buffer.from('hey'))

// Wait 1 second to guarantee that self is not noticed
await new Promise((resolve) => setTimeout(() => resolve(), 1000))
})
})
})