From eed2bc555ffb997eadc70f3558337f1f8ded5e29 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 8 Jul 2019 15:35:06 +0200 Subject: [PATCH] feat: add strict signing validation for messages (#84 BREAKING CHANGE: If messages are not being signed, this change will result in them being dropped. A previous release of floodsub added signing by default, but any Floodsub version older than v0.16.0 will have their messages dropped. This is inline with the latest go pubsub behavior. --- package.json | 2 +- src/index.js | 37 +++++++++++++++++++++-------- test/pubsub.spec.js | 58 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 11 deletions(-) diff --git a/package.json b/package.json index 5985597..b401c83 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,7 @@ "debug": "^4.1.1", "length-prefixed-stream": "^2.0.0", "libp2p-crypto": "~0.16.1", - "libp2p-pubsub": "~0.1.0", + "libp2p-pubsub": "~0.2.0", "protons": "^1.0.1", "pull-length-prefixed": "^1.3.2", "pull-pushable": "^2.2.0", diff --git a/src/index.js b/src/index.js index f1668c6..18ab1e2 100644 --- a/src/index.js +++ b/src/index.js @@ -76,6 +76,12 @@ class FloodSub extends BaseProtocol { ) } + /** + * Called for each RPC call received from the given peer + * @private + * @param {string} idB58Str b58 string PeerId of the connected peer + * @param {rpc.RPC} rpc The pubsub RPC message + */ _onRpc (idB58Str, rpc) { if (!rpc) { return @@ -86,7 +92,7 @@ class FloodSub extends BaseProtocol { const msgs = rpc.msgs if (msgs && msgs.length) { - this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs)) + rpc.msgs.forEach((msg) => this._processRpcMessage(msg)) } if (subs && subs.length) { @@ -98,20 +104,31 @@ class FloodSub extends BaseProtocol { } } - _processRpcMessages (msgs) { - msgs.forEach((msg) => { - const seqno = utils.msgId(msg.from, msg.seqno) - // 1. check if I've seen the message, if yes, ignore - if (this.seenCache.has(seqno)) { + /** + * @private + * @param {rpc.RPC.Message} message The message to process + * @returns {void} + */ + _processRpcMessage (message) { + const msg = utils.normalizeInRpcMessage(message) + const seqno = utils.msgId(msg.from, msg.seqno) + // 1. check if I've seen the message, if yes, ignore + if (this.seenCache.has(seqno)) { + return + } + + this.seenCache.put(seqno) + // 2. validate the message (signature verification) + this.validate(message, (err, isValid) => { + if (err || !isValid) { + this.log('Message could not be validated, dropping it. isValid=%s', isValid, err) return } - this.seenCache.put(seqno) - - // 2. emit to self + // 3. if message is valid, emit to self this._emitMessages(msg.topicIDs, [msg]) - // 3. propagate msg to others + // 4. if message is valid, propagate msg to others this._forwardMessages(msg.topicIDs, [msg]) }) } diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index ac43d64..e3c492a 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,6 +6,7 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const sinon = require('sinon') +const nextTick = require('async/nextTick') const Floodsub = require('../src') const { createNode } = require('./utils') @@ -88,4 +89,61 @@ describe('pubsub', () => { }) }) }) + + describe('validate', () => { + it('should drop unsigned messages', (done) => { + sinon.spy(floodsub, '_emitMessages') + sinon.spy(floodsub, '_forwardMessages') + sinon.spy(floodsub, 'validate') + + const topic = 'my-topic' + const rpc = { + subscriptions: [], + msgs: [{ + from: libp2p.peerInfo.id.id, + data: Buffer.from('an unsigned message'), + seqno: utils.randomSeqno(), + topicIDs: [topic] + }] + } + + floodsub._onRpc('QmAnotherPeer', rpc) + + nextTick(() => { + expect(floodsub.validate.callCount).to.eql(1) + expect(floodsub._emitMessages.called).to.eql(false) + expect(floodsub._forwardMessages.called).to.eql(false) + + done() + }) + }) + + it('should not drop unsigned messages if strict signing is disabled', (done) => { + sinon.spy(floodsub, '_emitMessages') + sinon.spy(floodsub, '_forwardMessages') + sinon.spy(floodsub, 'validate') + sinon.stub(floodsub, 'strictSigning').value(false) + + const topic = 'my-topic' + const rpc = { + subscriptions: [], + msgs: [{ + from: libp2p.peerInfo.id.id, + data: Buffer.from('an unsigned message'), + seqno: utils.randomSeqno(), + topicIDs: [topic] + }] + } + + floodsub._onRpc('QmAnotherPeer', rpc) + + nextTick(() => { + expect(floodsub.validate.callCount).to.eql(1) + expect(floodsub._emitMessages.called).to.eql(true) + expect(floodsub._forwardMessages.called).to.eql(true) + + done() + }) + }) + }) })