Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: conform to Delegated Routing V1 HTTP spec #41

Merged
merged 19 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions packages/client/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,51 @@ const options = {
test: {
before: async () => {
const providers = new Map()
const peers = new Map()
const ipnsGet = new Map()
const ipnsPut = new Map()
const echo = new EchoServer()
echo.polka.use(body.raw({ type: 'application/vnd.ipfs.ipns-record'}))
echo.polka.use(body.text())
echo.polka.post('/add-providers/:cid', (req, res) => {
providers.set(req.params.cid, req.body)
res.end()
})
echo.polka.get('/routing/v1/providers/:cid', (req, res) => {
const provs = providers.get(req.params.cid) ?? '[]'
const records = providers.get(req.params.cid) ?? '[]'
providers.delete(req.params.cid)

res.end(provs)
res.end(records)
})
echo.polka.post('/add-peers/:peerId', (req, res) => {
peers.set(req.params.peerId, req.body)
res.end()
})
echo.polka.get('/routing/v1/peers/:peerId', (req, res) => {
const records = peers.get(req.params.peerId) ?? '[]'
peers.delete(req.params.peerId)

res.end(records)
})
echo.polka.post('/add-ipns/:peerId', (req, res) => {
ipnsGet.set(req.params.peerId, req.body)
res.end()
})
echo.polka.get('/routing/v1/ipns/:peerId', (req, res) => {
const record = ipnsGet.get(req.params.peerId) ?? ''
ipnsGet.delete(req.params.peerId)

res.end(record)
})
echo.polka.put('/routing/v1/ipns/:peerId', (req, res) => {
ipnsPut.set(req.params.peerId, req.body)
res.end()
})
echo.polka.get('/get-ipns/:peerId', (req, res) => {
const record = ipnsPut.get(req.params.peerId) ?? ''
ipnsPut.delete(req.params.peerId)

res.end(record)
})

await echo.start()
Expand Down
2 changes: 1 addition & 1 deletion packages/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
[![codecov](https://img.shields.io/codecov/c/github/ipfs/helia-routing-v1-http-api.svg?style=flat-square)](https://codecov.io/gh/ipfs/helia-routing-v1-http-api)
[![CI](https://img.shields.io/github/actions/workflow/status/ipfs/helia-routing-v1-http-api/js-test-and-release.yml?branch=main\&style=flat-square)](https://github.com/ipfs/helia-routing-v1-http-api/actions/workflows/js-test-and-release.yml?query=branch%3Amain)

> A Routing V1 HTTP API client
> A Delegated Routing V1 HTTP API client

## Table of contents <!-- omit in toc -->

Expand Down
3 changes: 2 additions & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@helia/routing-v1-http-api-client",
"version": "1.0.2",
"description": "A Routing V1 HTTP API client",
"description": "A Delegated Routing V1 HTTP API client",
"license": "Apache-2.0 OR MIT",
"homepage": "https://github.com/ipfs/helia-routing-v1-http-api/tree/master/packages/client#readme",
"repository": {
Expand Down Expand Up @@ -136,6 +136,7 @@
"@multiformats/multiaddr": "^12.1.3",
"any-signal": "^4.1.1",
"browser-readablestream-to-it": "^2.0.3",
"ipns": "^7.0.1",
"it-all": "^3.0.2",
"iterable-ndjson": "^1.1.0",
"multiformats": "^12.1.1",
Expand Down
199 changes: 167 additions & 32 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,19 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { multiaddr } from '@multiformats/multiaddr'
import { anySignal } from 'any-signal'
import toIt from 'browser-readablestream-to-it'
import { unmarshal, type IPNSRecord, marshal, peerIdToRoutingKey } from 'ipns'
import { ipnsValidator } from 'ipns/validator'
// @ts-expect-error no types
import ndjson from 'iterable-ndjson'
import defer from 'p-defer'
import PQueue from 'p-queue'
import type { RoutingV1HttpApiClient, RoutingV1HttpApiClientInit } from './index.js'
import type { RoutingV1HttpApiClient, RoutingV1HttpApiClientInit, Record, PeerRecord } from './index.js'
import type { AbortOptions } from '@libp2p/interface'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { CID } from 'multiformats'

const log = logger('routing-v1-http-api-client')

interface RoutingV1HttpApiGetProvidersResponse {
Protocol: string
Schema: string
ID: string
Addrs: Multiaddr[]
}

const defaultValues = {
concurrentRequests: 4,
timeout: 30e3
Expand Down Expand Up @@ -62,8 +56,8 @@ export class DefaultRoutingV1HttpApiClient implements RoutingV1HttpApiClient {
this.started = false
}

async * getProviders (cid: CID, options: AbortOptions | undefined = {}): AsyncGenerator<PeerInfo, any, unknown> {
log('findProviders starts: %c', cid)
async * getProviders (cid: CID, options: AbortOptions = {}): AsyncGenerator<Record, any, unknown> {
log('getProviders starts: %c', cid)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
Expand All @@ -77,46 +71,187 @@ export class DefaultRoutingV1HttpApiClient implements RoutingV1HttpApiClient {
try {
await onStart.promise

// https://github.com/ipfs/specs/blob/main/routing/ROUTING_V1_HTTP.md#api
// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/providers/${cid.toString()}`
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
const a = await fetch(resource, getOptions)
const res = await fetch(resource, getOptions)

if (a.body == null) {
if (res.body == null) {
throw new CodeError('Routing response had no body', 'ERR_BAD_RESPONSE')
}

for await (const event of ndjson(toIt(a.body))) {
if (event.Protocol !== 'transport-bitswap') {
continue
const contentType = res.headers.get('Content-Type')
if (contentType === 'application/json') {
const body = await res.json()

for (const provider of body.Providers) {
const record = this.#handleProviderRecords(provider)
if (record !== null) {
yield record
}
}
} else {
for await (const provider of ndjson(toIt(res.body))) {
const record = this.#handleProviderRecords(provider)
if (record !== null) {
yield record
}
}
}
} catch (err) {
log.error('getProviders errored:', err)
} finally {
signal.clear()
onFinish.resolve()
log('getProviders finished: %c', cid)
}
}

async * getPeerInfo (peerId: PeerId, options: AbortOptions | undefined = {}): AsyncGenerator<PeerRecord, any, unknown> {
log('getPeers starts: %c', peerId)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
const onFinish = defer()

yield this.#mapProvider(event)
void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/peers/${peerId.toCID().toString()}`
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
const res = await fetch(resource, getOptions)

if (res.body == null) {
throw new CodeError('Routing response had no body', 'ERR_BAD_RESPONSE')
}

const contentType = res.headers.get('Content-Type')
if (contentType === 'application/json') {
const body = await res.json()

for (const peer of body.Peers) {
const record = this.#handlePeerRecords(peerId, peer)
if (record !== null) {
yield record
}
}
} else {
for await (const peer of ndjson(toIt(res.body))) {
const record = this.#handlePeerRecords(peerId, peer)
if (record !== null) {
yield record
}
}
}
} catch (err) {
log.error('findProviders errored:', err)
log.error('getPeers errored:', err)
} finally {
signal.clear()
onFinish.resolve()
log('getPeers finished: %c', peerId)
}
}

async getIPNS (peerId: PeerId, options: AbortOptions = {}): Promise<IPNSRecord> {
log('getIPNS starts: %c', peerId)

const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
const onFinish = defer()

void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/ipns/${peerId.toCID().toString()}`
const getOptions = { headers: { Accept: 'application/vnd.ipfs.ipns-record' }, signal }
const res = await fetch(resource, getOptions)

if (res.body == null) {
throw new CodeError('GET ipns response had no body', 'ERR_BAD_RESPONSE')
}

const body = new Uint8Array(await res.arrayBuffer())
await ipnsValidator(peerIdToRoutingKey(peerId), body)
return unmarshal(body)
} finally {
signal.clear()
onFinish.resolve()
log('findProviders finished: %c', cid)
log('getIPNS finished: %c', peerId)
}
}

#mapProvider (event: RoutingV1HttpApiGetProvidersResponse): PeerInfo {
const peer = peerIdFromString(event.ID)
const ma: Multiaddr[] = []
async putIPNS (peerId: PeerId, record: IPNSRecord, options: AbortOptions = {}): Promise<void> {
log('getIPNS starts: %c', peerId)

for (const strAddr of event.Addrs) {
const addr = multiaddr(strAddr)
ma.push(addr)
const signal = anySignal([this.shutDownController.signal, options.signal, AbortSignal.timeout(this.timeout)])
const onStart = defer()
const onFinish = defer()

void this.httpQueue.add(async () => {
onStart.resolve()
return onFinish.promise
})

try {
await onStart.promise

const body = marshal(record)

// https://specs.ipfs.tech/routing/http-routing-v1/
const resource = `${this.clientUrl}routing/v1/ipns/${peerId.toCID().toString()}`
const getOptions = { method: 'PUT', headers: { 'Content-Type': 'application/vnd.ipfs.ipns-record' }, body, signal }
const res = await fetch(resource, getOptions)
if (res.status !== 200) {
throw new CodeError('PUT ipns response had status other than 200', 'ERR_BAD_RESPONSE')
}
} finally {
signal.clear()
onFinish.resolve()
log('getIPNS finished: %c', peerId)
}
}

const pi = {
id: peer,
multiaddrs: ma,
protocols: []
#handleProviderRecords (record: any): Record | null {
if (record.Schema === 'peer') {
// Peer schema can have additional, user-defined, fields.
record.ID = peerIdFromString(record.ID)
record.Addrs = record.Addrs.map(multiaddr)
return record
} else if (record.Schema === 'bitswap') {
// Bitswap schema cannot have additional fields.
return {
Schema: record.Schema,
Protocol: record.Protocol,
ID: peerIdFromString(record.ID),
Addrs: record.Addrs.map(multiaddr)
}
}

return null
}

#handlePeerRecords (peerId: PeerId, record: any): PeerRecord | null {
if (record.Schema === 'peer') {
// Peer schema can have additional, user-defined, fields.
record.ID = peerIdFromString(record.ID)
record.Addrs = record.Addrs.map(multiaddr)
if (peerId.equals(record.ID)) {
return record
}
}

return pi
return null
}
}
37 changes: 35 additions & 2 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,27 @@

import { DefaultRoutingV1HttpApiClient } from './client.js'
import type { AbortOptions } from '@libp2p/interface'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { IPNSRecord } from 'ipns'
import type { CID } from 'multiformats/cid'

export interface PeerRecord {
Schema: 'peer'
ID: PeerId
Addrs: Multiaddr[]
Protocols: string[]
}

export interface BitswapRecord {
Schema: 'bitswap'
Protocol: string
ID: PeerId
Addrs: Multiaddr[]
}

export type Record = PeerRecord | BitswapRecord

export interface RoutingV1HttpApiClientInit {
/**
* A concurrency limit to avoid request flood in web browser (default: 4)
Expand All @@ -41,7 +59,22 @@ export interface RoutingV1HttpApiClient {
* Returns an async generator of PeerInfos that can provide the content
* for the passed CID
*/
getProviders(cid: CID, options?: AbortOptions): AsyncGenerator<PeerInfo>
getProviders(cid: CID, options?: AbortOptions): AsyncGenerator<Record>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the return type be guaranteed to be BitswapRecord?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achingbrain we don't want BitswapRecords to be used, see https://specs.ipfs.tech/routing/http-routing-v1/#get-routing-v1-providers-cid - it should be a generic record. PeerRecords are recommended.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then can the return type be guaranteed to be PeerRecord? What I'm getting at here is it makes the users life easier if they don't have to deal with a union type.

Copy link
Member

@lidel lidel Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be other schemas than peer. But in practice we can limit this to PeerRecord responses: ignore unknown schemas unless they have ID and Addrs fields like the legacy bitswap schema, and interpret them aspeer schema.

For the wider context, the only /routing/v1 implementation (afaik) that returned bitswap schema was cid.contact (IPNI), and it had ID and Addrs just like peer schema does.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds like we can just convert any BitswapRecords to PeerRecords and drop the union type then.


/**
* Returns an async generator of PeerInfos for the provided PeerId
*/
getPeerInfo(peerId: PeerId, options?: AbortOptions): AsyncGenerator<PeerRecord>

/**
* Returns a promise of a IPNSRecord for the given PeerId
*/
getIPNS(peerId: PeerId, options?: AbortOptions): Promise<IPNSRecord>

/**
* Publishes the given IPNSRecorded for the provided PeerId
*/
putIPNS(peerId: PeerId, record: IPNSRecord, options?: AbortOptions): Promise<void>

/**
* Shut down any currently running HTTP requests and clear up any resources
Expand Down
Loading