Skip to content

Commit f84d365

Browse files
fix: update interface-pubsub and adjust topicValidator implementation (#102)
Co-authored-by: achingbrain <alex@achingbrain.net>
1 parent d442edc commit f84d365

6 files changed

+29
-24
lines changed

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,11 @@
176176
"release": "aegir release"
177177
},
178178
"dependencies": {
179-
"@libp2p/components": "^3.0.0",
179+
"@libp2p/components": "^3.1.1",
180180
"@libp2p/crypto": "^1.0.0",
181181
"@libp2p/interface-connection": "^3.0.1",
182182
"@libp2p/interface-peer-id": "^1.0.2",
183-
"@libp2p/interface-pubsub": "^2.0.0",
183+
"@libp2p/interface-pubsub": "^3.0.0",
184184
"@libp2p/interface-registrar": "^2.0.0",
185185
"@libp2p/interfaces": "^3.0.2",
186186
"@libp2p/logger": "^2.0.0",

src/errors.ts

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,10 @@ export const codes = {
4949
/**
5050
* Message expected to not have a `seqno`, but does
5151
*/
52-
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO'
52+
ERR_UNEXPECTED_SEQNO: 'ERR_UNEXPECTED_SEQNO',
53+
54+
/**
55+
* Message failed topic validator
56+
*/
57+
ERR_TOPIC_VALIDATOR_REJECT: 'ERR_TOPIC_VALIDATOR_REJECT'
5358
}

src/index.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@ import {
1414
import type { PeerId } from '@libp2p/interface-peer-id'
1515
import type { IncomingStreamData } from '@libp2p/interface-registrar'
1616
import type { Connection } from '@libp2p/interface-connection'
17-
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult } from '@libp2p/interface-pubsub'
17+
import { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, TopicValidatorResult } from '@libp2p/interface-pubsub'
1818
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
1919
import { Components, Initializable } from '@libp2p/components'
2020
import type { Uint8ArrayList } from 'uint8arraylist'
2121

2222
const log = logger('libp2p:pubsub')
2323

24-
export interface TopicValidator { (topic: string, message: Message): Promise<void> }
25-
2624
/**
2725
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
2826
* and specifies the API that pubsub routers should have.
@@ -59,7 +57,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
5957
* Keyed by topic
6058
* Topic validators are functions with the following input:
6159
*/
62-
public topicValidators: Map<string, TopicValidator>
60+
public topicValidators: Map<string, TopicValidatorFn>
6361
public queue: Queue
6462
public multicodecs: string[]
6563
public components: Components = new Components()
@@ -420,7 +418,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
420418

421419
// Ensure the message is valid before processing it
422420
try {
423-
await this.validate(msg)
421+
await this.validate(from, msg)
424422
} catch (err: any) {
425423
log('Message is invalid, dropping it. %O', err)
426424
return
@@ -524,7 +522,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
524522
* Validates the given message. The signature will be checked for authenticity.
525523
* Throws an error on invalid messages
526524
*/
527-
async validate (message: Message) { // eslint-disable-line require-await
525+
async validate (from: PeerId, message: Message) { // eslint-disable-line require-await
528526
const signaturePolicy = this.globalSignaturePolicy
529527
switch (signaturePolicy) {
530528
case 'StrictNoSign':
@@ -570,9 +568,11 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
570568
}
571569

572570
const validatorFn = this.topicValidators.get(message.topic)
573-
574571
if (validatorFn != null) {
575-
await validatorFn(message.topic, message)
572+
const result = await validatorFn(from, message)
573+
if (result === TopicValidatorResult.Reject || result === TopicValidatorResult.Ignore) {
574+
throw errcode(new Error('Message validation failed'), codes.ERR_TOPIC_VALIDATOR_REJECT)
575+
}
576576
}
577577
}
578578

test/message.spec.ts

+8-8
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ describe('pubsub base messages', () => {
4141

4242
const signedMessage = await pubsub.buildMessage(message)
4343

44-
await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected()
44+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.not.be.rejected()
4545
})
4646

4747
it('validate with StrictNoSign will reject a message with from, signature, key, seqno present', async () => {
@@ -61,19 +61,19 @@ describe('pubsub base messages', () => {
6161
}
6262

6363
sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictNoSign')
64-
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
64+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
6565
// @ts-expect-error this field is not optional
6666
delete signedMessage.from
67-
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
67+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
6868
// @ts-expect-error this field is not optional
6969
delete signedMessage.signature
70-
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
70+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
7171
// @ts-expect-error this field is not optional
7272
delete signedMessage.key
73-
await expect(pubsub.validate(signedMessage)).to.eventually.be.rejected()
73+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.be.rejected()
7474
// @ts-expect-error this field is not optional
7575
delete signedMessage.sequenceNumber
76-
await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected()
76+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.not.be.rejected()
7777
})
7878

7979
it('validate with StrictNoSign will validate a message without a signature, key, and seqno', async () => {
@@ -87,7 +87,7 @@ describe('pubsub base messages', () => {
8787
sinon.stub(pubsub, 'globalSignaturePolicy').value('StrictNoSign')
8888

8989
const signedMessage = await pubsub.buildMessage(message)
90-
await expect(pubsub.validate(signedMessage)).to.eventually.not.be.rejected()
90+
await expect(pubsub.validate(peerId, signedMessage)).to.eventually.not.be.rejected()
9191
})
9292

9393
it('validate with StrictSign requires a signature', async () => {
@@ -98,6 +98,6 @@ describe('pubsub base messages', () => {
9898
topic: 'test-topic'
9999
}
100100

101-
await expect(pubsub.validate(message)).to.be.rejectedWith(Error, 'Signing required and no signature was present')
101+
await expect(pubsub.validate(peerId, message)).to.be.rejectedWith(Error, 'Signing required and no signature was present')
102102
})
103103
})

test/pubsub.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ describe('pubsub base implementation', () => {
7272
// Get the first message sent to _publish, and validate it
7373
const signedMessage: Message = publishMessageSpy.getCall(0).lastArg
7474

75-
await expect(pubsub.validate(signedMessage)).to.eventually.be.undefined()
75+
await expect(pubsub.validate(pubsub.components.getPeerId(), signedMessage)).to.eventually.be.undefined()
7676
})
7777

7878
it('calls publishes messages twice', async () => {

test/topic-validators.spec.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { expect } from 'aegir/chai'
22
import sinon from 'sinon'
33
import pWaitFor from 'p-wait-for'
4-
import errCode from 'err-code'
54
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
65
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
76
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
@@ -11,7 +10,7 @@ import {
1110
PubsubImplementation
1211
} from './utils/index.js'
1312
import type { PeerId } from '@libp2p/interface-peer-id'
14-
import type { PubSubRPC } from '@libp2p/interface-pubsub'
13+
import { PubSubRPC, TopicValidatorResult } from '@libp2p/interface-pubsub'
1514
import { Components } from '@libp2p/components'
1615

1716
const protocol = '/pubsub/1.0.0'
@@ -50,10 +49,11 @@ describe('topic validators', () => {
5049
const peer = new PeerStreams({ id: otherPeerId, protocol: 'a-protocol' })
5150

5251
// Set a trivial topic validator
53-
pubsub.topicValidators.set(filteredTopic, async (topic, message) => {
52+
pubsub.topicValidators.set(filteredTopic, async (_otherPeerId, message) => {
5453
if (!uint8ArrayEquals(message.data, uint8ArrayFromString('a message'))) {
55-
throw errCode(new Error(), 'ERR_TOPIC_VALIDATOR_REJECT')
54+
return TopicValidatorResult.Reject
5655
}
56+
return TopicValidatorResult.Accept
5757
})
5858

5959
// valid case

0 commit comments

Comments
 (0)