Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: only choose query peers after initial self-query has run (#471)
Browse files Browse the repository at this point in the history
Previously we chose peers before the self-query ran which meant there
was a reasonable chance we chose 0 peers.

Instead, wait for the query to run, then choose peers to query.
  • Loading branch information
achingbrain committed May 5, 2023
1 parent e9efb7f commit 4d05154
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 79 deletions.
15 changes: 3 additions & 12 deletions src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ import {
valueEvent,
queryErrorEvent
} from '../query/events.js'
import { createPutRecord, convertBuffer, bufferToRecordKey } from '../utils.js'
import { createPutRecord, bufferToRecordKey } from '../utils.js'
import type { KadDHTComponents, Validators, Selectors, ValueEvent, QueryOptions, QueryEvent } from '../index.js'
import type { Network } from '../network.js'
import type { PeerRouting } from '../peer-routing/index.js'
import type { QueryManager } from '../query/manager.js'
import type { QueryFunc } from '../query/types.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Logger } from '@libp2p/logger'

Expand All @@ -30,7 +29,6 @@ export interface ContentFetchingInit {
selectors: Selectors
peerRouting: PeerRouting
queryManager: QueryManager
routingTable: RoutingTable
network: Network
lan: boolean
}
Expand All @@ -42,19 +40,17 @@ export class ContentFetching {
private readonly selectors: Selectors
private readonly peerRouting: PeerRouting
private readonly queryManager: QueryManager
private readonly routingTable: RoutingTable
private readonly network: Network

constructor (components: KadDHTComponents, init: ContentFetchingInit) {
const { validators, selectors, peerRouting, queryManager, routingTable, network, lan } = init
const { validators, selectors, peerRouting, queryManager, network, lan } = init

this.components = components
this.log = logger(`libp2p:kad-dht:${lan ? 'lan' : 'wan'}:content-fetching`)
this.validators = validators
this.selectors = selectors
this.peerRouting = peerRouting
this.queryManager = queryManager
this.routingTable = routingTable
this.network = network
}

Expand Down Expand Up @@ -249,11 +245,6 @@ export class ContentFetching {
this.log('error getting local value for %b', key, err)
}

const id = await convertBuffer(key)
const rtp = this.routingTable.closestPeers(id)

this.log('found %d peers in routing table', rtp.length)

const self = this // eslint-disable-line @typescript-eslint/no-this-alias

const getValueQuery: QueryFunc = async function * ({ peer, signal }) {
Expand All @@ -267,6 +258,6 @@ export class ContentFetching {
}

// we have peers, lets send the actual query to them
yield * this.queryManager.run(key, rtp, getValueQuery, options)
yield * this.queryManager.run(key, getValueQuery, options)
}
}
4 changes: 1 addition & 3 deletions src/content-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
peerResponseEvent,
providerEvent
} from '../query/events.js'
import { convertBuffer } from '../utils.js'
import type { KadDHTComponents, PeerResponseEvent, ProviderEvent, QueryEvent, QueryOptions } from '../index.js'
import type { Network } from '../network.js'
import type { PeerRouting } from '../peer-routing/index.js'
Expand Down Expand Up @@ -126,7 +125,6 @@ export class ContentRouting {
async * findProviders (key: CID, options: QueryOptions): AsyncGenerator<PeerResponseEvent | ProviderEvent | QueryEvent> {
const toFind = this.routingTable.kBucketSize
const target = key.multihash.bytes
const id = await convertBuffer(target)
const self = this // eslint-disable-line @typescript-eslint/no-this-alias

this.log('findProviders %c', key)
Expand Down Expand Up @@ -175,7 +173,7 @@ export class ContentRouting {

const providers = new Set(provs.map(p => p.toString()))

for await (const event of this.queryManager.run(target, this.routingTable.closestPeers(id), findProvidersQuery, options)) {
for await (const event of this.queryManager.run(target, findProvidersQuery, options)) {
yield event

if (event.name === 'PEER_RESPONSE') {
Expand Down
4 changes: 2 additions & 2 deletions src/kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
// Number of disjoint query paths to use - This is set to `kBucketSize/2` per the S/Kademlia paper
disjointPaths: Math.ceil(this.kBucketSize / 2),
lan,
initialQuerySelfHasRun
initialQuerySelfHasRun,
routingTable: this.routingTable
})

// DHT components
Expand All @@ -147,7 +148,6 @@ export class DefaultKadDHT extends EventEmitter<PeerDiscoveryEvents> implements
selectors: this.selectors,
peerRouting: this.peerRouting,
queryManager: this.queryManager,
routingTable: this.routingTable,
network: this.network,
lan: this.lan
})
Expand Down
32 changes: 2 additions & 30 deletions src/peer-routing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,34 +148,6 @@ export class PeerRouting {
return
}

const key = await utils.convertPeerId(id)
const peers = this.routingTable.closestPeers(key)

// sanity check
const match = peers.find((p) => p.equals(id))

if (match != null) {
try {
const peer = await this.components.peerStore.get(id)

this.log('found in peerStore')
yield finalPeerEvent({
from: this.components.peerId,
peer: {
id: peer.id,
multiaddrs: peer.addresses.map((address) => address.multiaddr),
protocols: []
}
})

return
} catch (err: any) {
if (err.code !== 'ERR_NOT_FOUND') {
throw err
}
}
}

const self = this // eslint-disable-line @typescript-eslint/no-this-alias

const findPeerQuery: QueryFunc = async function * ({ peer, signal }) {
Expand All @@ -197,7 +169,7 @@ export class PeerRouting {

let foundPeer = false

for await (const event of this.queryManager.run(id.toBytes(), peers, findPeerQuery, options)) {
for await (const event of this.queryManager.run(id.toBytes(), findPeerQuery, options)) {
if (event.name === 'FINAL_PEER') {
foundPeer = true
}
Expand Down Expand Up @@ -230,7 +202,7 @@ export class PeerRouting {
yield * self.network.sendRequest(peer, request, { signal })
}

for await (const event of this.queryManager.run(key, tablePeers, getCloserPeersQuery, options)) {
for await (const event of this.queryManager.run(key, getCloserPeersQuery, options)) {
yield event

if (event.name === 'PEER_RESPONSE') {
Expand Down
12 changes: 10 additions & 2 deletions src/query/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import {
ALPHA, K, DEFAULT_QUERY_TIMEOUT
} from '../constants.js'
import { convertBuffer } from '../utils.js'
import { queryPath } from './query-path.js'
import type { QueryFunc } from './types.js'
import type { QueryEvent } from '../index.js'
import type { RoutingTable } from '../routing-table/index.js'
import type { Metric, Metrics } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { AbortOptions } from '@libp2p/interfaces'
Expand All @@ -27,6 +29,7 @@ export interface QueryManagerInit {
disjointPaths?: number
alpha?: number
initialQuerySelfHasRun: DeferredPromise<void>
routingTable: RoutingTable
}

export interface QueryManagerComponents {
Expand Down Expand Up @@ -55,6 +58,7 @@ export class QueryManager implements Startable {
queryTime: Metric
}

private readonly routingTable: RoutingTable
private initialQuerySelfHasRun?: DeferredPromise<void>

constructor (components: QueryManagerComponents, init: QueryManagerInit) {
Expand All @@ -67,6 +71,7 @@ export class QueryManager implements Startable {
this.lan = lan
this.queries = 0
this.initialQuerySelfHasRun = init.initialQuerySelfHasRun
this.routingTable = init.routingTable

// allow us to stop queries on shut down
this.shutDownController = new AbortController()
Expand Down Expand Up @@ -105,7 +110,7 @@ export class QueryManager implements Startable {
this.shutDownController.abort()
}

async * run (key: Uint8Array, peers: PeerId[], queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
async * run (key: Uint8Array, queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
if (!this.running) {
throw new Error('QueryManager not started')
}
Expand Down Expand Up @@ -138,7 +143,6 @@ export class QueryManager implements Startable {
const log = logger(`libp2p:kad-dht:${this.lan ? 'lan' : 'wan'}:query:` + uint8ArrayToString(key, 'base58btc'))

// query a subset of peers up to `kBucketSize / 2` in length
const peersToQuery = peers.slice(0, Math.min(this.disjointPaths, peers.length))
const startTime = Date.now()
const cleanUp = new EventEmitter<CleanUpEvents>()

Expand All @@ -162,6 +166,10 @@ export class QueryManager implements Startable {
this.queries++
this.metrics?.runningQueries.update(this.queries)

const id = await convertBuffer(key)
const peers = this.routingTable.closestPeers(id)
const peersToQuery = peers.slice(0, Math.min(this.disjointPaths, peers.length))

if (peers.length === 0) {
log.error('Running query with no peers')
return
Expand Down
Loading

0 comments on commit 4d05154

Please sign in to comment.