Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.0.0",
"multibase": "^3.0.0",
"multihashes": "^3.0.1",
"p-defer": "^3.0.0",
"p-limit": "^2.3.0",
"p-wait-for": "^3.1.0",
Expand Down
39 changes: 29 additions & 10 deletions src/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Table of Contents
* [Extend interface](#extend-interface)
* [Example](#example)
* [API](#api)
* [Constructor](#constructor)
* [new Pubsub(options)](#new-pubsuboptions)
* [Parameters](#parameters)
* [Start](#start)
* [pubsub.start()](#pubsubstart)
* [Returns](#returns)
Expand All @@ -19,24 +22,24 @@ Table of Contents
* [Returns](#returns-1)
* [Publish](#publish)
* [pubsub.publish(topics, message)](#pubsubpublishtopics-message)
* [Parameters](#parameters)
* [Parameters](#parameters-1)
* [Returns](#returns-2)
* [Subscribe](#subscribe)
* [pubsub.subscribe(topic)](#pubsubsubscribetopic)
* [Parameters](#parameters-1)
* [Parameters](#parameters-2)
* [Unsubscribe](#unsubscribe)
* [pubsub.unsubscribe(topic)](#pubsubunsubscribetopic)
* [Parameters](#parameters-2)
* [Parameters](#parameters-3)
* [Get Topics](#get-topics)
* [pubsub.getTopics()](#pubsubgettopics)
* [Returns](#returns-3)
* [Get Peers Subscribed to a topic](#get-peers-subscribed-to-a-topic)
* [pubsub.getSubscribers(topic)](#pubsubgetsubscriberstopic)
* [Parameters](#parameters-3)
* [Parameters](#parameters-4)
* [Returns](#returns-4)
* [Validate](#validate)
* [pubsub.validate(message)](#pubsubvalidatemessage)
* [Parameters](#parameters-4)
* [Parameters](#parameters-5)
* [Returns](#returns-5)
* [Test suite usage](#test-suite-usage)

Expand All @@ -49,7 +52,7 @@ You can check the following implementations as examples for building your own pu

## Interface usage

`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management. This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it.
`interface-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections and streams, as well as the subscription management and the features describe in the libp2p [pubsub specs](https://github.com/libp2p/specs/tree/master/pubsub). This way, a pubsub implementation can focus on its message routing algorithm, instead of also needing to create the setup for it.

### Extend interface

Expand All @@ -74,16 +77,15 @@ All the remaining functions **MUST NOT** be overwritten.
The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic.

```JavaScript
const Pubsub = require('libp2p-pubsub')
const Pubsub = require('libp2p-interfaces/src/pubsub')

class PubsubImplementation extends Pubsub {
constructor({ libp2p, options })
super({
debugName: 'libp2p:pubsub',
multicodecs: '/pubsub-implementation/1.0.0',
libp2p,
signMessages: options.signMessages,
strictSigning: options.strictSigning
globalSigningPolicy: options.globalSigningPolicy
})
}

Expand All @@ -98,6 +100,23 @@ class PubsubImplementation extends Pubsub {

The interface aims to specify a common interface that all pubsub router implementation should follow. A pubsub router implementation should extend the [EventEmitter](https://nodejs.org/api/events.html#events_class_eventemitter). When peers receive pubsub messages, these messages will be emitted by the event emitter where the `eventName` will be the `topic` associated with the message.

### Constructor

The base class constructor configures the pubsub instance for use with a libp2p instance. It includes settings for logging, signature policies, etc.

#### `new Pubsub({options})`

##### Parameters

| Name | Type | Description | Default |
|------|------|-------------|---------|
| options.libp2p | `Libp2p` | libp2p instance | required, no default |
| options.debugName | `string` | log namespace | required, no default |
| options.multicodecs | `string \| Array<string>` | protocol identifier(s) | required, no default |
| options.globalSignaturePolicy | `'StrictSign' \| 'StrictNoSign'` | signature policy to be globally applied | `'StrictSign'` |
| options.canRelayMessage | `boolean` | if can relay messages if not subscribed | `false` |
| options.emitSelf | `boolean` | if `publish` should emit to self, if subscribed | `false` |

### Start

Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`.
Expand Down Expand Up @@ -185,7 +204,7 @@ Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that ar

### Validate

Validates the signature of a message.
Validates a message according to the signature policy and topic-specific validation function.

#### `pubsub.validate(message)`

Expand Down
42 changes: 41 additions & 1 deletion src/pubsub/errors.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
'use strict'

exports.codes = {
/**
* Signature policy is invalid
*/
ERR_INVALID_SIGNATURE_POLICY: 'ERR_INVALID_SIGNATURE_POLICY',
/**
* Signature policy is unhandled
*/
ERR_UNHANDLED_SIGNATURE_POLICY: 'ERR_UNHANDLED_SIGNATURE_POLICY',

// Strict signing codes

/**
* Message expected to have a `signature`, but doesn't
*/
ERR_MISSING_SIGNATURE: 'ERR_MISSING_SIGNATURE',
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE'
/**
* Message expected to have a `seqno`, but doesn't
*/
ERR_MISSING_SEQNO: 'ERR_MISSING_SEQNO',
/**
* Message `signature` is invalid
*/
ERR_INVALID_SIGNATURE: 'ERR_INVALID_SIGNATURE',

// Strict no-signing codes

/**
* Message expected to not have a `from`, but does
*/
ERR_UNEXPECTED_FROM: 'ERR_UNEXPECTED_FROM',
/**
* Message expected to not have a `signature`, but does
*/
ERR_UNEXPECTED_SIGNATURE: 'ERR_UNEXPECTED_SIGNATURE',
/**
* Message expected to not have a `key`, but does
*/
ERR_UNEXPECTED_KEY: 'ERR_UNEXPECTED_KEY',
/**
* Message expected to not have a `seqno`, but does
*/
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO'
}
88 changes: 61 additions & 27 deletions src/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const { codes } = require('./errors')
*/
const message = require('./message')
const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
const utils = require('./utils')

const {
Expand Down Expand Up @@ -44,8 +45,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @param {String} props.debugName log namespace
* @param {Array<string>|string} props.multicodecs protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {boolean} [props.signMessages = true] if messages should be signed
* @param {boolean} [props.strictSigning = true] if message signing should be required
* @param {SignaturePolicy} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] defines how signatures should be handled
* @param {boolean} [props.canRelayMessage = false] if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] if publish should emit to self, if subscribed
* @abstract
Expand All @@ -54,8 +54,7 @@ class PubsubBaseProtocol extends EventEmitter {
debugName,
multicodecs,
libp2p,
signMessages = true,
strictSigning = true,
globalSignaturePolicy = SignaturePolicy.StrictSign,
canRelayMessage = false,
emitSelf = false
}) {
Expand Down Expand Up @@ -109,14 +108,17 @@ class PubsubBaseProtocol extends EventEmitter {
*/
this.peers = new Map()

// Message signing
this.signMessages = signMessages
// validate signature policy
if (!SignaturePolicy[globalSignaturePolicy]) {
throw errcode(new Error('Invalid global signature policy'), codes.ERR_INVALID_SIGUATURE_POLICY)
}

/**
* If message signing should be required for incoming messages
* @type {boolean}
* The signature policy to follow by default
*
* @type {SignaturePolicy}
*/
this.strictSigning = strictSigning
this.globalSignaturePolicy = globalSignaturePolicy

/**
* If router can relay received messages, even if not subscribed
Expand Down Expand Up @@ -440,7 +442,15 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Uint8Array} message id as bytes
*/
getMsgId (msg) {
return utils.msgId(msg.from, msg.seqno)
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
return utils.msgId(msg.from, msg.seqno)
case SignaturePolicy.StrictNoSign:
return utils.noSignMsgId(msg.data)
default:
throw errcode(new Error('Cannot get message id: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
}

/**
Expand Down Expand Up @@ -511,16 +521,36 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Promise<void>}
*/
async validate (message) { // eslint-disable-line require-await
// If strict signing is on and we have no signature, abort
if (this.strictSigning && !message.signature) {
throw errcode(new Error('Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}

// Check the message signature if present
if (message.signature && !(await verifySignature(message))) {
throw errcode(new Error('Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictNoSign:
if (message.from) {
throw errcode(new Error('StrictNoSigning: from should not be present'), codes.ERR_UNEXPECTED_FROM)
}
if (message.signature) {
throw errcode(new Error('StrictNoSigning: signature should not be present'), codes.ERR_UNEXPECTED_SIGNATURE)
}
if (message.key) {
throw errcode(new Error('StrictNoSigning: key should not be present'), codes.ERR_UNEXPECTED_KEY)
}
if (message.seqno) {
throw errcode(new Error('StrictNoSigning: seqno should not be present'), codes.ERR_UNEXPECTED_SEQNO)
}
break
case SignaturePolicy.StrictSign:
if (!message.signature) {
throw errcode(new Error('StrictSigning: Signing required and no signature was present'), codes.ERR_MISSING_SIGNATURE)
}
if (!message.seqno) {
throw errcode(new Error('StrictSigning: Signing required and no seqno was present'), codes.ERR_MISSING_SEQNO)
}
if (!(await verifySignature(message))) {
throw errcode(new Error('StrictSigning: Invalid message signature'), codes.ERR_INVALID_SIGNATURE)
}
break
default:
throw errcode(new Error('Cannot validate message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}

for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) {
Expand All @@ -538,11 +568,16 @@ class PubsubBaseProtocol extends EventEmitter {
* @returns {Promise<Message>}
*/
_buildMessage (message) {
const msg = utils.normalizeOutRpcMessage(message)
if (this.signMessages) {
return signMessage(this.peerId, msg)
} else {
return message
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case SignaturePolicy.StrictSign:
message.from = this.peerId.toB58String()
message.seqno = utils.randomSeqno()
return signMessage(this.peerId, utils.normalizeOutRpcMessage(message))
case SignaturePolicy.StrictNoSign:
return message
default:
throw errcode(new Error('Cannot build message: unhandled signature policy: ' + signaturePolicy), codes.ERR_UNHANDLED_SIGNATURE_POLICY)
}
}

Expand Down Expand Up @@ -586,13 +621,11 @@ class PubsubBaseProtocol extends EventEmitter {
const from = this.peerId.toB58String()
let msgObject = {
receivedFrom: from,
from: from,
data: message,
seqno: utils.randomSeqno(),
topicIDs: [topic]
}

// ensure that any operations performed on the message will include the signature
// ensure that the message follows the signature policy
const outMsg = await this._buildMessage(msgObject)
msgObject = utils.normalizeInRpcMessage(outMsg)

Expand Down Expand Up @@ -666,3 +699,4 @@ class PubsubBaseProtocol extends EventEmitter {
module.exports = PubsubBaseProtocol
module.exports.message = message
module.exports.utils = utils
module.exports.SignaturePolicy = SignaturePolicy
28 changes: 28 additions & 0 deletions src/pubsub/signature-policy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

/**
* Enum for Signature Policy
* Details how message signatures are produced/consumed
*/
exports.SignaturePolicy = {
/**
* On the producing side:
* * Build messages with the signature, key (from may be enough for certain inlineable public key types), from and seqno fields.
*
* On the consuming side:
* * Enforce the fields to be present, reject otherwise.
* * Propagate only if the fields are valid and signature can be verified, reject otherwise.
*/
StrictSign: 'StrictSign',
/**
* On the producing side:
* * Build messages without the signature, key, from and seqno fields.
* * The corresponding protobuf key-value pairs are absent from the marshalled message, not just empty.
*
* On the consuming side:
* * Enforce the fields to be absent, reject otherwise.
* * Propagate only if the fields are absent, reject otherwise.
* * A message_id function will not be able to use the above fields, and should instead rely on the data field. A commonplace strategy is to calculate a hash.
*/
StrictNoSign: 'StrictNoSign'
}
10 changes: 10 additions & 0 deletions src/pubsub/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const randomBytes = require('libp2p-crypto/src/random-bytes')
const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string')
const PeerId = require('peer-id')
const multihash = require('multihashes')
exports = module.exports

/**
Expand Down Expand Up @@ -32,6 +33,15 @@ exports.msgId = (from, seqno) => {
return msgId
}

/**
* Generate a message id, based on message `data`.
*
* @param {Uint8Array} data
* @returns {Uint8Array}
* @private
*/
exports.noSignMsgId = (data) => multihash.encode(data, 'sha2')

/**
* Check if any member of the first set is also a member
* of the second set.
Expand Down
Loading