From e2cc0e690467ef9fa9e79fbaae0b3c16cdecbf1b Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 5 Jan 2024 08:27:04 +0100 Subject: [PATCH 1/3] fix: ensure dht query is aborted on early exit If query results are consumed from a `for await..of`-style loop, and that loop is exited from before the results are complete, ensure we abort any running sub-queries. --- packages/kad-dht/package.json | 1 + packages/kad-dht/src/query/manager.ts | 37 +++++++++++------ packages/kad-dht/test/query.spec.ts | 57 ++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 22 deletions(-) diff --git a/packages/kad-dht/package.json b/packages/kad-dht/package.json index 259936719a..11cb4c9a0f 100644 --- a/packages/kad-dht/package.json +++ b/packages/kad-dht/package.json @@ -87,6 +87,7 @@ "private-ip": "^3.0.1", "progress-events": "^1.0.0", "protons-runtime": "^5.0.0", + "race-signal": "^1.0.2", "uint8-varint": "^2.0.0", "uint8arraylist": "^2.4.3", "uint8arrays": "^5.0.0" diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index 3795b54843..2cfc37c140 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -1,7 +1,8 @@ -import { AbortError, TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface' +import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface' import { PeerSet } from '@libp2p/peer-collections' import { anySignal } from 'any-signal' import merge from 'it-merge' +import { raceSignal } from 'race-signal' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { ALPHA, K, DEFAULT_QUERY_TIMEOUT @@ -127,7 +128,16 @@ export class QueryManager implements Startable { } } - const signal = anySignal([this.shutDownController.signal, options.signal]) + // if the user breaks out of a for..await of loop iterating over query + // results we need to cancel any in-flight network requests + const queryEarlyExitController = new AbortController() + setMaxListeners(Infinity, queryEarlyExitController.signal) + + const signal = anySignal([ + this.shutDownController.signal, + queryEarlyExitController.signal, + options.signal + ]) // this signal will get listened to for every invocation of queryFunc // so make sure we don't make a lot of noise in the logs @@ -138,19 +148,13 @@ export class QueryManager implements Startable { // query a subset of peers up to `kBucketSize / 2` in length const startTime = Date.now() const cleanUp = new TypedEventEmitter() + let queryFinished = false try { if (options.isSelfQuery !== true && this.initialQuerySelfHasRun != null) { log('waiting for initial query-self query before continuing') - await Promise.race([ - new Promise((resolve, reject) => { - signal.addEventListener('abort', () => { - reject(new AbortError('Query was aborted before self-query ran')) - }) - }), - this.initialQuerySelfHasRun.promise - ]) + await raceSignal(this.initialQuerySelfHasRun.promise, signal) this.initialQuerySelfHasRun = undefined } @@ -192,12 +196,14 @@ export class QueryManager implements Startable { // Execute the query along each disjoint path and yield their results as they become available for await (const event of merge(...paths)) { - yield event - if (event.name === 'QUERY_ERROR') { - log('error', event.error) + log.error('query error', event.error) } + + yield event } + + queryFinished = true } catch (err: any) { if (!this.running && err.code === 'ERR_QUERY_ABORTED') { // ignore query aborted errors that were thrown during query manager shutdown @@ -205,6 +211,11 @@ export class QueryManager implements Startable { throw err } } finally { + if (!queryFinished) { + log('query exited early') + queryEarlyExitController.abort() + } + signal.clear() this.queries-- diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index a3c1b92027..d942ed4136 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -19,7 +19,7 @@ import { QueryManager, type QueryManagerInit } from '../src/query/manager.js' import { convertBuffer } from '../src/utils.js' import { createPeerId, createPeerIds } from './utils/create-peer-id.js' import { sortClosestPeers } from './utils/sort-closest-peers.js' -import type { QueryFunc } from '../src/query/types.js' +import type { QueryContext, QueryFunc } from '../src/query/types.js' import type { RoutingTable } from '../src/routing-table/index.js' import type { PeerId } from '@libp2p/interface' @@ -29,12 +29,9 @@ interface TopologyEntry { value?: Uint8Array closerPeers?: number[] event: QueryEvent + context?: QueryContext } -type Topology = Record +type Topology = Record describe('QueryManager', () => { let ourPeerId: PeerId @@ -55,7 +52,7 @@ describe('QueryManager', () => { } function createTopology (opts: Record): Topology { - const topology: Record = {} + const topology: Topology = {} Object.keys(opts).forEach(key => { const id = parseInt(key) @@ -94,9 +91,12 @@ describe('QueryManager', () => { return topology } - function createQueryFunction (topology: Record): QueryFunc { - const queryFunc: QueryFunc = async function * ({ peer }) { + function createQueryFunction (topology: Topology): QueryFunc { + const queryFunc: QueryFunc = async function * (context) { + const { peer } = context + const res = topology[peer.toString()] + res.context = context if (res.delay != null) { await delay(res.delay) @@ -870,4 +870,43 @@ describe('QueryManager', () => { await manager.stop() }) + + it('should abort the query if we break out of the loop early', async () => { + const manager = new QueryManager({ + peerId: ourPeerId, + logger: defaultLogger() + }, { + ...defaultInit(), + disjointPaths: 2 + }) + await manager.start() + + // 1 -> 0 [pathComplete] + // 4 -> 3 [delay] -> 2 [pathComplete] + const topology = createTopology({ + // quick value path + 0: { value: uint8ArrayFromString('true') }, + 1: { closerPeers: [0] }, + // slow value path + 2: { value: uint8ArrayFromString('true') }, + 3: { delay: 100, closerPeers: [2] }, + 4: { closerPeers: [3] } + }) + + routingTable.closestPeers.returns([peers[1], peers[4]]) + + for await (const event of manager.run(key, createQueryFunction(topology))) { + if (event.name === 'VALUE') { + expect(event.from.toString()).to.equal(peers[0].toString()) + + // break out of loop early + break + } + } + + // should have aborted query on slow path + expect(topology[peers[3].toString()]).to.have.nested.property('context.signal.aborted', true) + + await manager.stop() + }) }) From fdedc4eac1bccc392cfa11694203913f5e4111a4 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 5 Jan 2024 08:58:57 +0100 Subject: [PATCH 2/3] chore: do not finish query section immediately --- packages/kad-dht/test/query.spec.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index d942ed4136..bc89748ccf 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -93,6 +93,8 @@ describe('QueryManager', () => { function createQueryFunction (topology: Topology): QueryFunc { const queryFunc: QueryFunc = async function * (context) { + context.signal.throwIfAborted() + const { peer } = context const res = topology[peer.toString()] @@ -885,11 +887,11 @@ describe('QueryManager', () => { // 4 -> 3 [delay] -> 2 [pathComplete] const topology = createTopology({ // quick value path - 0: { value: uint8ArrayFromString('true') }, + 0: { delay: 10, value: uint8ArrayFromString('true') }, 1: { closerPeers: [0] }, // slow value path 2: { value: uint8ArrayFromString('true') }, - 3: { delay: 100, closerPeers: [2] }, + 3: { delay: 1000, closerPeers: [2] }, 4: { closerPeers: [3] } }) @@ -907,6 +909,9 @@ describe('QueryManager', () => { // should have aborted query on slow path expect(topology[peers[3].toString()]).to.have.nested.property('context.signal.aborted', true) + // should not have visited the next peer on the slow path + expect(topology[peers[4].toString()]).to.not.have.property('context', true) + await manager.stop() }) }) From 51fe960f92cfa125c2502697a34bc5334bac2814 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 5 Jan 2024 19:15:07 +0100 Subject: [PATCH 3/3] chore: remove abort --- packages/kad-dht/test/query.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/kad-dht/test/query.spec.ts b/packages/kad-dht/test/query.spec.ts index bc89748ccf..5766234970 100644 --- a/packages/kad-dht/test/query.spec.ts +++ b/packages/kad-dht/test/query.spec.ts @@ -93,8 +93,6 @@ describe('QueryManager', () => { function createQueryFunction (topology: Topology): QueryFunc { const queryFunc: QueryFunc = async function * (context) { - context.signal.throwIfAborted() - const { peer } = context const res = topology[peer.toString()]