Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure dht query is aborted on early exit #2341

Merged
merged 3 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
"private-ip": "^3.0.1",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"race-signal": "^1.0.2",
"uint8-varint": "^2.0.0",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^5.0.0"
Expand Down
37 changes: 24 additions & 13 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { AbortError, TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface'
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface'
import { PeerSet } from '@libp2p/peer-collections'
import { anySignal } from 'any-signal'
import merge from 'it-merge'
import { raceSignal } from 'race-signal'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import {
ALPHA, K, DEFAULT_QUERY_TIMEOUT
Expand Down Expand Up @@ -127,7 +128,16 @@ export class QueryManager implements Startable {
}
}

const signal = anySignal([this.shutDownController.signal, options.signal])
// if the user breaks out of a for..await of loop iterating over query
// results we need to cancel any in-flight network requests
const queryEarlyExitController = new AbortController()
setMaxListeners(Infinity, queryEarlyExitController.signal)

const signal = anySignal([
this.shutDownController.signal,
queryEarlyExitController.signal,
options.signal
])

// this signal will get listened to for every invocation of queryFunc
// so make sure we don't make a lot of noise in the logs
Expand All @@ -138,19 +148,13 @@ export class QueryManager implements Startable {
// query a subset of peers up to `kBucketSize / 2` in length
const startTime = Date.now()
const cleanUp = new TypedEventEmitter<CleanUpEvents>()
let queryFinished = false

try {
if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) {
log('waiting for initial query-self query before continuing')

await Promise.race([
new Promise((resolve, reject) => {
signal.addEventListener('abort', () => {
reject(new AbortError('Query was aborted before self-query ran'))
})
}),
this.initialQuerySelfHasRun.promise
])
await raceSignal(this.initialQuerySelfHasRun.promise, signal)

this.initialQuerySelfHasRun = undefined
}
Expand Down Expand Up @@ -192,19 +196,26 @@ export class QueryManager implements Startable {

// Execute the query along each disjoint path and yield their results as they become available
for await (const event of merge(...paths)) {
yield event

if (event.name === 'QUERY_ERROR') {
log('error', event.error)
log.error('query error', event.error)
}

yield event
}

queryFinished = true
} catch (err: any) {
if (!this.running && err.code === 'ERR_QUERY_ABORTED') {
// ignore query aborted errors that were thrown during query manager shutdown
} else {
throw err
}
} finally {
if (!queryFinished) {
log('query exited early')
queryEarlyExitController.abort()
}

signal.clear()

this.queries--
Expand Down
60 changes: 51 additions & 9 deletions packages/kad-dht/test/query.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { QueryManager, type QueryManagerInit } from '../src/query/manager.js'
import { convertBuffer } from '../src/utils.js'
import { createPeerId, createPeerIds } from './utils/create-peer-id.js'
import { sortClosestPeers } from './utils/sort-closest-peers.js'
import type { QueryFunc } from '../src/query/types.js'
import type { QueryContext, QueryFunc } from '../src/query/types.js'
import type { RoutingTable } from '../src/routing-table/index.js'
import type { PeerId } from '@libp2p/interface'

Expand All @@ -29,12 +29,9 @@ interface TopologyEntry {
value?: Uint8Array
closerPeers?: number[]
event: QueryEvent
context?: QueryContext
}
type Topology = Record<string, {
delay?: number | undefined
error?: Error | undefined
event: QueryEvent
}>
type Topology = Record<string, TopologyEntry>

describe('QueryManager', () => {
let ourPeerId: PeerId
Expand All @@ -55,7 +52,7 @@ describe('QueryManager', () => {
}

function createTopology (opts: Record<number, { delay?: number, error?: Error, value?: Uint8Array, closerPeers?: number[] }>): Topology {
const topology: Record<string, { delay?: number, error?: Error, event: QueryEvent }> = {}
const topology: Topology = {}

Object.keys(opts).forEach(key => {
const id = parseInt(key)
Expand Down Expand Up @@ -94,9 +91,12 @@ describe('QueryManager', () => {
return topology
}

function createQueryFunction (topology: Record<string, { delay?: number, event: QueryEvent }>): QueryFunc {
const queryFunc: QueryFunc = async function * ({ peer }) {
function createQueryFunction (topology: Topology): QueryFunc {
const queryFunc: QueryFunc = async function * (context) {
const { peer } = context

const res = topology[peer.toString()]
res.context = context

if (res.delay != null) {
await delay(res.delay)
Expand Down Expand Up @@ -870,4 +870,46 @@ describe('QueryManager', () => {

await manager.stop()
})

it('should abort the query if we break out of the loop early', async () => {
const manager = new QueryManager({
peerId: ourPeerId,
logger: defaultLogger()
}, {
...defaultInit(),
disjointPaths: 2
})
await manager.start()

// 1 -> 0 [pathComplete]
// 4 -> 3 [delay] -> 2 [pathComplete]
const topology = createTopology({
// quick value path
0: { delay: 10, value: uint8ArrayFromString('true') },
1: { closerPeers: [0] },
// slow value path
2: { value: uint8ArrayFromString('true') },
3: { delay: 1000, closerPeers: [2] },
4: { closerPeers: [3] }
})

routingTable.closestPeers.returns([peers[1], peers[4]])

for await (const event of manager.run(key, createQueryFunction(topology))) {
if (event.name === 'VALUE') {
expect(event.from.toString()).to.equal(peers[0].toString())

// break out of loop early
break
}
}

// should have aborted query on slow path
expect(topology[peers[3].toString()]).to.have.nested.property('context.signal.aborted', true)

// should not have visited the next peer on the slow path
expect(topology[peers[4].toString()]).to.not.have.property('context', true)

await manager.stop()
})
})
Loading