Skip to content

Commit

Permalink
feat: serve CAR blocks (#68)
Browse files Browse the repository at this point in the history
This PR adds a new handler that allows freeway to serve CAR files
directly from R2 (CARPARK).

Requesting a CID from the gateway with the CAR codec `0x0202` will
invoke this new handler.

Obviously CARs are not served in the same way as regular IPFS data,
since we can serve the data directly from an R2 bucket. To deal with
this difference, there's a new middlewares that detects the CAR codec
and calls the correct handler, passing through to the regular handlers
if the CAR codec is not detected.

The handler supports `HEAD` requests, allowing you to get the file size
of the item you're requesting. It also supports the HTTP `Range` header,
allowing you to extract byte ranges from within a CAR, which you might
do if you fetch the index for the CAR.

The idea is that we can use these handlers to build a system that runs
on content-claims. The flow might be something like:

1. Get [partition
claim](https://github.com/web3-storage/content-claims#partition-claim)
for a given content CID
2. Get [inclusion
claims](https://github.com/web3-storage/content-claims#inclusion-claim)
for each CAR `part` in the partition claim
3. Fetch each CARv2 index from the gateway for each `includes` CID
specified in the inclusion claims (You must first fetch partition claims
for each index CID to find out which CAR file they can be found in).
4. Use HTTP `Range` requests to export the whole DAG or a sub-DAG
directly from the `parts`, using the index data as a guide for which
blocks to extract.

Note: if exporting a full DAG you might want to just stop after (1) and
import all data in all CARs into IPFS.
  • Loading branch information
Alan Shaw authored Jul 31, 2023
1 parent 86d1f7f commit 0b95438
Show file tree
Hide file tree
Showing 14 changed files with 441 additions and 76 deletions.
22 changes: 14 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
"@ipld/dag-cbor": "^9.0.0",
"@ipld/dag-json": "^10.0.1",
"@ipld/dag-pb": "^4.0.2",
"@web3-storage/content-claims": "^2.1.1",
"@web3-storage/gateway-lib": "^3.2.4",
"@web3-storage/content-claims": "^3.0.1",
"@web3-storage/gateway-lib": "^3.3.2",
"cardex": "^2.3.1",
"chardet": "^1.5.0",
"dagula": "^7.0.0",
"http-range-parse": "^1.0.0",
"lnmap": "^1.0.1",
"magic-bytes.js": "^1.0.12",
"mrmime": "^1.0.1",
Expand Down
1 change: 1 addition & 0 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface SimpleBucket {
export interface SimpleBucketObject {
readonly key: string
readonly body: ReadableStream
arrayBuffer(): Promise<ArrayBuffer>
}

export interface IndexSource {
Expand Down
82 changes: 82 additions & 0 deletions src/handlers/car-block.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/* eslint-env browser */
/* global FixedLengthStream */
import { HttpError } from '@web3-storage/gateway-lib/util'
import { CAR_CODE } from '../constants.js'
import * as Http from '../lib/http.js'

/** @typedef {import('@web3-storage/gateway-lib').IpfsUrlContext} CarBlockHandlerContext */

/**
* Handler that serves CAR files directly from R2.
*
* @type {import('@web3-storage/gateway-lib').Handler<CarBlockHandlerContext, import('../bindings').Environment>}
*/
export async function handleCarBlock (request, env, ctx) {
const { searchParams, dataCid } = ctx
if (!dataCid) throw new Error('missing data CID')
if (!searchParams) throw new Error('missing URL search params')

if (request.method !== 'HEAD' && request.method !== 'GET') {
throw new HttpError('method not allowed', { status: 405 })
}
if (dataCid.code !== CAR_CODE) {
throw new HttpError('not a CAR CID', { status: 400 })
}

const etag = `"${dataCid}"`
if (request.headers.get('If-None-Match') === etag) {
return new Response(null, { status: 304 })
}

if (request.method === 'HEAD') {
const obj = await env.CARPARK.head(`${dataCid}/${dataCid}.car`)
if (!obj) throw new HttpError('CAR not found', { status: 404 })
return new Response(undefined, {
headers: {
'Accept-Ranges': 'bytes',
'Content-Length': obj.size.toString(),
Etag: etag
}
})
}

/** @type {import('../lib/http').Range|undefined} */
let range
if (request.headers.has('range')) {
try {
range = Http.parseRange(request.headers.get('range') ?? '')
} catch (err) {
throw new HttpError('invalid range', { status: 400, cause: err })
}
}

const obj = await env.CARPARK.get(`${dataCid}/${dataCid}.car`, { range })
if (!obj) throw new HttpError('CAR not found', { status: 404 })

const status = range ? 206 : 200
const headers = new Headers({
'Content-Type': 'application/vnd.ipld.car; version=1;',
'X-Content-Type-Options': 'nosniff',
'Cache-Control': 'public, max-age=29030400, immutable',
'Content-Disposition': `attachment; filename="${dataCid}.car"`,
Etag: etag
})

let contentLength = obj.size
if (range) {
let first, last
if ('suffix' in range) {
first = obj.size - range.suffix
last = obj.size - 1
} else {
first = range.offset || 0
last = range.length != null ? first + range.length - 1 : obj.size - 1
}
headers.set('Content-Range', `bytes ${first}-${last}/${obj.size}`)
contentLength = last - first
}
headers.set('Content-Length', contentLength.toString())

// @ts-expect-error ReadableStream types incompatible
return new Response(obj.body.pipeThrough(new FixedLengthStream(contentLength)), { status, headers })
}
10 changes: 6 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import {
import {
withDagula,
withIndexSources,
withUnsupportedFeaturesHandler,
withVersionHeader
withHttpRangeUnsupported,
withVersionHeader,
withCarHandler
} from './middleware.js'

/**
Expand All @@ -39,9 +40,10 @@ export default {
withVersionHeader,
withContentDispositionHeader,
withErrorHandler,
withUnsupportedFeaturesHandler,
withHttpGet,
withParsedIpfsUrl,
withCarHandler,
withHttpRangeUnsupported,
withHttpGet,
withIndexSources,
withDagula,
withFixedLengthStream
Expand Down
14 changes: 11 additions & 3 deletions src/lib/bucket.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@ export class CachingBucket {
async get (key) {
// > the cache key requires a TLD to be present in the URL
const cacheKey = new URL(key, 'http://cache.freeway.dag.haus')
const res = await this.#cache.match(cacheKey)
if (res && res.body) return { key, body: res.body }
const cacheRes = await this.#cache.match(cacheKey)
if (cacheRes && cacheRes.body) return { key, body: cacheRes.body, arrayBuffer: () => cacheRes.arrayBuffer() }
const obj = await this.#source.get(key)
if (!obj) return null
const [body0, body1] = obj.body.tee()
this.#ctx.waitUntil(this.#cache.put(cacheKey, new Response(body1, {
headers: { 'Cache-Control': `max-age=${MAX_AGE}` }
})))
return { key, body: body0 }
const res = new Response(body0)
return {
key,
get body () {
if (!res.body) throw new Error('missing body')
return res.body
},
arrayBuffer: () => res.arrayBuffer()
}
}
}

Expand Down
20 changes: 20 additions & 0 deletions src/lib/car.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { CarReader } from '@ipld/car'
import { CAR_CODE } from '../constants.js'

export const code = CAR_CODE

/**
* @param {Uint8Array} bytes
* @returns {Promise<Array<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array }>>}
*/
export async function decode (bytes) {
const reader = await CarReader.fromBytes(bytes)
const blocks = []
for await (const b of reader.blocks()) {
blocks.push({
cid: /** @type {import('multiformats').UnknownLink} */ (b.cid),
bytes: b.bytes
})
}
return blocks
}
28 changes: 23 additions & 5 deletions src/lib/dag-index/content-claims.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as raw from 'multiformats/codecs/raw'
import * as Claims from '@web3-storage/content-claims/client'
import { MultihashIndexSortedReader } from 'cardex/multihash-index-sorted'
import { Map as LinkMap } from 'lnmap'
import { CAR_CODE } from '../../constants'
import * as CAR from '../car.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
Expand All @@ -14,6 +14,11 @@ import { CAR_CODE } from '../../constants'

/** @implements {Index} */
export class ContentClaimsIndex {
/**
* Index store.
* @type {import('../../bindings').SimpleBucket}
*/
#bucket
/**
* Cached index entries.
* @type {Map<UnknownLink, IndexEntry>}
Expand All @@ -39,9 +44,11 @@ export class ContentClaimsIndex {
#serviceURL

/**
* @param {import('../../bindings').SimpleBucket} bucket Bucket that stores CARs.
* @param {{ serviceURL?: URL }} [options]
*/
constructor (options) {
constructor (bucket, options) {
this.#bucket = bucket
this.#cache = new LinkMap()
this.#claimFetched = new LinkMap()
this.#serviceURL = options?.serviceURL
Expand Down Expand Up @@ -92,21 +99,32 @@ export class ContentClaimsIndex {
// and we don't serve anything that we don't have in our own bucket.
if (claim.type !== 'assert/relation') continue

// export the blocks from the claim - should include the CARv2 indexes
// export the blocks from the claim - may include the CARv2 indexes
const blocks = [...claim.export()]

// each part is a tuple of CAR CID (content) & CARv2 index CID (includes)
for (const { content, includes } of claim.parts) {
if (!isCARLink(content)) continue
if (!includes) continue

const block = blocks.find(b => b.cid.toString() === includes.toString())
/** @type {{ cid: import('multiformats').UnknownLink, bytes: Uint8Array }|undefined} */
let block = blocks.find(b => b.cid.toString() === includes.content.toString())

// if the index is not included in the claim, it should be in CARPARK
if (!block && includes.parts?.length) {
const obj = await this.#bucket.get(`${includes.parts[0]}/${includes.parts[0]}.car`)
if (!obj) continue
const blocks = await CAR.decode(new Uint8Array(await obj.arrayBuffer()))
block = blocks.find(b => b.cid.toString() === includes.content.toString())
}
if (!block) continue

const entries = await decodeIndex(content, block.bytes)
for (const entry of entries) {
this.#cache.set(Link.create(raw.code, entry.multihash), entry)
}
}
break
}
this.#claimFetched.set(cid, true)
}
Expand All @@ -116,7 +134,7 @@ export class ContentClaimsIndex {
* @param {import('multiformats').Link} cid
* @returns {cid is import('cardex/api').CARLink}
*/
const isCARLink = cid => cid.code === CAR_CODE
const isCARLink = cid => cid.code === CAR.code

/**
* Read a MultihashIndexSorted index for the passed origin CAR and return a
Expand Down
19 changes: 19 additions & 0 deletions src/lib/http.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// @ts-expect-error no types
import httpRangeParse from 'http-range-parse'

/** @typedef {{ offset: number, length?: number } | { offset?: number, length: number } | { suffix: number }} Range */

/**
* Convert a HTTP Range header to a range object.
* @param {string} value
* @returns {Range}
*/
export function parseRange (value) {
const result = httpRangeParse(value)
if (result.ranges) throw new Error('Multipart ranges not supported')
const { unit, first, last, suffix } = result
if (unit !== 'bytes') throw new Error(`Unsupported range unit: ${unit}`)
return suffix != null
? { suffix }
: { offset: first, length: last != null ? last - first + 1 : undefined }
}
24 changes: 21 additions & 3 deletions src/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ContentClaimsIndex } from './lib/dag-index/content-claims.js'
import { MultiCarIndex, StreamingCarIndex } from './lib/dag-index/car.js'
import { CachingBucket, asSimpleBucket } from './lib/bucket.js'
import { MAX_CAR_BYTES_IN_MEMORY, CAR_CODE } from './constants.js'
import { handleCarBlock } from './handlers/car-block.js'

/**
* @typedef {import('./bindings').Environment} Environment
Expand All @@ -18,11 +19,11 @@ import { MAX_CAR_BYTES_IN_MEMORY, CAR_CODE } from './constants.js'
*/

/**
* Validates the request does not contain unsupported features.
* Validates the request does not contain a HTTP `Range` header.
* Returns 501 Not Implemented in case it has.
* @type {import('@web3-storage/gateway-lib').Middleware<import('@web3-storage/gateway-lib').Context>}
*/
export function withUnsupportedFeaturesHandler (handler) {
export function withHttpRangeUnsupported (handler) {
return (request, env, ctx) => {
// Range request https://github.com/web3-storage/gateway-lib/issues/12
if (request.headers.get('range')) {
Expand All @@ -33,6 +34,23 @@ export function withUnsupportedFeaturesHandler (handler) {
}
}

/**
* Middleware that will serve CAR files if a CAR codec is found in the path
* CID. If the CID is not a CAR CID it delegates to the next middleware.
*
* @type {import('@web3-storage/gateway-lib').Middleware<IpfsUrlContext, IpfsUrlContext, Environment>}
*/
export function withCarHandler (handler) {
return async (request, env, ctx) => {
const { dataCid } = ctx
if (!dataCid) throw new Error('missing data CID')
if (dataCid.code !== CAR_CODE) {
return handler(request, env, ctx) // pass to other handlers
}
return handleCarBlock(request, env, ctx)
}
}

/**
* Extracts a set of index sources from search params from the URL querystring
* or DUDEWHERE bucket.
Expand Down Expand Up @@ -144,7 +162,7 @@ export function withDagula (handler) {
blockstore = new BatchingR2Blockstore(env.CARPARK, index)
}
} else {
const index = new ContentClaimsIndex({
const index = new ContentClaimsIndex(asSimpleBucket(env.CARPARK), {
serviceURL: env.CONTENT_CLAIMS_SERVICE_URL ? new URL(env.CONTENT_CLAIMS_SERVICE_URL) : undefined
})
const found = await index.get(dataCid)
Expand Down
Loading

0 comments on commit 0b95438

Please sign in to comment.