Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: add multistream-select and update pubsub types (#170)
Browse files Browse the repository at this point in the history
Pubsub only uses the registrar and the peerid, so just pass those
in instead of a whole libp2p instance.
  • Loading branch information
achingbrain authored Feb 17, 2022
1 parent f408312 commit b9ecb2b
Show file tree
Hide file tree
Showing 60 changed files with 2,475 additions and 559 deletions.
10 changes: 6 additions & 4 deletions packages/libp2p-interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
"import": "./dist/src/crypto/index.js",
"types": "./dist/src/crypto/index.d.ts"
},
"./mocks": {
"import": "./dist/src/mocks/index.js",
"types": "./dist/src/mocks/index.d.ts"
},
"./peer-discovery": {
"import": "./dist/src/peer-discovery/index.js",
"types": "./dist/src/peer-discovery/index.d.ts"
Expand Down Expand Up @@ -88,10 +92,6 @@
"import": "./dist/src/transport/utils/index.js",
"types": "./dist/src/transport/utils/index.d.ts"
},
"./mocks": {
"import": "./dist/src/mocks/index.js",
"types": "./dist/src/mocks/index.d.ts"
},
"./utils/peers": {
"import": "./dist/src/utils/peers.js",
"types": "./dist/src/utils/peers.d.ts"
Expand Down Expand Up @@ -200,6 +200,8 @@
"dependencies": {
"@libp2p/crypto": "^0.22.2",
"@libp2p/interfaces": "^1.0.0",
"@libp2p/logger": "^1.0.3",
"@libp2p/multistream-select": "^0.0.0",
"@libp2p/peer-id": "^1.0.0",
"@libp2p/peer-id-factory": "^1.0.0",
"@libp2p/pubsub": "^1.1.0",
Expand Down
95 changes: 65 additions & 30 deletions packages/libp2p-interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { peerIdFromString } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { pipe } from 'it-pipe'
import { duplexPair } from 'it-pair/duplex'
import type { MultiaddrConnection } from '@libp2p/interfaces/transport'
Expand All @@ -9,22 +8,67 @@ import type { Duplex } from 'it-stream-types'
import { mockMuxer } from './muxer.js'
import type { PeerId } from '@libp2p/interfaces/src/peer-id'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import { Multiaddr } from '@multiformats/multiaddr'
import type { Registrar } from '@libp2p/interfaces/registrar'
import { mockRegistrar } from './registrar.js'
import { Listener } from '@libp2p/multistream-select'
import { logger } from '@libp2p/logger'
import { CustomEvent } from '@libp2p/interfaces'

export async function mockConnection (maConn: MultiaddrConnection, direction: 'inbound' | 'outbound' = 'inbound', muxer?: Muxer): Promise<Connection> {
const log = logger('libp2p:mock-connection')

export interface MockConnectionOptions {
direction?: 'inbound' | 'outbound'
muxer?: Muxer
registrar?: Registrar
}

export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectionOptions = {}): Connection {
const remoteAddr = maConn.remoteAddr
const remotePeerIdStr = remoteAddr.getPeerId()
const remotePeer = remotePeerIdStr != null ? peerIdFromString(remotePeerIdStr) : await createEd25519PeerId()

if (remotePeerIdStr == null) {
throw new Error('Remote multiaddr must contain a peer id')
}

const remotePeer = peerIdFromString(remotePeerIdStr)
const registry = new Map()
const streams: Stream[] = []
let streamId = 0
const mux = muxer ?? mockMuxer()
const direction = opts.direction ?? 'inbound'
const registrar = opts.registrar ?? mockRegistrar()

const muxer = opts.muxer ?? mockMuxer({
onStream: (muxedStream) => {
const mss = new Listener(muxedStream)
try {
mss.handle(registrar.getProtocols())
.then(({ stream, protocol }) => {
log('%s: incoming stream opened on %s', direction, protocol)
muxedStream = { ...muxedStream, ...stream }

connection.addStream(muxedStream, { protocol, metadata: {} })
const handler = registrar.getHandler(protocol)

handler(new CustomEvent('incomingStream', {
detail: { connection, stream: muxedStream, protocol }
}))
}).catch(err => {
log.error(err)
})
} catch (err: any) {
log.error(err)
}
},
onStreamEnd: (stream) => {
connection.removeStream(stream.id)
}
})

void pipe(
maConn, mux, maConn
maConn, muxer, maConn
)

return {
const connection: Connection = {
id: 'mock-connection',
remoteAddr,
remotePeer,
Expand All @@ -48,7 +92,7 @@ export async function mockConnection (maConn: MultiaddrConnection, direction: 'i
}

const id = `${streamId++}`
const stream: Stream = mux.newStream(id)
const stream: Stream = muxer.newStream(id)
const streamData: ProtocolStream = {
protocol: protocols[0],
stream
Expand All @@ -68,6 +112,8 @@ export async function mockConnection (maConn: MultiaddrConnection, direction: 'i
await maConn.close()
}
}

return connection
}

export function mockStream (stream: Duplex<Uint8Array>): Stream {
Expand All @@ -83,26 +129,15 @@ export function mockStream (stream: Duplex<Uint8Array>): Stream {
}
}

export async function connectionPair (peerA: PeerId, peerB: PeerId): Promise<[ Connection, Connection ]> {
const [d0, d1] = duplexPair<Uint8Array>()

return [{
...await mockConnection(mockMultiaddrConnection({
...d0,
remoteAddr: new Multiaddr(`/ip4/127.0.0.1/tcp/4001/p2p/${peerA.toString()}`)
})),
newStream: async (multicodecs: string[]) => await Promise.resolve({
stream: mockStream(d0),
protocol: multicodecs[0]
})
}, {
...await mockConnection(mockMultiaddrConnection({
...d1,
remoteAddr: new Multiaddr(`/ip4/127.0.0.1/tcp/4001/p2p/${peerB.toString()}`)
})),
newStream: async (multicodecs: string[]) => await Promise.resolve({
stream: mockStream(d1),
protocol: multicodecs[0]
})
}]
export function connectionPair (peerA: PeerId, peerB: PeerId): [ Connection, Connection ] {
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array>()

return [
mockConnection(
mockMultiaddrConnection(peerBtoPeerA, peerA)
),
mockConnection(
mockMultiaddrConnection(peerAtoPeerB, peerB)
)
]
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interfaces/transport'
import type { Duplex } from 'it-stream-types'
import type { PeerId } from '@libp2p/interfaces/peer-id'

export function mockMultiaddrConnection (source: Duplex<Uint8Array> & Partial<MultiaddrConnection>): MultiaddrConnection {
export function mockMultiaddrConnection (source: Duplex<Uint8Array> & Partial<MultiaddrConnection>, peerId: PeerId): MultiaddrConnection {
const maConn: MultiaddrConnection = {
async close () {

},
timeline: {
open: Date.now()
},
remoteAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4001'),
remoteAddr: new Multiaddr(`/ip4/127.0.0.1/tcp/4001/p2p/${peerId.toString()}`),
...source
}

Expand Down
70 changes: 63 additions & 7 deletions packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,41 @@
import type { Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
import type { IncomingStreamData, Registrar, StreamHandler } from '@libp2p/interfaces/registrar'
import type { Connection } from '@libp2p/interfaces/src/connection'
import type { PeerId } from '@libp2p/interfaces/src/peer-id'
import type { Topology } from '@libp2p/interfaces/topology'
import { connectionPair } from './connection.js'
import { CustomEvent } from '@libp2p/interfaces'

export class MockRegistrar implements Registrar {
private readonly topologies: Map<string, { topology: Topology, protocols: string[] }> = new Map()
private readonly handlers: Map<string, { handler: StreamHandler, protocols: string[] }> = new Map()

getProtocols () {
const protocols = new Set<string>()

for (const topology of this.topologies.values()) {
topology.protocols.forEach(protocol => protocols.add(protocol))
}

for (const handler of this.handlers.values()) {
handler.protocols.forEach(protocol => protocols.add(protocol))
}

return Array.from(protocols).sort()
}

async handle (protocols: string | string[], handler: StreamHandler) {
if (!Array.isArray(protocols)) {
protocols = [protocols]
}

for (const protocol of protocols) {
for (const { protocols } of this.handlers.values()) {
if (protocols.includes(protocol)) {
throw new Error(`Handler already registered for protocol ${protocol}`)
}
}
}

const id = `handler-id-${Math.random()}`

this.handlers.set(id, {
Expand All @@ -24,16 +50,14 @@ export class MockRegistrar implements Registrar {
this.handlers.delete(id)
}

getHandlers (protocol: string) {
const output: StreamHandler[] = []

getHandler (protocol: string) {
for (const { handler, protocols } of this.handlers.values()) {
if (protocols.includes(protocol)) {
output.push(handler)
return handler
}
}

return output
throw new Error(`No handler registered for protocol ${protocol}`)
}

register (protocols: string | string[], topology: Topology) {
Expand Down Expand Up @@ -68,10 +92,42 @@ export class MockRegistrar implements Registrar {
}
}

return output
if (output.length > 0) {
return output
}

throw new Error(`No topologies registered for protocol ${protocol}`)
}
}

export function mockRegistrar () {
return new MockRegistrar()
}

export async function mockIncomingStreamEvent (protocol: string, conn: Connection, remotePeer: PeerId): Promise<CustomEvent<IncomingStreamData>> {
// @ts-expect-error incomplete implementation
return new CustomEvent('incomingStream', {
detail: {
...await conn.newStream([protocol]),
connection: {
remotePeer
}
}
})
}

export async function connectPeers (protocol: string, registrarA: Registrar, registrarB: Registrar, peerIdA: PeerId, peerIdB: PeerId) {
const topologyA = registrarA.getTopologies(protocol)[0]
const topologyB = registrarB.getTopologies(protocol)[0]
// const handlerA = registrarA.getHandler(protocol)
// const handlerB = registrarB.getHandler(protocol)

// Notify peers of connection
const [bToA, aToB] = connectionPair(peerIdA, peerIdB)

await topologyA.onConnect(peerIdB, aToB)
// await handlerA(await mockIncomingStreamEvent(protocol, aToB, peerIdB))

await topologyB.onConnect(peerIdA, bToA)
// await handlerB(await mockIncomingStreamEvent(protocol, bToA, peerIdA))
}
10 changes: 8 additions & 2 deletions packages/libp2p-interface-compliance-tests/src/mocks/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ export function mockUpgrader (options: MockUpgraderOptions = {}) {
const upgrader: Upgrader = {
async upgradeOutbound (multiaddrConnection) {
ensureProps(multiaddrConnection)
return await mockConnection(multiaddrConnection, 'outbound', options.muxer)
return mockConnection(multiaddrConnection, {
direction: 'outbound',
muxer: options.muxer
})
},
async upgradeInbound (multiaddrConnection) {
ensureProps(multiaddrConnection)
return await mockConnection(multiaddrConnection, 'inbound', options.muxer)
return mockConnection(multiaddrConnection, {
direction: 'inbound',
muxer: options.muxer
})
}
}

Expand Down
28 changes: 19 additions & 9 deletions packages/libp2p-interface-compliance-tests/src/pubsub/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,29 @@ import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import type { TestSetup } from '../index.js'
import type { PubSub } from '@libp2p/interfaces/pubsub'
import type { PubSub, PubSubOptions } from '@libp2p/interfaces/pubsub'
import type { EventMap } from './index.js'
import type { Registrar } from '@libp2p/interfaces/src/registrar'
import { mockRegistrar } from '../mocks/registrar.js'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'

const topic = 'foo'
const data = uint8ArrayFromString('bar')

export default (common: TestSetup<PubSub<EventMap>>) => {
export default (common: TestSetup<PubSub<EventMap>, PubSubOptions>) => {
describe('pubsub api', () => {
let pubsub: PubSub<EventMap>
let registrar: Registrar

// Create pubsub router
beforeEach(async () => {
pubsub = await common.setup()
registrar = mockRegistrar()

pubsub = await common.setup({
peerId: await createEd25519PeerId(),
registrar,
emitSelf: true
})
})

afterEach(async () => {
Expand All @@ -26,22 +36,22 @@ export default (common: TestSetup<PubSub<EventMap>>) => {
})

it('can start correctly', async () => {
sinon.spy(pubsub.registrar, 'register')
sinon.spy(registrar, 'register')

await pubsub.start()

expect(pubsub.started).to.eql(true)
expect(pubsub.registrar.register).to.have.property('callCount', 1)
expect(pubsub.isStarted()).to.equal(true)
expect(registrar.register).to.have.property('callCount', 1)
})

it('can stop correctly', async () => {
sinon.spy(pubsub.registrar, 'unregister')
sinon.spy(registrar, 'unregister')

await pubsub.start()
await pubsub.stop()

expect(pubsub.started).to.eql(false)
expect(pubsub.registrar.unregister).to.have.property('callCount', 1)
expect(pubsub.isStarted()).to.equal(false)
expect(registrar.unregister).to.have.property('callCount', 1)
})

it('can subscribe and unsubscribe correctly', async () => {
Expand Down
Loading

0 comments on commit b9ecb2b

Please sign in to comment.