Skip to content

Commit

Permalink
feat: dht client (#3947)
Browse files Browse the repository at this point in the history
* Enables `libp2p-kad-dht` in client mode
* Updates types with new DHT events

BREAKING CHANGE: The DHT API has been refactored to return async iterators of query events
  • Loading branch information
achingbrain authored Dec 3, 2021
1 parent be351fd commit 37adc28
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 69 deletions.
17 changes: 4 additions & 13 deletions src/dht/find-peer.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Multiaddr } from 'multiaddr'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { FinalPeer } from './response-types.js'
import { mapEvent } from './map-event.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -12,7 +11,7 @@ export const createFindPeer = configure(api => {
/**
* @type {DHTAPI["findPeer"]}
*/
async function findPeer (peerId, options = {}) {
async function * findPeer (peerId, options = {}) {
const res = await api.post('dht/findpeer', {
signal: options.signal,
searchParams: toUrlSearchParams({
Expand All @@ -22,17 +21,9 @@ export const createFindPeer = configure(api => {
headers: options.headers
})

for await (const data of res.ndjson()) {
if (data.Type === FinalPeer && data.Responses) {
const { ID, Addrs } = data.Responses[0]
return {
id: ID,
addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a))
}
}
for await (const event of res.ndjson()) {
yield mapEvent(event)
}

throw new Error('not found')
}

return findPeer
Expand Down
14 changes: 3 additions & 11 deletions src/dht/find-provs.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Multiaddr } from 'multiaddr'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { Provider } from './response-types.js'
import { mapEvent } from './map-event.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -22,15 +21,8 @@ export const createFindProvs = configure(api => {
headers: options.headers
})

for await (const message of res.ndjson()) {
if (message.Type === Provider && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
yield {
id: ID,
addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a))
}
}
}
for await (const event of res.ndjson()) {
yield mapEvent(event)
}
}

Expand Down
16 changes: 6 additions & 10 deletions src/dht/get.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { Value } from './response-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { mapEvent } from './map-event.js'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

/**
Expand All @@ -13,23 +12,20 @@ export const createGet = configure(api => {
/**
* @type {DHTAPI["get"]}
*/
async function get (key, options = {}) {
async function * get (key, options = {}) {
const res = await api.post('dht/get', {
signal: options.signal,
searchParams: toUrlSearchParams({
arg: key instanceof Uint8Array ? uint8ArrayToString(key) : key,
// arg: base36.encode(key),
arg: key instanceof Uint8Array ? uint8ArrayToString(key) : key.toString(),
...options
}),
headers: options.headers
})

for await (const message of res.ndjson()) {
if (message.Type === Value) {
return uint8ArrayFromString(message.Extra, 'base64pad')
}
for await (const event of res.ndjson()) {
yield mapEvent(event)
}

throw new Error('not found')
}

return get
Expand Down
119 changes: 119 additions & 0 deletions src/dht/map-event.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import {
SendingQuery,
PeerResponse,
FinalPeer,
QueryError,
Provider,
Value,
AddingPeer,
DialingPeer
} from './response-types.js'
import { Multiaddr } from 'multiaddr'

/**
* @param {{Type: number, ID: string, Extra: string, Responses: {ID: string, Addrs: string[]}[]}} event
* @returns {import('ipfs-core-types/src/dht').QueryEvent}
*/
export const mapEvent = (event) => {
// console.info(JSON.stringify(event, null, 2))

if (event.Type === SendingQuery) {
return {
to: event.ID,
name: 'SENDING_QUERY',
type: event.Type
}
}

if (event.Type === PeerResponse) {
return {
from: event.ID,
name: 'PEER_RESPONSE',
type: event.Type,
// TODO: how to infer this from the go-ipfs response
messageType: 0,
// TODO: how to infer this from the go-ipfs response
messageName: 'PUT_VALUE',
closer: (event.Responses || []).map(({ ID, Addrs }) => ({ id: ID, multiaddrs: Addrs.map(addr => new Multiaddr(addr)) })),
providers: (event.Responses || []).map(({ ID, Addrs }) => ({ id: ID, multiaddrs: Addrs.map(addr => new Multiaddr(addr)) }))
// TODO: how to infer this from the go-ipfs response
// record: ???
}
}

if (event.Type === FinalPeer) {
// dht.query ends with a FinalPeer event with no Responses
let peer = {
id: event.ID,
/** @type {Multiaddr[]} */
multiaddrs: []
}

if (event.Responses && event.Responses.length) {
// dht.findPeer has the result in the Responses field
peer = {
id: event.Responses[0].ID,
multiaddrs: event.Responses[0].Addrs.map(addr => new Multiaddr(addr))
}
}

return {
from: event.ID,
name: 'FINAL_PEER',
type: event.Type,
peer
}
}

if (event.Type === QueryError) {
return {
from: event.ID,
name: 'QUERY_ERROR',
type: event.Type,
error: new Error(event.Extra)
}
}

if (event.Type === Provider) {
return {
from: event.ID,
name: 'PROVIDER',
type: event.Type,
providers: event.Responses.map(({ ID, Addrs }) => ({ id: ID, multiaddrs: Addrs.map(addr => new Multiaddr(addr)) }))
}
}

if (event.Type === Value) {
return {
from: event.ID,
name: 'VALUE',
type: event.Type,
value: uint8ArrayFromString(event.Extra, 'base64pad')
}
}

if (event.Type === AddingPeer) {
const peers = event.Responses.map(({ ID }) => ID)

if (!peers.length) {
throw new Error('No peer found')
}

return {
name: 'ADDING_PEER',
type: event.Type,
peer: peers[0]
}
}

if (event.Type === DialingPeer) {
return {
name: 'DIALING_PEER',
type: event.Type,
peer: event.ID
}
}

throw new Error('Unknown DHT event type')
}
16 changes: 3 additions & 13 deletions src/dht/provide.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Multiaddr } from 'multiaddr'
import { objectToCamel } from '../lib/object-to-camel.js'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { mapEvent } from './map-event.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -26,17 +25,8 @@ export const createProvide = configure(api => {
headers: options.headers
})

for await (let message of res.ndjson()) {
message = objectToCamel(message)
if (message.responses) {
message.responses = message.responses.map((/** @type {{ ID: string, Addrs: string[] }} */ { ID, Addrs }) => ({
id: ID,
addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a))
}))
} else {
message.responses = []
}
yield message
for await (const event of res.ndjson()) {
yield mapEvent(event)
}
}

Expand Down
16 changes: 4 additions & 12 deletions src/dht/put.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { Multiaddr } from 'multiaddr'
import { objectToCamel } from '../lib/object-to-camel.js'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { multipartRequest } from 'ipfs-core-utils/multipart-request'
import { abortSignal } from '../lib/abort-signal.js'
import { AbortController } from 'native-abort-controller'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { mapEvent } from './map-event.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -24,23 +23,16 @@ export const createPut = configure(api => {
const res = await api.post('dht/put', {
signal,
searchParams: toUrlSearchParams({
arg: uint8ArrayToString(key),
arg: key instanceof Uint8Array ? uint8ArrayToString(key) : key.toString(),
...options
}),
...(
await multipartRequest([value], controller, options.headers)
)
})

for await (let message of res.ndjson()) {
message = objectToCamel(message)
if (message.responses) {
message.responses = message.responses.map((/** @type {{ ID: string, Addrs: string[] }} */ { ID, Addrs }) => ({
id: ID,
addrs: (Addrs || []).map(a => new Multiaddr(a))
}))
}
yield message
for await (const event of res.ndjson()) {
yield mapEvent(event)
}
}

Expand Down
12 changes: 3 additions & 9 deletions src/dht/query.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Multiaddr } from 'multiaddr'
import { objectToCamel } from '../lib/object-to-camel.js'
import { configure } from '../lib/configure.js'
import { toUrlSearchParams } from '../lib/to-url-search-params.js'
import { mapEvent } from './map-event.js'

/**
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
Expand All @@ -22,13 +21,8 @@ export const createQuery = configure(api => {
headers: options.headers
})

for await (let message of res.ndjson()) {
message = objectToCamel(message)
message.responses = (message.responses || []).map((/** @type {{ ID: string, Addrs: string[] }} */ { ID, Addrs }) => ({
id: ID,
addrs: (Addrs || []).map((/** @type {string} **/ a) => new Multiaddr(a))
}))
yield message
for await (const event of res.ndjson()) {
yield mapEvent(event)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/dht/response-types.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

// Response types are defined here =
// https =//github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L15-L24
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L15-L24
export const SendingQuery = 0
export const PeerResponse = 1
export const FinalPeer = 2
Expand Down

0 comments on commit 37adc28

Please sign in to comment.