Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check for connection status before storing #2732

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions packages/libp2p/src/connection-manager/connection-pruner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,29 @@
this.peerStore = components.peerStore
this.events = components.events
this.log = components.logger.forComponent('libp2p:connection-manager:connection-pruner')
this.maybePruneConnections = this.maybePruneConnections.bind(this)
}

// check the max connection limit whenever a peer connects
components.events.addEventListener('connection:open', () => {
this.maybePruneConnections()
.catch(err => {
this.log.error(err)
})
})
start (): void {
this.events.addEventListener('connection:open', this.maybePruneConnections)
}

stop (): void {
this.events.removeEventListener('connection:open', this.maybePruneConnections)
}

maybePruneConnections (): void {
this._maybePruneConnections()
.catch(err => {
this.log.error('error while pruning connections %e', err)

Check warning on line 57 in packages/libp2p/src/connection-manager/connection-pruner.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/connection-pruner.ts#L57

Added line #L57 was not covered by tests
})
}

/**
* If we have more connections than our maximum, select some excess connections
* to prune based on peer value
*/
async maybePruneConnections (): Promise<void> {
private async _maybePruneConnections (): Promise<void> {
const connections = this.connectionManager.getConnections()
const numConnections = connections.length

Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class DialQueue {
})
})

if (existingConnection != null) {
if (existingConnection?.status === 'open') {
this.log('already connected to %a', existingConnection.remoteAddr)
options.onProgress?.(new CustomProgressEvent('dial-queue:already-connected'))
return existingConnection
Expand Down
75 changes: 37 additions & 38 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { ConnectionClosedError, InvalidMultiaddrError, InvalidParametersError, InvalidPeerIdError, NotStartedError, start, stop } from '@libp2p/interface'
import { PeerMap } from '@libp2p/peer-collections'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
Expand Down Expand Up @@ -214,8 +214,6 @@

this.onConnect = this.onConnect.bind(this)
this.onDisconnect = this.onDisconnect.bind(this)
this.events.addEventListener('connection:open', this.onConnect)
this.events.addEventListener('connection:close', this.onDisconnect)

// allow/deny lists
this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
Expand Down Expand Up @@ -268,10 +266,6 @@

readonly [Symbol.toStringTag] = '@libp2p/connection-manager'

isStarted (): boolean {
return this.started
}

/**
* Starts the Connection Manager. If Metrics are not enabled on libp2p
* only event loop and connection limits will be monitored.
Expand All @@ -288,11 +282,7 @@

for (const conns of this.connections.values()) {
for (const conn of conns) {
if (conn.direction === 'inbound') {
metric.inbound++
} else {
metric.outbound++
}
metric[conn.direction]++

Check warning on line 285 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L285

Added line #L285 was not covered by tests
}
}

Expand Down Expand Up @@ -356,9 +346,13 @@
}
})

this.events.addEventListener('connection:open', this.onConnect)
this.events.addEventListener('connection:close', this.onDisconnect)

await start(
this.dialQueue,
this.reconnectQueue
this.reconnectQueue,
this.connectionPruner
)

this.started = true
Expand All @@ -369,9 +363,13 @@
* Stops the Connection Manager
*/
async stop (): Promise<void> {
this.events.removeEventListener('connection:open', this.onConnect)
this.events.removeEventListener('connection:close', this.onDisconnect)

await stop(
this.reconnectQueue,
this.dialQueue
this.dialQueue,
this.connectionPruner
)

// Close all connections we're tracking
Expand Down Expand Up @@ -413,17 +411,19 @@
return
}

const peerId = connection.remotePeer
const storedConns = this.connections.get(peerId)
let isNewPeer = false

if (storedConns != null) {
storedConns.push(connection)
} else {
isNewPeer = true
this.connections.set(peerId, [connection])
if (connection.status !== 'open') {
// this can happen when the remote closes the connection immediately after
// opening
return

Check warning on line 417 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L415-L417

Added lines #L415 - L417 were not covered by tests
}

const peerId = connection.remotePeer
const isNewPeer = !this.connections.has(peerId)
const storedConns = this.connections.get(peerId) ?? []
storedConns.push(connection)

this.connections.set(peerId, storedConns)

// only need to store RSA public keys, all other types are embedded in the peer id
if (peerId.publicKey != null && peerId.type === 'RSA') {
await this.peerStore.patch(peerId, {
Expand All @@ -441,20 +441,21 @@
*/
onDisconnect (evt: CustomEvent<Connection>): void {
const { detail: connection } = evt
const peerId = connection.remotePeer
const peerConns = this.connections.get(peerId) ?? []

if (!this.started) {
// This can happen when we are in the process of shutting down the node
return
}
// remove closed connection
const filteredPeerConns = peerConns.filter(conn => conn.id !== connection.id)

const peerId = connection.remotePeer
let storedConn = this.connections.get(peerId)
// update peer connections
this.connections.set(peerId, filteredPeerConns)

if (storedConn != null && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id !== connection.id)
this.connections.set(peerId, storedConn)
} else if (storedConn != null) {
if (filteredPeerConns.length === 0) {
// trigger disconnect event if no connections remain
this.log('onDisconnect remove all connections for peer %p', peerId)
this.connections.delete(peerId)

// broadcast disconnect event
this.events.safeDispatchEvent('peer:disconnect', { detail: connection.remotePeer })
}
}
Expand All @@ -478,7 +479,7 @@
}

async openConnection (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise<Connection> {
if (!this.isStarted()) {
if (!this.started) {
throw new NotStartedError('Not started')
}

Expand Down Expand Up @@ -508,10 +509,8 @@
priority: options.priority ?? DEFAULT_DIAL_PRIORITY
})

if (connection.remotePeer.equals(this.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
connection.abort(err)
throw err
if (connection.status !== 'open') {
throw new ConnectionClosedError('Remote closed connection during opening')

Check warning on line 513 in packages/libp2p/src/connection-manager/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/index.ts#L513

Added line #L513 was not covered by tests
}

let peerConnections = this.connections.get(connection.remotePeer)
Expand Down
16 changes: 5 additions & 11 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,6 @@
}

try {
this.log.trace('closing all streams')

// close all streams gracefully - this can throw if we're not multiplexed
await Promise.all(
this.streams.map(async s => s.close(options))
)

this.log.trace('closing underlying transport')

// close raw connection
Expand All @@ -184,18 +177,19 @@
}

abort (err: Error): void {
if (this.status === 'closed') {
return
}

Check warning on line 183 in packages/libp2p/src/connection/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection/index.ts#L180-L183

Added lines #L180 - L183 were not covered by tests
this.log.error('aborting connection to %a due to error', this.remoteAddr, err)

this.status = 'closing'
this.streams.forEach(s => { s.abort(err) })

this.log.error('all streams aborted', this.streams.length)

// Abort raw connection
this._abort(err)

this.timeline.close = Date.now()
this.status = 'closed'
this.timeline.close = Date.now()

Check warning on line 192 in packages/libp2p/src/connection/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection/index.ts#L192

Added line #L192 was not covered by tests
}
}

Expand Down
41 changes: 25 additions & 16 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners } from '@libp2p/interface'
import { InvalidMultiaddrError, TooManyInboundProtocolStreamsError, TooManyOutboundProtocolStreamsError, LimitedConnectionError, setMaxListeners, InvalidPeerIdError } from '@libp2p/interface'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
import { anySignal } from 'any-signal'
Expand Down Expand Up @@ -304,6 +304,14 @@
remotePeer = remotePeerId
}

// this can happen if we dial a multiaddr without a peer id, we only find
// out the identity of the remote after the connection is encrypted
if (remotePeer.equals(this.components.peerId)) {
const err = new InvalidPeerIdError('Can not dial self')
maConn.abort(err)
throw err

Check warning on line 312 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L310-L312

Added lines #L310 - L312 were not covered by tests
}

upgradedConn = encryptedConn
if (opts?.muxerFactory != null) {
muxerFactory = opts.muxerFactory
Expand All @@ -326,6 +334,8 @@
} catch (err: any) {
maConn.log.error('failed to upgrade inbound connection %s %a - %e', direction === 'inbound' ? 'from' : 'to', maConn.remoteAddr, err)
throw err
} finally {
signal.clear()
}

await this.shouldBlockConnection(direction === 'inbound' ? 'denyInboundUpgradedConnection' : 'denyOutboundUpgradedConnection', remotePeer, maConn)
Expand Down Expand Up @@ -538,22 +548,22 @@
const _timeline = maConn.timeline
maConn.timeline = new Proxy(_timeline, {
set: (...args) => {
if (connection != null && args[1] === 'close' && args[2] != null && _timeline.close == null) {
if (args[1] === 'close' && args[2] != null && _timeline.close == null) {
// Wait for close to finish before notifying of the closure
(async () => {
try {
if (connection.status === 'open') {
await connection.close()
}
} catch (err: any) {
connection.log.error('error closing connection after timeline close', err)
connection.log.error('error closing connection after timeline close %e', err)

Check warning on line 559 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L559

Added line #L559 was not covered by tests
} finally {
this.events.safeDispatchEvent('connection:close', {
detail: connection
})
}
})().catch(err => {
connection.log.error('error thrown while dispatching connection:close event', err)
connection.log.error('error thrown while dispatching connection:close event %e', err)

Check warning on line 566 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L566

Added line #L566 was not covered by tests
})
}

Expand All @@ -578,32 +588,31 @@
limits,
logger: this.components.logger,
newStream: newStream ?? errConnectionNotMultiplexed,
getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } },
getStreams: () => {
return muxer?.streams ?? []
},
close: async (options?: AbortOptions) => {
// Ensure remaining streams are closed gracefully
if (muxer != null) {
connection.log.trace('close muxer')
await muxer.close(options)
}
// ensure remaining streams are closed gracefully
await muxer?.close(options)

connection.log.trace('close maconn')
// close the underlying transport
await maConn.close(options)
connection.log.trace('closed maconn')
},
abort: (err) => {
maConn.abort(err)
// Ensure remaining streams are aborted
if (muxer != null) {
muxer.abort(err)
}

// ensure remaining streams are aborted
muxer?.abort(err)

Check warning on line 605 in packages/libp2p/src/upgrader.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/upgrader.ts#L603-L605

Added lines #L603 - L605 were not covered by tests
}
})

this.events.safeDispatchEvent('connection:open', {
detail: connection
})

// @ts-expect-error nah
connection.__maConnTimeline = _timeline

return connection
}

Expand Down
Loading
Loading