Skip to content

Commit

Permalink
feat: improve chunker perf
Browse files Browse the repository at this point in the history
  • Loading branch information
Gozala committed Mar 9, 2022
1 parent 35ba07a commit dcbe064
Show file tree
Hide file tree
Showing 20 changed files with 639 additions and 97 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"dependencies": {
"@ipld/dag-pb": "^2.1.15",
"@web-std/stream": "1.0.1",
"rabin-rs": "^2.0.0",
"rabin-wasm": "^0.1.5",
"actor": "^2.2.1",
"protobufjs": "^6.11.2"
Expand Down
10 changes: 10 additions & 0 deletions src/file.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ export const defaults = () => ({
createCID: CID.createV1,
})

/**
*
* @param {Partial<API.FileWriterConfig>} config
* @returns {API.FileWriterConfig}
*/
export const configure = config => ({
...defaults(),
...config,
})

export const UnixFSLeaf = {
code: UnixFS.code,
name: UnixFS.name,
Expand Down
2 changes: 1 addition & 1 deletion src/file/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export * from "../writer/api.js"
import * as ChunkerService from "./chunker.js"
import * as LayoutService from "./layout.js"

export type { Chunker, Layout, MultihashHasher, MultihashDigest }
export type { Chunker, Layout, MultihashHasher, MultihashDigest, Block }

export interface FileWriterService<O = unknown, S = unknown>
extends FileWriterConfig<O, S> {
Expand Down
108 changes: 70 additions & 38 deletions src/file/chunker.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-disable no-nested-ternary */
/* eslint-disable no-unused-vars */
import * as Chunker from "./chunker/api.js"
import * as BufferQueue from "./chunker/buffer.js"
import { unreachable, EMPTY, EMPTY_BUFFER } from "../writer/util.js"
export * from "./chunker/api.js"

Expand All @@ -12,32 +13,35 @@ export * from "./chunker/api.js"
* @typedef {{
* status: 'none'
* config: Config
* buffer: Uint8Array
* buffer: Chunker.Buffer
* byteOffset: number
* }} EmptyState
* Represents empty state where no chunks have been found yet.
*
* @typedef {{
* status: 'single'
* byteOffset: number
* config: Config
* buffer: Uint8Array
* chunk: Uint8Array
* buffer: Chunker.Buffer
* chunk: Chunker.Buffer
* }} SingleChunkState
* Represents state where single chunk have been found. In this
* state it is not yet clear which file layout can be used, because
* since chunk files maybe encoded as raw blocks, or file blocks.
*
* @typedef {{
* status: 'multiple'
* byteOffset: number
* config: Config
* buffer: Uint8Array
* buffer: Chunker.Buffer
* }} MultiChunkState
* Represents state where more than one chunks have been found.
*
* @typedef {EmptyState|SingleChunkState|MultiChunkState} State
*
* @typedef {{
* state: State
* chunks: Uint8Array[]
* chunks: Chunker.Buffer[]
* }} Update
*/

Expand All @@ -47,8 +51,9 @@ export * from "./chunker/api.js"
*/
export const open = config => ({
config,
byteOffset: 0,
status: "none",
buffer: EMPTY_BUFFER,
buffer: BufferQueue.empty(),
})

/**
Expand All @@ -68,39 +73,53 @@ export const chunks = update => update.chunks
*/
export const append = (state, bytes) => {
const { config } = state
const { buffer, chunks } = split(concat(state.buffer, bytes), config.chunker)
const byteOffset = state.byteOffset + bytes.byteLength
const { buffer, chunks } = split(
state.buffer.push(bytes),
// concat(state.buffer, bytes),
config.chunker,
state.byteOffset,
false
)

switch (state.status) {
case "none":
switch (chunks.length) {
case 0:
return {
state: { ...state, buffer },
state: { ...state, byteOffset, buffer },
chunks: EMPTY,
}
case 1:
return {
state: { ...state, status: "single", buffer, chunk: chunks[0] },
state: {
...state,
status: "single",
byteOffset,
buffer,
chunk: chunks[0],
},
chunks: EMPTY,
}
default:
return {
state: { ...state, status: "multiple", buffer },
state: { ...state, status: "multiple", byteOffset, buffer },
chunks,
}
}
case "single":
if (chunks.length === 0) {
return { state: { ...state, buffer }, chunks: EMPTY }
return { state: { ...state, buffer, byteOffset }, chunks: EMPTY }
} else {
const { chunk, ...rest } = state
return {
state: { ...rest, status: "multiple", buffer },
state: { ...rest, status: "multiple", byteOffset, buffer },
chunks: [chunk, ...chunks],
}
}
case "multiple":
return {
state: { ...state, buffer },
state: { ...state, byteOffset, buffer },
chunks,
}
default:
Expand All @@ -110,30 +129,28 @@ export const append = (state, bytes) => {

/**
* @param {State} state
* @returns {{single: true, chunk:Uint8Array}|{single: false, chunks: Uint8Array[]}}
* @returns {{single: true, chunk:Chunker.Buffer}|{single: false, chunks: Chunker.Buffer[]}}
*/
export const close = state => {
const { buffer } = state
const { buffer, config } = state
// flush remaining bytes in the buffer
const { chunks } = split(buffer, config.chunker, state.byteOffset, true)

switch (state.status) {
case "none":
return {
single: true,
chunk: buffer,
}
case "none": {
return chunks.length === 1
? { single: true, chunk: chunks[0] }
: { single: false, chunks }
}
case "single": {
if (buffer.byteLength > 0) {
return {
single: false,
chunks: [state.chunk, buffer],
}
} else {
return { single: true, chunk: state.chunk }
}
return chunks.length === 0
? { single: true, chunk: state.chunk }
: { single: false, chunks: [state.chunk, ...chunks] }
}
case "multiple": {
return {
single: false,
chunks: buffer.byteLength > 0 ? [buffer] : EMPTY,
chunks,
}
}
default:
Expand All @@ -143,22 +160,37 @@ export const close = state => {

/**
* @param {Chunker.Chunker} chunker
* @param {Uint8Array} input
* @returns {{buffer:Uint8Array, chunks:Uint8Array[]}}
* @param {Chunker.Buffer} input
* @param {number} byteOffset
* @param {boolean} end
* @returns {{buffer:Chunker.Buffer, chunks:Chunker.Buffer[]}}
*/

export const split = (input, chunker) => {
export const split = (input, chunker, byteOffset, end) => {
let buffer = input
/** @type {Uint8Array[]} */
/** @type {Chunker.Buffer[]} */
const chunks = []
const sizes = chunker.cut(chunker.context, buffer)
const sizes =
chunker.type === "Stateful"
? chunker.cut(chunker.context, buffer.subarray(byteOffset), end)
: chunker.cut(chunker.context, buffer, end)

let offset = 0
for (const size of sizes) {
const chunk = buffer.subarray(offset, offset + size)
chunks.push(chunk)
offset += size
// We may be splitting empty buffer in which case there will be no chunks
// in it so we make sure that we do not emit empty buffer.
if (size > 0) {
const chunk = buffer.subarray(offset, offset + size)
chunks.push(chunk)
offset += size
}
}
buffer = buffer.subarray(offset)
buffer =
offset === buffer.byteLength
? BufferQueue.empty()
: offset === 0
? buffer
: buffer.subarray(offset)

return { buffer, chunks }
}
Expand Down
28 changes: 19 additions & 9 deletions src/file/chunker/api.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
export interface Buffer {
readonly length: number
readonly byteLength: number
slice(start?: number, end?: number): Buffer
subarray(start?: number, end?: number): Buffer
push(bytes: Uint8Array): Buffer
get(offset: number): number | undefined
copyTo(target: Uint8Array, offset: number): Uint8Array
}

/**
* Chunker API can be used to slice up the file content according
* to specific logic. It is designed with following properties in mind:
*
*
* 1. **Stateless** - Chunker does not retain any state across the calls. This
* implies that calling `cut` function on the same bytes would produce same
* array of sizes. Do note however, that when chunker is used to chunk stream
Expand Down Expand Up @@ -31,9 +41,10 @@ export interface ChunkerAPI<T> {
* buffer contains no valid chunks.
*
* **Note:** Consumer of the chunker is responsible for dealing with remaining
* bytes in the buffer when end of the stream is reached.
* bytes in the buffer, that is unless `end` is true signalling chunker that
* end of the stream is reached.
*/
cut(context:T, buffer:Uint8Array):number[]
cut(context: T, buffer: Buffer, end?: boolean): Iterable<number>
}

/**
Expand All @@ -43,13 +54,12 @@ export interface ChunkerAPI<T> {
* previously seen bytes.
*/
export interface StatefulChunker<T> extends ChunkerAPI<T> {
type: 'Stateful'
type: "Stateful"
}

export interface StatelessChunker<T extends Readonly<unknown>> extends ChunkerAPI<T> {
type: 'Stateless'
export interface StatelessChunker<T extends Readonly<unknown>>
extends ChunkerAPI<T> {
type: "Stateless"
}

export type Chunker<T=any> =
| StatefulChunker<T>
| StatelessChunker<T>
export type Chunker<T = any> = StatefulChunker<T> | StatelessChunker<T>
Loading

0 comments on commit dcbe064

Please sign in to comment.