Skip to content

Commit

Permalink
Merge pull request #17 from libp2p/feat/signing
Browse files Browse the repository at this point in the history
feat: add support for message signing
  • Loading branch information
vasco-santos authored May 7, 2019
2 parents dda1894 + 5cb17fd commit f1e1889
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 22 deletions.
32 changes: 30 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const errcode = require('err-code')

const Peer = require('./peer')
const message = require('./message')
const { signMessage } = require('./message/sign')
const utils = require('./utils')

const nextTick = require('async/nextTick')
Expand All @@ -22,17 +23,28 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {String} debugName
* @param {String} multicodec
* @param {Object} libp2p libp2p implementation
* @param {Object} options
* @param {boolean} options.signMessages if messages should be signed, defaults to true
* @constructor
*/
constructor (debugName, multicodec, libp2p) {
constructor (debugName, multicodec, libp2p, options) {
super()

options = {
signMessages: true,
...options
}

this.log = debug(debugName)
this.log.err = debug(`${debugName}:error`)
this.multicodec = multicodec
this.libp2p = libp2p
this.started = false

if (options.signMessages) {
this.peerId = this.libp2p.peerInfo.id
}

/**
* Map of topics to which peers are subscribed to
*
Expand Down Expand Up @@ -225,16 +237,32 @@ class PubsubBaseProtocol extends EventEmitter {
this._removePeer(peer)
}

/**
* Normalizes the message and signs it, if signing is enabled
*
* @param {Message} message
* @param {function(Error, Message)} callback
*/
_buildMessage (message, callback) {
const msg = utils.normalizeOutRpcMessage(message)
if (this.peerId) {
signMessage(this.peerId, msg, callback)
} else {
nextTick(callback, null, msg)
}
}

/**
* Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation.
* For example, a Floodsub implementation might simply publish each message to each topic for every peer
* @abstract
* @param {Array<string>|string} topics
* @param {Array<any>|any} messages
* @param {function(Error)} callback
* @returns {undefined}
*
*/
publish (topics, messages) {
publish (topics, messages, callback) {
throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED')
}

Expand Down
4 changes: 4 additions & 0 deletions src/message/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
const protons = require('protons')

const rpcProto = protons(require('./rpc.proto.js'))
const RPC = rpcProto.RPC
const topicDescriptorProto = protons(require('./topic-descriptor.proto.js'))

exports = module.exports
exports.rpc = rpcProto
exports.td = topicDescriptorProto
exports.RPC = RPC
exports.Message = RPC.Message
exports.SubOpts = RPC.SubOpts
4 changes: 3 additions & 1 deletion src/message/rpc.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ message RPC {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4;
repeated string topicIDs = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
}`
33 changes: 33 additions & 0 deletions src/message/sign.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

const { Message } = require('./index')
const SignPrefix = Buffer.from('libp2p-pubsub:')

module.exports.SignPrefix = SignPrefix

/**
* Signs the provided message with the given `peerId`
*
* @param {PeerId} peerId
* @param {Message} message
* @param {function(Error, Message)} callback
* @returns {void}
*/
module.exports.signMessage = function (peerId, message, callback) {
// Get the message in bytes, and prepend with the pubsub prefix
const bytes = Buffer.concat([
SignPrefix,
Message.encode(message)
])

// Sign the bytes with the private key
peerId.privKey.sign(bytes, (err, signature) => {
if (err) return callback(err)

callback(null, {
...message,
signature: signature,
key: peerId.pubKey.bytes
})
})
}
6 changes: 3 additions & 3 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const EventEmitter = require('events')

const rpc = require('./message').rpc.RPC
const { RPC } = require('./message')

/**
* The known state of a connected peer.
Expand Down Expand Up @@ -109,7 +109,7 @@ class Peer extends EventEmitter {
})
})

this.write(rpc.encode({
this.write(RPC.encode({
subscriptions: subs
}))
}
Expand Down Expand Up @@ -139,7 +139,7 @@ class Peer extends EventEmitter {
* @returns {undefined}
*/
sendMessages (msgs) {
this.write(rpc.encode({
this.write(RPC.encode({
msgs: msgs
}))
}
Expand Down
16 changes: 9 additions & 7 deletions src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ exports.normalizeInRpcMessages = (messages) => {
})
}

exports.normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message)
if (typeof message.from === 'string' || message.from instanceof String) {
m.from = bs58.decode(message.from)
}
return m
}

exports.normalizeOutRpcMessages = (messages) => {
if (!messages) {
return messages
}
return messages.map((msg) => {
const m = Object.assign({}, msg)
if (typeof msg.from === 'string' || msg.from instanceof String) {
m.from = bs58.decode(msg.from)
}
return m
})
return messages.map(exports.normalizeOutRpcMessage)
}
44 changes: 35 additions & 9 deletions test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ const expect = chai.expect
const series = require('async/series')
const parallel = require('async/parallel')

const { Message } = require('../src/message')
const { SignPrefix } = require('../src/message/sign')
const PubsubBaseProtocol = require('../src')
const { randomSeqno, normalizeOutRpcMessage } = require('../src/utils')
const utils = require('./utils')
const createNode = utils.createNode

Expand Down Expand Up @@ -55,14 +58,7 @@ describe('pubsub base protocol', () => {
})
})

after((done) => {
parallel([
(cb) => nodeA.stop(cb),
(cb) => nodeB.stop(cb)
], done)
})

it('mount the pubsub protocol', (done) => {
before('mount the pubsub protocol', (done) => {
psA = new PubsubImplementation(nodeA)
psB = new PubsubImplementation(nodeB)

Expand All @@ -73,13 +69,20 @@ describe('pubsub base protocol', () => {
}, 50)
})

it('start both Pubsub', (done) => {
before('start both Pubsub', (done) => {
parallel([
(cb) => psA.start(cb),
(cb) => psB.start(cb)
], done)
})

after((done) => {
parallel([
(cb) => nodeA.stop(cb),
(cb) => nodeB.stop(cb)
], done)
})

it('Dial from nodeA to nodeB', (done) => {
series([
(cb) => nodeA.dial(nodeB.peerInfo, cb),
Expand All @@ -90,6 +93,29 @@ describe('pubsub base protocol', () => {
}, 1000)
], done)
})

it('_buildMessage normalizes and signs messages', (done) => {
const message = {
from: 'QmABC',
data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic']
}

psA._buildMessage(message, (err, signedMessage) => {
expect(err).to.not.exist()

const bytesToSign = Buffer.concat([
SignPrefix,
Message.encode(normalizeOutRpcMessage(message))
])

psA.peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => {
expect(verified).to.eql(true)
done(err)
})
})
})
})

describe('dial the pubsub protocol on mount', () => {
Expand Down
53 changes: 53 additions & 0 deletions test/sign.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */
'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect

const { Message } = require('../src/message')
const { signMessage, SignPrefix } = require('../src/message/sign')
const PeerId = require('peer-id')
const { randomSeqno } = require('../src/utils')

describe('message signing', () => {
let peerId
before((done) => {
peerId = PeerId.create({
bits: 1024
}, (err, id) => {
peerId = id
done(err)
})
})

it('should be able to sign a message', (done) => {
const message = {
from: 'QmABC',
data: 'hello',
seqno: randomSeqno(),
topicIDs: ['test-topic']
}

const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)])

peerId.privKey.sign(bytesToSign, (err, expectedSignature) => {
if (err) return done(err)

signMessage(peerId, message, (err, signedMessage) => {
if (err) return done(err)

// Check the signature and public key
expect(signedMessage.signature).to.eql(expectedSignature)
expect(signedMessage.key).to.eql(peerId.pubKey.bytes)

// Verify the signature
peerId.pubKey.verify(bytesToSign, signedMessage.signature, (err, verified) => {
expect(verified).to.eql(true)
done(err)
})
})
})
})
})

0 comments on commit f1e1889

Please sign in to comment.