Skip to content

Commit

Permalink
feat: add method for uploading CAR files (#293)
Browse files Browse the repository at this point in the history
resolves #288
  • Loading branch information
terichadbourne authored Aug 11, 2021
1 parent 7d2f754 commit 3f6b6d3
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 62 deletions.
172 changes: 115 additions & 57 deletions packages/client/src/lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { pack } from 'ipfs-car/pack'
import parseLink from 'parse-link-header'
import { unpackStream } from 'ipfs-car/unpack'
import { TreewalkCarSplitter } from 'carbites/treewalk'
import { CarReader } from '@ipld/car'
import { filesFromPath, getFilesFromPath } from 'files-from-path'
import {
fetch,
Expand All @@ -39,6 +40,7 @@ const MAX_CHUNK_SIZE = 1024 * 1024 * 10 // chunk to ~10MB CARs
/** @typedef { import('./lib/interface.js').Filelike } Filelike */
/** @typedef { import('./lib/interface.js').CIDString} CIDString */
/** @typedef { import('./lib/interface.js').PutOptions} PutOptions */
/** @typedef { import('./lib/interface.js').PutCarOptions} PutCarOptions */
/** @typedef { import('./lib/interface.js').UnixFSEntry} UnixFSEntry */
/** @typedef { import('./lib/interface.js').Web3Response} Web3Response */

Expand Down Expand Up @@ -75,6 +77,7 @@ class Web3Storage {
/**
* @hidden
* @param {string} token
* @returns {Record<string, string>}
*/
static headers (token) {
if (!token) throw new Error('missing token')
Expand All @@ -97,22 +100,7 @@ class Web3Storage {
wrapWithDirectory = true,
name
} = {}) {
const url = new URL('/car', endpoint)
const targetSize = MAX_CHUNK_SIZE
let headers = Web3Storage.headers(token)

if (name) {
headers = {
...headers,
// @ts-ignore 'X-Name' does not exist in type inferred
'X-Name': name
}
}

/** @type {string} */
let carRoot
const blockstore = new Blockstore()

try {
const { out, root } = await pack({
input: Array.from(files).map((f) => ({
Expand All @@ -124,56 +112,81 @@ class Web3Storage {
maxChunkSize: 1048576,
maxChildrenPerNode: 1024
})
carRoot = root.toString()
onRootCidReady && onRootCidReady(root.toString())
const car = await CarReader.fromIterable(out)
return await Web3Storage.putCar({ endpoint, token }, car, { onStoredChunk, maxRetries, name })
} finally {
await blockstore.close()
}
}

onRootCidReady && onRootCidReady(carRoot)
/**
* @param {Service} service
* @param {import('@ipld/car/api').CarReader} car
* @param {PutCarOptions} [options]
* @returns {Promise<CIDString>}
*/
static async putCar ({ endpoint, token }, car, {
name,
onStoredChunk,
maxRetries = MAX_PUT_RETRIES
} = {}) {
const targetSize = MAX_CHUNK_SIZE
const url = new URL('/car', endpoint)
let headers = Web3Storage.headers(token)

const splitter = await TreewalkCarSplitter.fromIterable(out, targetSize)
if (name) {
headers = { ...headers, 'X-Name': name }
}

const upload = transform(
MAX_CONCURRENT_UPLOADS,
async (/** @type {AsyncIterable<Uint8Array>} */ car) => {
const carParts = []
for await (const part of car) {
carParts.push(part)
}
const roots = await car.getRoots()
if (roots[0] == null) {
throw new Error('missing root CID')
}
if (roots.length > 1) {
throw new Error('too many roots')
}

const carRoot = roots[0].toString()
const splitter = new TreewalkCarSplitter(car, targetSize)

const carFile = new Blob(carParts, {
type: 'application/car'
/**
* @param {AsyncIterable<Uint8Array>} car
* @returns {Promise<CIDString>}
*/
const onCarChunk = async car => {
const carParts = []
for await (const part of car) {
carParts.push(part)
}

const carFile = new Blob(carParts, { type: 'application/car' })
const res = await pRetry(
async () => {
const request = await fetch(url.toString(), {
method: 'POST',
headers,
body: carFile
})
const res = await request.json()
if (!request.ok) {
throw new Error(res.message)
}

const res = await pRetry(
async () => {
const request = await fetch(url.toString(), {
method: 'POST',
headers,
body: carFile
})
const res = await request.json()

if (!request.ok) {
throw new Error(res.message)
}

if (res.cid !== carRoot) {
throw new Error(`root CID mismatch, expected: ${carRoot}, received: ${res.cid}`)
}

return res.cid
},
{ retries: maxRetries }
)
onStoredChunk && onStoredChunk(carFile.size)
return res
}
if (res.cid !== carRoot) {
throw new Error(`root CID mismatch, expected: ${carRoot}, received: ${res.cid}`)
}
return res.cid
},
{ retries: maxRetries }
)

for await (const _ of upload(splitter.cars())) {} // eslint-disable-line
} finally {
// Close Blockstore
await blockstore.close()
onStoredChunk && onStoredChunk(carFile.size)
return res
}

const upload = transform(MAX_CONCURRENT_UPLOADS, onCarChunk)
for await (const _ of upload(splitter.cars())) {} // eslint-disable-line
return carRoot
}

Expand All @@ -198,7 +211,7 @@ class Web3Storage {
*/
/* c8 ignore next 4 */
static async delete ({ endpoint, token }, cid) {
console.log('Not deleteing', cid, endpoint, token)
console.log('Not deleting', cid, endpoint, token)
throw Error('.delete not implemented yet')
}

Expand Down Expand Up @@ -284,7 +297,52 @@ class Web3Storage {
}

/**
* Fetch the Content Addressed Archive by it's root CID.
* Uploads a CAR ([Content Addressed Archive](https://github.com/ipld/specs/blob/master/block-layer/content-addressable-archives.md)) file to web3.storage.
* Takes a CarReader interface from @ipld/car
*
* Returns the corresponding Content Identifier (CID).
*
* @example
* ```js
* import fs from 'fs'
* import { Readable } from 'stream'
* import { CarReader, CarWriter } from '@ipld/car'
* import * as raw from 'multiformats/codecs/raw'
* import { CID } from 'multiformats/cid'
* import { sha256 } from 'multiformats/hashes/sha2'
*
* async function getCar() {
* const bytes = new TextEncoder().encode('random meaningless bytes')
* const hash = await sha256.digest(raw.encode(bytes))
* const cid = CID.create(1, raw.code, hash)
*
* // create the writer and set the header with a single root
* const { writer, out } = await CarWriter.create([cid])
* Readable.from(out).pipe(fs.createWriteStream('example.car'))
* // store a new block, creates a new file entry in the CAR archive
* await writer.put({ cid, bytes })
* await writer.close()
* const inStream = fs.createReadStream('example.car')
* // read and parse the entire stream in one go, this will cache the contents of
* // the car in memory so is not suitable for large files.
* const reader = await CarReader.fromIterable(inStream)
* return reader
* }
*
* const car = await getCar()
* const cid = await client.putCar(car)
* ```
* @param {import('@ipld/car/api').CarReader} car
* @param {PutCarOptions} [options]
*/
putCar (car, options) {
return Web3Storage.putCar(this, car, options)
}

/**
* Fetch the Content Addressed Archive by its root CID.
* @param {CIDString} cid
*/
get (cid) {
Expand Down
37 changes: 32 additions & 5 deletions packages/client/src/lib/interface.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { UnixFSEntry } from 'ipfs-car/unpack'
import type { CID } from 'multiformats'
export type { CID, UnixFSEntry }
import type { CarReader } from '@ipld/car/api'

/**
* Define nominal type of U based on type of T. Similar to Opaque types in Flow
Expand Down Expand Up @@ -31,6 +32,15 @@ export interface API {
options?: PutOptions
): Promise<CIDString>

/**
* Uploads a CAR ([Content Addressed Archive](https://github.com/ipld/specs/blob/master/block-layer/content-addressable-archives.md)) file to web3.storage.
*/
putCar(
service: Service,
car: CarReader,
options?: PutCarOptions
): Promise<CIDString>

/**
* Get files for a root CID packed as a CAR file
*/
Expand Down Expand Up @@ -76,7 +86,7 @@ export interface Filelike {
export type PutOptions = {
/**
* Callback called after the data has been assembled into a DAG, but before
* any upload requests begin. It is passed the CID of the root node of the
* any upload requests begin. It is passed the CID of the root node of the
* graph.
*/
onRootCidReady?: (cid: CIDString) => void
Expand All @@ -92,13 +102,13 @@ export type PutOptions = {
maxRetries?: number
/**
* Should input files be wrapped with a directory? Default: true
*
*
* It is enabled by default as it preserves the input filenames in DAG;
* the filenames become directory entries in the generated wrapping dir.
*
* The trade off is your root CID will be that of the wrapping dir,
*
* The trade off is your root CID will be that of the wrapping dir,
* rather than the input file itself.
*
*
* For a single file e.g. `cat.png` it's IPFS path would be
* `<wrapping dir cid>/cat.png` rather than just `<cid for cat.png>`
*
Expand All @@ -112,6 +122,23 @@ export type PutOptions = {
name?: string
}

export type PutCarOptions = {
/**
* Human readable name for this upload, for use in file listings.
*/
name?: string
/**
* Callback called after each chunk of data has been uploaded. By default,
* data is split into chunks of around 10MB. It is passed the actual chunk
* size in bytes.
*/
onStoredChunk?: (size: number) => void
/**
* Maximum times to retry a failed upload. Default: 5
*/
maxRetries?: number
}

export interface Web3File extends File {
/**
* Content Identifier for the file data.
Expand Down
58 changes: 58 additions & 0 deletions packages/client/test/put.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import * as assert from 'uvu/assert'
import randomBytes from 'randombytes'
import { Web3Storage } from 'web3.storage'
import { File } from '../src/platform.js'
import { pack } from 'ipfs-car/pack'
import { CarReader, CarWriter } from '@ipld/car'
import { CID } from 'multiformats/cid'

describe('put', () => {
const { AUTH_TOKEN, API_PORT } = process.env
Expand Down Expand Up @@ -96,6 +99,54 @@ describe('put', () => {
})
})

describe('putCar', () => {
const { AUTH_TOKEN, API_PORT } = process.env
const token = AUTH_TOKEN || 'good'
const endpoint = new URL(API_PORT ? `http://localhost:${API_PORT}` : '')

it('adds CAR files', async () => {
const client = new Web3Storage({ token, endpoint })
const carReader = await createCar('hello world')
const expectedCid = 'bafybeiczsscdsbs7ffqz55asqdf3smv6klcw3gofszvwlyarci47bgf354'
const cid = await client.putCar(carReader, {
name: 'putCar test',
onRootCidReady: cid => {
assert.equal(cid, expectedCid, 'returned cid matches the CAR')
}
})
assert.equal(cid, expectedCid, 'returned cid matches the CAR')
})

it('errors for CAR with zero roots', async () => {
const client = new Web3Storage({ token, endpoint })
const { writer, out } = CarWriter.create([])
writer.close()
const reader = await CarReader.fromIterable(out)
try {
await client.putCar(reader)
assert.unreachable('should have thrown')
} catch (err) {
assert.match(err.message, /missing root CID/)
}
})

it('errors for CAR with multiple roots', async () => {
const client = new Web3Storage({ token, endpoint })
const { writer, out } = CarWriter.create([
CID.parse('bafybeiczsscdsbs7ffqz55asqdf3smv6klcw3gofszvwlyarci47bgf354'),
CID.parse('bafybeifkc773a2s6gerq7ip7tikahlfflxe4fvagyxf74zfkr33j2yu5li')
])
writer.close()
const reader = await CarReader.fromIterable(out)
try {
await client.putCar(reader)
assert.unreachable('should have thrown')
} catch (err) {
assert.match(err.message, /too many roots/)
}
})
})

function prepareFiles () {
const data = 'Hello web3.storage!'
const data2 = 'Hello web3.storage!!'
Expand All @@ -119,3 +170,10 @@ function prepareFiles () {
)
]
}

// creates a carReader from a text string
async function createCar (str) {
const { out } = await pack({ input: new TextEncoder().encode(str) })
const reader = await CarReader.fromIterable(out)
return reader
}

0 comments on commit 3f6b6d3

Please sign in to comment.