Skip to content

Commit

Permalink
refactor: client with new routing v1
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Oct 13, 2023
1 parent 7dac713 commit 0a08dc4
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 36 deletions.
2 changes: 1 addition & 1 deletion packages/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[![codecov](https://img.shields.io/codecov/c/github/ipfs/helia-routing-v1-http-api.svg?style=flat-square)](https://codecov.io/gh/ipfs/helia-routing-v1-http-api)
[![CI](https://img.shields.io/github/actions/workflow/status/ipfs/helia-routing-v1-http-api/js-test-and-release.yml?branch=main\&style=flat-square)](https://github.com/ipfs/helia-routing-v1-http-api/actions/workflows/js-test-and-release.yml?query=branch%3Amain)

> A Routing V1 HTTP API client
> A [Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) HTTP API client
## Table of contents <!-- omit in toc -->

Expand Down
2 changes: 2 additions & 0 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@
"@multiformats/multiaddr": "^12.1.3",
"any-signal": "^4.1.1",
"browser-readablestream-to-it": "^2.0.3",
"ipns": "^7.0.1",
"it-all": "^3.0.2",
"it-to-buffer": "^4.0.3",
"iterable-ndjson": "^1.1.0",
"multiformats": "^12.1.1",
"p-defer": "^4.0.0",
Expand Down
181 changes: 153 additions & 28 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { anySignal } from 'any-signal'
import toIt from 'browser-readablestream-to-it'
import { unmarshal, type IPNSRecord, marshal } from 'ipns'
import toBuffer from 'it-to-buffer'
// @ts-expect-error no types
import ndjson from 'iterable-ndjson'
import defer from 'p-defer'
import PQueue from 'p-queue'
import type { RoutingV1HttpApiClient, RoutingV1HttpApiClientInit } from './index.js'
import type { RoutingV1HttpApiClient, RoutingV1HttpApiClientInit, Record } from './index.js'
import type { AbortOptions } from '@libp2p/interface'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { CID } from 'multiformats'

const log = logger('routing-v1-http-api-client')

interface RoutingV1HttpApiGetProvidersResponse {
Protocol: string
Schema: string
ID: string
Addrs: Multiaddr[]
}

const defaultValues = {
concurrentRequests: 4,
timeout: 30e3
Expand Down Expand Up @@ -62,7 +56,7 @@ export class DefaultRoutingV1HttpApiClient implements RoutingV1HttpApiClient {
this.started = false
}

async * getProviders (cid: CID, options: AbortOptions | undefined = {}): AsyncGenerator<PeerInfo, any, unknown> {
async * getProviders (cid: CID, options: AbortOptions | undefined = {}): AsyncGenerator<Record, any, unknown> {
log('findProviders starts: %c', cid)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
Expand All @@ -77,7 +71,7 @@ export class DefaultRoutingV1HttpApiClient implements RoutingV1HttpApiClient {
try {
await onStart.promise

// https://github.com/ipfs/specs/blob/main/routing/ROUTING_V1_HTTP.md#api
// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/providers/${cid.toString()}`
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
const a = await fetch(resource, getOptions)
Expand All @@ -86,12 +80,23 @@ export class DefaultRoutingV1HttpApiClient implements RoutingV1HttpApiClient {
throw new CodeError('Routing response had no body', 'ERR_BAD_RESPONSE')
}

for await (const event of ndjson(toIt(a.body))) {
if (event.Protocol !== 'transport-bitswap') {
continue
}
const contentType = a.headers.get('Content-Type')
if (contentType === 'application/json') {
const body = await a.json()

yield this.#mapProvider(event)
for (const provider of body.Providers) {
const record = this.#handleRecord(provider)
if (record !== null) {
yield record
}
}
} else {
for await (const provider of ndjson(toIt(a.body))) {
const record = this.#handleRecord(provider)
if (record !== null) {
yield record
}
}
}
} catch (err) {
log.error('findProviders errored:', err)
Expand All @@ -102,21 +107,141 @@ export class DefaultRoutingV1HttpApiClient implements RoutingV1HttpApiClient {
}
}

#mapProvider (event: RoutingV1HttpApiGetProvidersResponse): PeerInfo {
const peer = peerIdFromString(event.ID)
const ma: Multiaddr[] = []
async * getPeers (pid: PeerId, options: AbortOptions | undefined = {}): AsyncGenerator<Record, any, unknown> {
log('findPeers starts: %c', pid)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
const onFinish = defer()

void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/peers/${pid.toCID().toString()}`
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
const a = await fetch(resource, getOptions)

if (a.body == null) {
throw new CodeError('Routing response had no body', 'ERR_BAD_RESPONSE')
}

const contentType = a.headers.get('Content-Type')
if (contentType === 'application/json') {
const body = await a.json()

for (const strAddr of event.Addrs) {
const addr = multiaddr(strAddr)
ma.push(addr)
for (const peer of body.Peers) {
const record = this.#handleRecord(peer)
if (record !== null) {
yield record
}
}
} else {
for await (const peer of ndjson(toIt(a.body))) {
const record = this.#handleRecord(peer)
if (record !== null) {
yield record
}
}
}
} catch (err) {
log.error('findPeers errored:', err)
} finally {
signal.clear()
onFinish.resolve()
log('findPeers finished: %c', pid)
}
}

async getIPNS (pid: PeerId, options: AbortOptions | undefined = {}): Promise<IPNSRecord> {
log('getIPNS starts: %c', pid)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
const onFinish = defer()

const pi = {
id: peer,
multiaddrs: ma,
protocols: []
void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/ipns/${pid.toCID().toString()}`
const getOptions = { headers: { Accept: 'application/vnd.ipfs.ipns-record' }, signal }
const a = await fetch(resource, getOptions)

if (a.body == null) {
throw new CodeError('GET ipns response had no body', 'ERR_BAD_RESPONSE')
}

const body = await toBuffer(toIt(a.body))
return unmarshal(body)
} finally {
signal.clear()
onFinish.resolve()
log('getIPNS finished: %c', pid)
}
}

async putIPNS (pid: PeerId, record: IPNSRecord, options: AbortOptions | undefined = {}): Promise<void> {
log('getIPNS starts: %c', pid)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
const onFinish = defer()

void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

const body = marshal(record)

// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/ipns/${pid.toCID().toString()}`
const getOptions = { method: 'PUT', headers: { 'Content-Type': 'application/vnd.ipfs.ipns-record' }, body, signal }
const res = await fetch(resource, getOptions)
if (res.status !== 200) {
throw new CodeError('PUT ipns response had status other than 200', 'ERR_BAD_RESPONSE')
}
} finally {
signal.clear()
onFinish.resolve()
log('getIPNS finished: %c', pid)
}
}

#handleRecord (record: any): Record | null {
if (record.Schema === 'peer') {
// Peer schema can have additional, user-defined, fields.
record.ID = peerIdFromString(record.ID)
record.Addrs = record.Addrs.map(multiaddr)
return record
} else if (record.Schema === 'bitswap') {
// Bitswap schema cannot have additional fields.
return {
Schema: record.Schema,
Protocol: record.Protocol,
ID: peerIdFromString(record.ID),
Addrs: record.Addrs.map(multiaddr)
}
} else if (record.Schema !== '') {
// TODO: in Go, we send unknown schemas as an UnknownRecord. I feel like
// doing this here will make it harder. Is there a way in TypeScript
// to do something like if schema === 'bitswap' then it is a BitswapRecord?
}

return pi
return null
}
}
38 changes: 36 additions & 2 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,28 @@

import { DefaultRoutingV1HttpApiClient } from './client.js'
import type { AbortOptions } from '@libp2p/interface'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { IPNSRecord } from 'ipns'
import type { CID } from 'multiformats/cid'

export interface PeerRecord {
Schema: string
ID: PeerId
Addrs: Multiaddr[]
Protocols: string[]
}

// Deprecated: please use PeerRecord instead.
export interface BitswapRecord {
Schema: string
Protocol: string
ID: PeerId
Addrs: Multiaddr[]
}

export type Record = PeerRecord | BitswapRecord

export interface RoutingV1HttpApiClientInit {
/**
* A concurrency limit to avoid request flood in web browser (default: 4)
Expand All @@ -41,7 +60,22 @@ export interface RoutingV1HttpApiClient {
* Returns an async generator of PeerInfos that can provide the content
* for the passed CID
*/
getProviders(cid: CID, options?: AbortOptions): AsyncGenerator<PeerInfo>
getProviders(cid: CID, options?: AbortOptions): AsyncGenerator<Record>

/**
* Returns an async generator of PeerInfos for the provided PeerId
*/
getPeers(pid: PeerId, options?: AbortOptions): AsyncGenerator<Record>

/**
* Returns a promise of a IPNSRecord for the given PeerId
*/
getIPNS(pid: PeerId, options?: AbortOptions): Promise<IPNSRecord>

/**
* Publishes the given IPNSRecorded for the provided PeerId
*/
putIPNS(pid: PeerId, record: IPNSRecord, options?: AbortOptions): Promise<void>

/**
* Shut down any currently running HTTP requests and clear up any resources
Expand Down
8 changes: 4 additions & 4 deletions packages/client/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ describe('routing-v1-http-api-client', () => {
ID: (await createEd25519PeerId()).toString(),
Addrs: ['/ip4/41.41.41.41/tcp/1234']
}, {
Protocol: 'transport-bitswap',
Schema: 'bitswap',
Protocols: ['transport-bitswap'],
Schema: 'peer',
Metadata: 'gBI=',
ID: (await createEd25519PeerId()).toString(),
Addrs: ['/ip4/42.42.42.42/tcp/1234']
Expand All @@ -50,8 +50,8 @@ describe('routing-v1-http-api-client', () => {

const provs = await all(client.getProviders(cid))
expect(provs.map(prov => ({
id: prov.id.toString(),
addrs: prov.multiaddrs.map(ma => ma.toString())
id: prov.ID.toString(),
addrs: prov.Addrs.map(ma => ma.toString())
}))).to.deep.equal(providers.map(prov => ({
id: prov.ID,
addrs: prov.Addrs
Expand Down
2 changes: 1 addition & 1 deletion packages/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[![codecov](https://img.shields.io/codecov/c/github/ipfs/helia-routing-v1-http-api.svg?style=flat-square)](https://codecov.io/gh/ipfs/helia-routing-v1-http-api)
[![CI](https://img.shields.io/github/actions/workflow/status/ipfs/helia-routing-v1-http-api/js-test-and-release.yml?branch=main\&style=flat-square)](https://github.com/ipfs/helia-routing-v1-http-api/actions/workflows/js-test-and-release.yml?query=branch%3Amain)

> A Routing V1 HTTP API server powered by Helia
> A [Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) HTTP API server powered by Helia
## Table of contents <!-- omit in toc -->

Expand Down

0 comments on commit 0a08dc4

Please sign in to comment.