Skip to content

Commit

Permalink
feat: allow custom shard size (#185)
Browse files Browse the repository at this point in the history
Bubbles up the shard size option so `uploadFile` and `uploadDirectory` can set it.
  • Loading branch information
alanshaw committed Nov 18, 2022
1 parent 785924b commit 0cada8a
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 15 deletions.
6 changes: 4 additions & 2 deletions packages/upload-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ function uploadDirectory(
options: {
retries?: number
signal?: AbortSignal
onShardStored: ShardStoredCallback
onShardStored?: ShardStoredCallback
shardSize?: number
} = {}
): Promise<CID>
```
Expand All @@ -170,7 +171,8 @@ function uploadFile(
options: {
retries?: number
signal?: AbortSignal
onShardStored: ShardStoredCallback
onShardStored?: ShardStoredCallback
shardSize?: number
} = {}
): Promise<CID>
```
Expand Down
13 changes: 4 additions & 9 deletions packages/upload-client/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import { ShardingStream, ShardStoringStream } from './sharding.js'
export { Storage, Upload, UnixFS, CAR }
export * from './sharding.js'

/**
* @typedef {(meta: import('./types').CARMetadata) => void} StoredShardCallback
* @typedef {import('./types').RequestOptions & { onStoredShard?: StoredShardCallback }} UploadOptions
*/

/**
* Uploads a file to the service and returns the root data CID for the
* generated DAG.
Expand All @@ -32,7 +27,7 @@ export * from './sharding.js'
*
* The issuer needs the `store/add` and `upload/add` delegated capability.
* @param {Blob} file File data.
* @param {UploadOptions} [options]
* @param {import('./types').UploadOptions} [options]
*/
export async function uploadFile(conf, file, options = {}) {
return await uploadBlockStream(
Expand Down Expand Up @@ -63,7 +58,7 @@ export async function uploadFile(conf, file, options = {}) {
*
* The issuer needs the `store/add` and `upload/add` delegated capability.
* @param {import('./types').FileLike[]} files File data.
* @param {UploadOptions} [options]
* @param {import('./types').UploadOptions} [options]
*/
export async function uploadDirectory(conf, files, options = {}) {
return await uploadBlockStream(
Expand All @@ -76,7 +71,7 @@ export async function uploadDirectory(conf, files, options = {}) {
/**
* @param {import('./types').InvocationConfig} conf
* @param {ReadableStream<import('@ipld/unixfs').Block>} blocks
* @param {UploadOptions} [options]
* @param {import('./types').UploadOptions} [options]
* @returns {Promise<import('./types').AnyLink>}
*/
async function uploadBlockStream(conf, blocks, options = {}) {
Expand All @@ -85,7 +80,7 @@ async function uploadBlockStream(conf, blocks, options = {}) {
/** @type {import('./types').AnyLink?} */
let root = null
await blocks
.pipeThrough(new ShardingStream())
.pipeThrough(new ShardingStream(options))
.pipeThrough(new ShardStoringStream(conf, options))
.pipeTo(
new WritableStream({
Expand Down
4 changes: 1 addition & 3 deletions packages/upload-client/src/sharding.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ const CONCURRENT_UPLOADS = 3
*/
export class ShardingStream extends TransformStream {
/**
* @param {object} [options]
* @param {number} [options.shardSize] The target shard size. Actual size of
* CAR output may be bigger due to CAR header and block encoding data.
* @param {import('./types').ShardingOptions} [options]
*/
constructor(options = {}) {
const shardSize = options.shardSize ?? SHARD_SIZE
Expand Down
14 changes: 13 additions & 1 deletion packages/upload-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,19 @@ export interface Connectable {
connection?: ConnectionView<Service>
}

export type RequestOptions = Retryable & Abortable & Connectable
export interface RequestOptions extends Retryable, Abortable, Connectable {}

export interface ShardingOptions {
/**
* The target shard size. Actual size of CAR output may be bigger due to CAR
* header and block encoding data.
*/
shardSize?: number
}

export interface UploadOptions extends RequestOptions, ShardingOptions {
onStoredShard?: (meta: CARMetadata) => void
}

export interface BlobLike {
/**
Expand Down
116 changes: 116 additions & 0 deletions packages/upload-client/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,64 @@ describe('uploadFile', () => {
assert(carCID)
assert(dataCID)
})

it('allows custom shard size to be set', async () => {
const res = {
status: 'upload',
headers: { 'x-test': 'true' },
url: 'http://localhost:9200',
}

const space = await Signer.generate()
const agent = await Signer.generate() // The "user" that will ask the service to accept the upload
const file = new Blob([await randomBytes(500_000)])
/** @type {import('../src/types').CARLink[]} */
const carCIDs = []

const proofs = await Promise.all([
StoreCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
UploadCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
])

const service = mockService({
store: { add: provide(StoreCapabilities.add, () => res) },
upload: { add: provide(UploadCapabilities.add, () => null) },
})

const server = Server.create({
id: serviceSigner,
service,
decoder: CAR,
encoder: CBOR,
})
const connection = Client.connect({
id: serviceSigner,
encoder: CAR,
decoder: CBOR,
channel: server,
})
await uploadFile(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
file,
{
connection,
shardSize: 400_000, // should end up with 2 CAR files
onStoredShard: (meta) => carCIDs.push(meta.cid),
}
)

assert.equal(carCIDs.length, 2)
})
})

describe('uploadDirectory', () => {
Expand Down Expand Up @@ -188,4 +246,62 @@ describe('uploadDirectory', () => {
assert(carCID)
assert(dataCID)
})

it('allows custom shard size to be set', async () => {
const res = {
status: 'upload',
headers: { 'x-test': 'true' },
url: 'http://localhost:9200',
}

const space = await Signer.generate()
const agent = await Signer.generate() // The "user" that will ask the service to accept the upload
const files = [new File([await randomBytes(500_000)], '1.txt')]
/** @type {import('../src/types').CARLink[]} */
const carCIDs = []

const proofs = await Promise.all([
StoreCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
UploadCapabilities.add.delegate({
issuer: space,
audience: agent,
with: space.did(),
expiration: Infinity,
}),
])

const service = mockService({
store: { add: provide(StoreCapabilities.add, () => res) },
upload: { add: provide(UploadCapabilities.add, () => null) },
})

const server = Server.create({
id: serviceSigner,
service,
decoder: CAR,
encoder: CBOR,
})
const connection = Client.connect({
id: serviceSigner,
encoder: CAR,
decoder: CBOR,
channel: server,
})
await uploadDirectory(
{ issuer: agent, with: space.did(), proofs, audience: serviceSigner },
files,
{
connection,
shardSize: 400_000, // should end up with 2 CAR files
onStoredShard: (meta) => carCIDs.push(meta.cid),
}
)

assert.equal(carCIDs.length, 2)
})
})

0 comments on commit 0cada8a

Please sign in to comment.