Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
feat: add strict signing validation for messages (#84
Browse files Browse the repository at this point in the history
BREAKING CHANGE: If messages are not being signed, this change will result in them being dropped. A previous release of floodsub added signing by default, but any Floodsub version older than v0.16.0 will have their messages dropped. This is inline with the latest go pubsub behavior.
  • Loading branch information
vasco-santos committed Jul 8, 2019
1 parent fb108d5 commit eed2bc5
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 11 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"debug": "^4.1.1",
"length-prefixed-stream": "^2.0.0",
"libp2p-crypto": "~0.16.1",
"libp2p-pubsub": "~0.1.0",
"libp2p-pubsub": "~0.2.0",
"protons": "^1.0.1",
"pull-length-prefixed": "^1.3.2",
"pull-pushable": "^2.2.0",
Expand Down
37 changes: 27 additions & 10 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ class FloodSub extends BaseProtocol {
)
}

/**
* Called for each RPC call received from the given peer
* @private
* @param {string} idB58Str b58 string PeerId of the connected peer
* @param {rpc.RPC} rpc The pubsub RPC message
*/
_onRpc (idB58Str, rpc) {
if (!rpc) {
return
Expand All @@ -86,7 +92,7 @@ class FloodSub extends BaseProtocol {
const msgs = rpc.msgs

if (msgs && msgs.length) {
this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs))
rpc.msgs.forEach((msg) => this._processRpcMessage(msg))
}

if (subs && subs.length) {
Expand All @@ -98,20 +104,31 @@ class FloodSub extends BaseProtocol {
}
}

_processRpcMessages (msgs) {
msgs.forEach((msg) => {
const seqno = utils.msgId(msg.from, msg.seqno)
// 1. check if I've seen the message, if yes, ignore
if (this.seenCache.has(seqno)) {
/**
* @private
* @param {rpc.RPC.Message} message The message to process
* @returns {void}
*/
_processRpcMessage (message) {
const msg = utils.normalizeInRpcMessage(message)
const seqno = utils.msgId(msg.from, msg.seqno)
// 1. check if I've seen the message, if yes, ignore
if (this.seenCache.has(seqno)) {
return
}

this.seenCache.put(seqno)
// 2. validate the message (signature verification)
this.validate(message, (err, isValid) => {
if (err || !isValid) {
this.log('Message could not be validated, dropping it. isValid=%s', isValid, err)
return
}

this.seenCache.put(seqno)

// 2. emit to self
// 3. if message is valid, emit to self
this._emitMessages(msg.topicIDs, [msg])

// 3. propagate msg to others
// 4. if message is valid, propagate msg to others
this._forwardMessages(msg.topicIDs, [msg])
})
}
Expand Down
58 changes: 58 additions & 0 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const sinon = require('sinon')
const nextTick = require('async/nextTick')

const Floodsub = require('../src')
const { createNode } = require('./utils')
Expand Down Expand Up @@ -88,4 +89,61 @@ describe('pubsub', () => {
})
})
})

describe('validate', () => {
it('should drop unsigned messages', (done) => {
sinon.spy(floodsub, '_emitMessages')
sinon.spy(floodsub, '_forwardMessages')
sinon.spy(floodsub, 'validate')

const topic = 'my-topic'
const rpc = {
subscriptions: [],
msgs: [{
from: libp2p.peerInfo.id.id,
data: Buffer.from('an unsigned message'),
seqno: utils.randomSeqno(),
topicIDs: [topic]
}]
}

floodsub._onRpc('QmAnotherPeer', rpc)

nextTick(() => {
expect(floodsub.validate.callCount).to.eql(1)
expect(floodsub._emitMessages.called).to.eql(false)
expect(floodsub._forwardMessages.called).to.eql(false)

done()
})
})

it('should not drop unsigned messages if strict signing is disabled', (done) => {
sinon.spy(floodsub, '_emitMessages')
sinon.spy(floodsub, '_forwardMessages')
sinon.spy(floodsub, 'validate')
sinon.stub(floodsub, 'strictSigning').value(false)

const topic = 'my-topic'
const rpc = {
subscriptions: [],
msgs: [{
from: libp2p.peerInfo.id.id,
data: Buffer.from('an unsigned message'),
seqno: utils.randomSeqno(),
topicIDs: [topic]
}]
}

floodsub._onRpc('QmAnotherPeer', rpc)

nextTick(() => {
expect(floodsub.validate.callCount).to.eql(1)
expect(floodsub._emitMessages.called).to.eql(true)
expect(floodsub._forwardMessages.called).to.eql(true)

done()
})
})
})
})

0 comments on commit eed2bc5

Please sign in to comment.