Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
fix: update interface-pubsub and adjust topicValidator implementation (
Browse files Browse the repository at this point in the history
…#102)

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
mpetrunic and achingbrain authored Oct 11, 2022
1 parent d442edc commit f84d365
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 24 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@
"release": "aegir release"
},
"dependencies": {
"@libp2p/components": "^3.0.0",
"@libp2p/components": "^3.1.1",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-pubsub": "^2.0.0",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/logger": "^2.0.0",
Expand Down
7 changes: 6 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,10 @@ export const codes = {
/**
* Message expected to not have a `seqno`, but does
*/
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO'
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO',

/**
* Message failed topic validator
*/
ERR_TOPIC_VALIDATOR_REJECT: 'ERR_TOPIC_VALIDATOR_REJECT'
}
16 changes: 8 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ import {
import type { PeerId } from '@libp2p/interface-peer-id'
import type { IncomingStreamData } from '@libp2p/interface-registrar'
import type { Connection } from '@libp2p/interface-connection'
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub'
import { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, TopicValidatorResult } from '@libp2p/interface-pubsub'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { Components, Initializable } from '@libp2p/components'
import type { Uint8ArrayList } from 'uint8arraylist'

const log = logger('libp2p:pubsub')

export interface TopicValidator { (topic: string, message: Message): Promise<void> }

/**
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
* and specifies the API that pubsub routers should have.
Expand Down Expand Up @@ -59,7 +57,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
* Keyed by topic
* Topic validators are functions with the following input:
*/
public topicValidators: Map<string, TopicValidator>
public topicValidators: Map<string, TopicValidatorFn>
public queue: Queue
public multicodecs: string[]
public components: Components = new Components()
Expand Down Expand Up @@ -420,7 +418,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P

// Ensure the message is valid before processing it
try {
await this.validate(msg)
await this.validate(from, msg)
} catch (err: any) {
log('Message is invalid, dropping it. %O', err)
return
Expand Down Expand Up @@ -524,7 +522,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
* Validates the given message. The signature will be checked for authenticity.
* Throws an error on invalid messages
*/
async validate (message: Message) { // eslint-disable-line require-await
async validate (from: PeerId, message: Message) { // eslint-disable-line require-await
const signaturePolicy = this.globalSignaturePolicy
switch (signaturePolicy) {
case 'StrictNoSign':
Expand Down Expand Up @@ -570,9 +568,11 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
}

const validatorFn = this.topicValidators.get(message.topic)

if (validatorFn != null) {
await validatorFn(message.topic, message)
const result = await validatorFn(from, message)
if (result === TopicValidatorResult.Reject || result === TopicValidatorResult.Ignore) {
throw errcode(new Error('Message validation failed'), codes.ERR_TOPIC_VALIDATOR_REJECT)
}
}
}

Expand Down
16 changes: 8 additions & 8 deletions test/message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ describe('pubsub base messages', () => {

const signedMessage = await pubsub.buildMessage(message)

await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.not.be.rejected()
})

it('validate with StrictNoSign will reject a message with from, signature, key, seqno present', async () => {
Expand All @@ -61,19 +61,19 @@ describe('pubsub base messages', () => {
}

sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictNoSign')
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
// @ts-expect-error this field is not optional
delete signedMessage.from
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
// @ts-expect-error this field is not optional
delete signedMessage.signature
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
// @ts-expect-error this field is not optional
delete signedMessage.key
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
// @ts-expect-error this field is not optional
delete signedMessage.sequenceNumber
await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.not.be.rejected()
})

it('validate with StrictNoSign will validate a message without a signature, key, and seqno', async () => {
Expand All @@ -87,7 +87,7 @@ describe('pubsub base messages', () => {
sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictNoSign')

const signedMessage = await pubsub.buildMessage(message)
await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected()
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.not.be.rejected()
})

it('validate with StrictSign requires a signature', async () => {
Expand All @@ -98,6 +98,6 @@ describe('pubsub base messages', () => {
topic: 'test-topic'
}

await expect(pubsub.validate(message)).to.be.rejectedWith(Error, 'Signing required and no signature was present')
await expect(pubsub.validate(peerId, message)).to.be.rejectedWith(Error, 'Signing required and no signature was present')
})
})
2 changes: 1 addition & 1 deletion test/pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe('pubsub base implementation', () => {
// Get the first message sent to _publish, and validate it
const signedMessage: Message = publishMessageSpy.getCall(0).lastArg

await expect(pubsub.validate(signedMessage)).to.eventually.be.undefined()
await expect(pubsub.validate(pubsub.components.getPeerId(), signedMessage)).to.eventually.be.undefined()
})

it('calls publishes messages twice', async () => {
Expand Down
8 changes: 4 additions & 4 deletions test/topic-validators.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import pWaitFor from 'p-wait-for'
import errCode from 'err-code'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
Expand All @@ -11,7 +10,7 @@ import {
PubsubImplementation
} from './utils/index.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PubSubRPC } from '@libp2p/interface-pubsub'
import { PubSubRPC, TopicValidatorResult } from '@libp2p/interface-pubsub'
import { Components } from '@libp2p/components'

const protocol = '/pubsub/1.0.0'
Expand Down Expand Up @@ -50,10 +49,11 @@ describe('topic validators', () => {
const peer = new PeerStreams({ id: otherPeerId, protocol: 'a-protocol' })

// Set a trivial topic validator
pubsub.topicValidators.set(filteredTopic, async (topic, message) => {
pubsub.topicValidators.set(filteredTopic, async (_otherPeerId, message) => {
if (!uint8ArrayEquals(message.data, uint8ArrayFromString('a message'))) {
throw errCode(new Error(), 'ERR_TOPIC_VALIDATOR_REJECT')
return TopicValidatorResult.Reject
}
return TopicValidatorResult.Accept
})

// valid case
Expand Down

0 comments on commit f84d365

Please sign in to comment.