Skip to content

Commit

Permalink
feat: enable manual identify (#1784)
Browse files Browse the repository at this point in the history
In some applications, automatically dialing identify is not desired, and more manual control is useful.

To that end, this PR contains two edits that act together
- Add an additional identify init boolean option `runOnConnectionOpen`, which controls whether to listen to `'connection:open'` events to trigger identify
- Add a return value to the `identify` method

---------

Co-authored-by: chad <chad.nehemiah94@gmail.com>
  • Loading branch information
wemeetagain and maschad authored Jul 20, 2023
1 parent 7b5c54d commit 06f4901
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 50 deletions.
7 changes: 3 additions & 4 deletions interop/test/ping.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import { webTransport } from '@libp2p/webtransport'
import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'
import { createLibp2p, type Libp2p, type Libp2pOptions } from 'libp2p'
import { circuitRelayTransport } from 'libp2p/circuit-relay'
import { identifyService } from 'libp2p/identify'
import { IdentifyService, identifyService } from 'libp2p/identify'
import { pingService, type PingService } from 'libp2p/ping'
import type { DefaultIdentifyService } from 'libp2p/identify/identify'

async function redisProxy (commands: any[]): Promise<any> {
const res = await fetch(`http://localhost:${process.env.proxyPort ?? ''}/`, { body: JSON.stringify(commands), method: 'POST' })
Expand All @@ -25,7 +24,7 @@ async function redisProxy (commands: any[]): Promise<any> {
return res.json()
}

let node: Libp2p<{ ping: PingService, identify: DefaultIdentifyService }>
let node: Libp2p<{ ping: PingService, identify: IdentifyService }>
const isDialer: boolean = process.env.is_dialer === 'true'
const timeoutSecs: string = process.env.test_timeout_secs ?? '180'

Expand All @@ -38,7 +37,7 @@ describe('ping test', () => {
const MUXER = process.env.muxer
const IP = process.env.ip ?? '0.0.0.0'

const options: Libp2pOptions<{ ping: PingService, identify: DefaultIdentifyService }> = {
const options: Libp2pOptions<{ ping: PingService, identify: IdentifyService }> = {
start: true,
connectionGater: {
denyDialMultiaddr: async () => false
Expand Down
23 changes: 14 additions & 9 deletions packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
MULTICODEC_IDENTIFY_PUSH_PROTOCOL_VERSION
} from './consts.js'
import { Identify } from './pb/message.js'
import type { IdentifyServiceComponents, IdentifyServiceInit } from './index.js'
import type { IdentifyService, IdentifyServiceComponents, IdentifyServiceInit } from './index.js'
import type { Libp2pEvents, IdentifyResult, SignedPeerRecord, AbortOptions } from '@libp2p/interface'
import type { Connection, Stream } from '@libp2p/interface/connection'
import type { EventEmitter } from '@libp2p/interface/events'
Expand All @@ -49,10 +49,11 @@ const defaultValues = {
maxPushIncomingStreams: 1,
maxPushOutgoingStreams: 1,
maxObservedAddresses: 10,
maxIdentifyMessageSize: 8192
maxIdentifyMessageSize: 8192,
runOnConnectionOpen: true
}

export class DefaultIdentifyService implements Startable {
export class DefaultIdentifyService implements Startable, IdentifyService {
private readonly identifyProtocolStr: string
private readonly identifyPushProtocolStr: string
public readonly host: {
Expand Down Expand Up @@ -100,11 +101,13 @@ export class DefaultIdentifyService implements Startable {
agentVersion: init.agentVersion ?? defaultValues.agentVersion
}

// When a new connection happens, trigger identify
components.events.addEventListener('connection:open', (evt) => {
const connection = evt.detail
this.identify(connection).catch(err => { log.error('error during identify trigged by connection:open', err) })
})
if (init.runOnConnectionOpen ?? defaultValues.runOnConnectionOpen) {
// When a new connection happens, trigger identify
components.events.addEventListener('connection:open', (evt) => {
const connection = evt.detail
this.identify(connection).catch(err => { log.error('error during identify trigged by connection:open', err) })
})
}

// When self peer record changes, trigger identify-push
components.events.addEventListener('self:peer:update', (evt) => {
Expand Down Expand Up @@ -296,7 +299,7 @@ export class DefaultIdentifyService implements Startable {
}
}

async identify (connection: Connection, options: AbortOptions = {}): Promise<void> {
async identify (connection: Connection, options: AbortOptions = {}): Promise<IdentifyResult> {
const message = await this._identify(connection, options)
const {
publicKey,
Expand Down Expand Up @@ -344,6 +347,8 @@ export class DefaultIdentifyService implements Startable {
}

this.events.safeDispatchEvent('peer:identify', { detail: result })

return result
}

/**
Expand Down
23 changes: 21 additions & 2 deletions packages/libp2p/src/identify/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import {
} from './consts.js'
import { DefaultIdentifyService } from './identify.js'
import { Identify } from './pb/message.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { AbortOptions, IdentifyResult, Libp2pEvents } from '@libp2p/interface'
import type { EventEmitter } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerStore } from '@libp2p/interface/peer-store'
import type { Connection } from '@libp2p/interface/src/connection/index.js'
import type { AddressManager } from '@libp2p/interface-internal/address-manager'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
Expand Down Expand Up @@ -39,6 +40,11 @@ export interface IdentifyServiceInit {
maxPushIncomingStreams?: number
maxPushOutgoingStreams?: number
maxObservedAddresses?: number

/**
* Whether to automatically dial identify on newly opened connections (default: true)
*/
runOnConnectionOpen?: boolean
}

export interface IdentifyServiceComponents {
Expand All @@ -60,6 +66,19 @@ export const multicodecs = {

export const Message = { Identify }

export function identifyService (init: IdentifyServiceInit = {}): (components: IdentifyServiceComponents) => DefaultIdentifyService {
export interface IdentifyService {
/**
* due to the default limits on inbound/outbound streams for this protocol,
* invoking this method when runOnConnectionOpen is true can lead to unpredictable results
* as streams may be closed by the local or the remote node.
* Please use with caution. If you find yourself needing to call this method to discover other peers that support your protocol,
* you may be better off configuring a topology to be notified instead.
*/
identify: (connection: Connection, options?: AbortOptions) => Promise<IdentifyResult>

push: () => Promise<void>
}

export function identifyService (init: IdentifyServiceInit = {}): (components: IdentifyServiceComponents) => IdentifyService {
return (components) => new DefaultIdentifyService(components, init)
}
49 changes: 45 additions & 4 deletions packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ import { stubInterface } from 'sinon-ts'
import { defaultComponents, type Components } from '../../src/components.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { identifyService } from '../../src/identify/index.js'
import { type IdentifyService, identifyService } from '../../src/identify/index.js'
import { createLibp2p } from '../../src/index.js'
import { plaintext } from '../../src/insecure/index.js'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import { createPeerId } from '../fixtures/creators/peer.js'
import type { DefaultIdentifyService } from '../../src/identify/identify.js'
import type { Libp2p } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
Expand Down Expand Up @@ -342,7 +341,7 @@ describe('dialing (direct, WebSockets)', () => {
})

describe('libp2p.dialer (direct, WebSockets)', () => {
let libp2p: Libp2p<{ identify: unknown }>
let libp2p: Libp2p<{ identify: IdentifyService }>
let peerId: PeerId

beforeEach(async () => {
Expand Down Expand Up @@ -382,7 +381,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
throw new Error('Identify service missing')
}

const identifySpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'identify')
const identifySpy = sinon.spy(libp2p.services.identify, 'identify')
const peerStorePatchSpy = sinon.spy(libp2p.peerStore, 'patch')
const connectionPromise = pEvent(libp2p, 'connection:open')

Expand All @@ -402,6 +401,48 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
await libp2p.stop()
})

it('should not run identify automatically after connecting', async () => {
libp2p = await createLibp2p({
peerId,
transports: [
webSockets({
filter: filters.all
})
],
streamMuxers: [
yamux()
],
connectionEncryption: [
plaintext()
],
services: {
identify: identifyService({
runOnConnectionOpen: false
})
},
connectionGater: mockConnectionGater()
})

if (libp2p.services.identify == null) {
throw new Error('Identify service missing')
}

const identifySpy = sinon.spy(libp2p.services.identify, 'identify')
const connectionPromise = pEvent(libp2p, 'connection:open')

await libp2p.start()

const connection = await libp2p.dial(relayMultiaddr)
expect(connection).to.exist()

// Wait for connection event to be emitted
await connectionPromise

expect(identifySpy.callCount).to.equal(0)

await libp2p.stop()
})

it('should be able to use hangup to close connections', async () => {
libp2p = await createLibp2p({
peerId,
Expand Down
28 changes: 14 additions & 14 deletions packages/libp2p/test/identify/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ describe('identify', () => {
})

it('should be able to identify another peer', async () => {
const localIdentify = new DefaultIdentifyService(localComponents, defaultInit)
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
const localIdentify = identifyService(defaultInit)(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand All @@ -128,8 +128,8 @@ describe('identify', () => {
})

it('should throw if identified peer is the wrong peer', async () => {
const localIdentify = new DefaultIdentifyService(localComponents, defaultInit)
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
const localIdentify = identifyService(defaultInit)(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand Down Expand Up @@ -191,8 +191,8 @@ describe('identify', () => {
})

it('should time out during identify', async () => {
const localIdentify = new DefaultIdentifyService(localComponents, defaultInit)
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
const localIdentify = identifyService(defaultInit)(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand Down Expand Up @@ -237,10 +237,10 @@ describe('identify', () => {
it('should limit incoming identify message sizes', async () => {
const deferred = pDefer()

const remoteIdentify = new DefaultIdentifyService(remoteComponents, {
const remoteIdentify = identifyService({
...defaultInit,
maxIdentifyMessageSize: 100
})
})(remoteComponents)
await start(remoteIdentify)

const identifySpy = sinon.spy(remoteIdentify, 'identify')
Expand Down Expand Up @@ -283,10 +283,10 @@ describe('identify', () => {
it('should time out incoming identify messages', async () => {
const deferred = pDefer()

const remoteIdentify = new DefaultIdentifyService(remoteComponents, {
const remoteIdentify = identifyService({
...defaultInit,
timeout: 100
})
})(remoteComponents)
await start(remoteIdentify)

const identifySpy = sinon.spy(remoteIdentify, 'identify')
Expand Down Expand Up @@ -336,8 +336,8 @@ describe('identify', () => {
})

it('should retain existing peer metadata', async () => {
const localIdentify = new DefaultIdentifyService(localComponents, defaultInit)
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
const localIdentify = identifyService(defaultInit)(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand All @@ -363,8 +363,8 @@ describe('identify', () => {
})

it('should ignore older signed peer record', async () => {
const localIdentify = new DefaultIdentifyService(localComponents, defaultInit)
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
const localIdentify = identifyService(defaultInit)(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand Down
13 changes: 6 additions & 7 deletions packages/libp2p/test/identify/push.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import {
MULTICODEC_IDENTIFY,
MULTICODEC_IDENTIFY_PUSH
} from '../../src/identify/consts.js'
import { DefaultIdentifyService } from '../../src/identify/identify.js'
import { identifyService, type IdentifyServiceInit } from '../../src/identify/index.js'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import type { IdentifyServiceInit } from '../../src/identify/index.js'
import type { TransportManager } from '@libp2p/interface-internal/transport-manager'

const listenMaddrs = [multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
Expand Down Expand Up @@ -97,8 +96,8 @@ describe('identify (push)', () => {
})

it('should be able to push identify updates to another peer', async () => {
const localIdentify = new DefaultIdentifyService(localComponents, defaultInit)
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
const localIdentify = identifyService(defaultInit)(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand Down Expand Up @@ -171,11 +170,11 @@ describe('identify (push)', () => {

it('should time out during push identify', async () => {
let streamEnded = false
const localIdentify = new DefaultIdentifyService(localComponents, {
const localIdentify = identifyService({
...defaultInit,
timeout: 10
})
const remoteIdentify = new DefaultIdentifyService(remoteComponents, defaultInit)
})(localComponents)
const remoteIdentify = identifyService(defaultInit)(remoteComponents)

await start(localIdentify)
await start(remoteIdentify)
Expand Down
19 changes: 9 additions & 10 deletions packages/libp2p/test/identify/service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,16 @@ import sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { AGENT_VERSION } from '../../src/identify/consts.js'
import { identifyService } from '../../src/identify/index.js'
import { type IdentifyService, identifyService } from '../../src/identify/index.js'
import { createLibp2p } from '../../src/index.js'
import { createBaseOptions } from '../fixtures/base-options.browser.js'
import type { DefaultIdentifyService } from '../../src/identify/identify.js'
import type { Libp2p, IdentifyResult } from '@libp2p/interface'
import type { PeerId } from '@libp2p/interface/peer-id'

describe('identify', () => {
let peerId: PeerId
let libp2p: Libp2p<{ identify: unknown }>
let remoteLibp2p: Libp2p<{ identify: unknown }>
let libp2p: Libp2p<{ identify: IdentifyService }>
let remoteLibp2p: Libp2p<{ identify: IdentifyService }>
const remoteAddr = multiaddr(process.env.RELAY_MULTIADDR)

before(async () => {
Expand Down Expand Up @@ -55,7 +54,7 @@ describe('identify', () => {
throw new Error('Identity service was not configured')
}

const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'identify')
const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify, 'identify')

const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
Expand Down Expand Up @@ -110,7 +109,7 @@ describe('identify', () => {
throw new Error('Identity service was not configured')
}

const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'identify')
const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify, 'identify')

const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
Expand Down Expand Up @@ -143,8 +142,8 @@ describe('identify', () => {
throw new Error('Identity service was not configured')
}

const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'push')
const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.services.identify, 'push')
const connectionPromise = pEvent(libp2p, 'connection:open')
const connection = await libp2p.dial(remoteAddr)

Expand Down Expand Up @@ -241,8 +240,8 @@ describe('identify', () => {
throw new Error('Identity service was not configured')
}

const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.services.identify as DefaultIdentifyService, 'push')
const identityServiceIdentifySpy = sinon.spy(libp2p.services.identify, 'identify')
const identityServicePushSpy = sinon.spy(libp2p.services.identify, 'push')
const connectionPromise = pEvent(libp2p, 'connection:open')
const connection = await libp2p.dial(remoteAddr)

Expand Down

0 comments on commit 06f4901

Please sign in to comment.