From 3be5b429d5869b13aff13019af4fa316ca12d375 Mon Sep 17 00:00:00 2001 From: Irakli Gozalishvili Date: Thu, 10 Mar 2022 19:45:36 -0800 Subject: [PATCH] fix: id allocator --- package.json | 3 +- src/file/layout/trickle.js | 199 +++++++++++++++++++------------------ src/file/writer.js | 1 + test/file.spec.js | 48 ++++++++- test/util.js | 74 ++++++++++++-- 5 files changed, 216 insertions(+), 109 deletions(-) diff --git a/package.json b/package.json index 0d5f974..3771084 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,8 @@ "tsv": "^0.2.0", "@types/tsv": "^0.2.1", "fzstd": "^0.0.4", - "@web-std/fetch": "^4.0.0" + "@web-std/fetch": "^4.0.0", + "@ipld/car": "^4.0.0" }, "license": "(Apache-2.0 AND MIT)" } diff --git a/src/file/layout/trickle.js b/src/file/layout/trickle.js index e4e18a7..a6f5907 100644 --- a/src/file/layout/trickle.js +++ b/src/file/layout/trickle.js @@ -46,7 +46,7 @@ const EMTPY_LEAF_ID = 0 * options: Options * leafCount: number * levelCutoffs: number[] - * tail: TrickleNode|null + * tail: TrickleNode * lastID: number * }} Trickle * @@ -56,8 +56,19 @@ const EMTPY_LEAF_ID = 0 export const open = options => ({ options, leafCount: 0, - levelCutoffs: [], - tail: null, + levelCutoffs: [options.maxDirectLeaves], + tail: new TrickleNode({ + id: EMTPY_LEAF_ID + 2, + depth: 0, + directLeaves: [], + // this is a synthetic parent to hold the final-most-est digest CID + parent: new TrickleNode({ + id: EMTPY_LEAF_ID + 1, + depth: -1, + directLeaves: [], + parent: null, + }), + }), lastID: EMTPY_LEAF_ID, }) @@ -88,112 +99,88 @@ export const write = (state, leaves) => { * @returns {Layout.WriteResult} */ export const addLeaf = ({ nodes, leaves, layout }, leaf) => { - const { options, leafCount } = layout - let { lastID } = layout - if (layout.tail == null) { - // 1) We are just starting: fill in a new tail node with a synthetic parent, - // and other inits - const levelCutoffs = [options.maxDirectLeaves] - const tail = new TrickleNode({ - id: ++lastID, - depth: 0, - directLeaves: [leaf], - // this is a synthetic parent to hold the final-most-est digest CID - parent: new TrickleNode({ - id: ++lastID, - depth: -1, - directLeaves: [], - parent: null, - }), - }) + // we are not yet at a node boundary just add a leaf to a tail + if (layout.leafCount % layout.options.maxDirectLeaves !== 0) { + return { nodes, leaves, layout: pushLeaf(layout, leaf) } + } + // if we got that far we are going to experience a node change + // let's find out where the puck would go next + else { + const { depth, levelCutoffs } = findNextLeafTarget(layout) + + // either backtrack "up the tree" or just reiterate current step, pushing + // the sibling into the parent's "direct leaves" + const result = + layout.tail.depth >= depth + ? sealToLevel( + { layout: { ...layout, levelCutoffs }, nodes, leaves }, + depth + ) + : { layout: { ...layout, levelCutoffs }, nodes, leaves } + + let { lastID } = result.layout + + // now descend one step down for the final already-containing-a-leaf node return { + ...result, layout: { - ...layout, - levelCutoffs, - tail, - leafCount: leafCount + 1, + ...result.layout, + tail: new TrickleNode({ + id: ++lastID, + depth: depth, + directLeaves: [leaf], + parent: result.layout.tail, + }), + leafCount: result.layout.leafCount + 1, lastID, }, - nodes, - leaves, } + } +} - // 2) we are not yet at a node boundary - } else if (layout.leafCount % layout.options.maxDirectLeaves !== 0) { - const tail = new TrickleNode({ - ...layout.tail, - directLeaves: [...layout.tail.directLeaves, leaf], - }) +/** + * @param {Trickle} layout + */ +const findNextLeafTarget = ({ levelCutoffs, options, leafCount }) => { + // we have enough members to trigger the next descent-level-group: + // calculate and cache its size + if (leafCount === levelCutoffs[levelCutoffs.length - 1]) { + const cutoff = + options.maxDirectLeaves * + Math.pow(options.maxSiblingSubgroups + 1, levelCutoffs.length) return { - layout: { - ...layout, - tail, - leafCount: leafCount + 1, - lastID, - }, - nodes, - leaves, + depth: 1, + levelCutoffs: [...levelCutoffs, cutoff], } - - // if we got that far we are going to experience a node change - // let's find out where the puck would go next - } else { - let nextNodeDepth = 0 - let levelCutoffs = layout.levelCutoffs - // we have enough members to trigger the next descent-level-group: - // calculate and cache its size - if (layout.leafCount === levelCutoffs[levelCutoffs.length - 1]) { - levelCutoffs = [ - ...levelCutoffs, - options.maxDirectLeaves * - Math.pow(options.maxSiblingSubgroups + 1, layout.levelCutoffs.length), - ] - - nextNodeDepth = 1 - - // otherwise just find where we'd land - } else { - let remainingLeaves = layout.leafCount - let level = levelCutoffs.length - 1 - while (level >= 0) { - if (remainingLeaves >= levelCutoffs[level]) { - nextNodeDepth++ - } - remainingLeaves %= levelCutoffs[level] - level-- + } + // otherwise just find where we'd land + else { + let depth = 0 + let remainingLeaves = leafCount + let level = levelCutoffs.length - 1 + while (level >= 0) { + if (remainingLeaves >= levelCutoffs[level]) { + depth++ } + remainingLeaves %= levelCutoffs[level] + level-- } - // either backtrack "up the tree" - // or just reiterate current step, pushing the sibling into the parent's - // "direct leaves" - const next = - layout.tail.depth >= nextNodeDepth - ? sealToLevel( - { layout: { ...layout, levelCutoffs }, nodes, leaves }, - nextNodeDepth - ) - : { layout, nodes, leaves } - - // now descend one step down for the final already-containing-a-leaf node - const tail = new TrickleNode({ - id: ++lastID, - depth: nextNodeDepth, - directLeaves: [leaf], - parent: next.layout.tail, - }) + return { depth, levelCutoffs } + } +} +/** + * @param {Trickle} layout + * @param {Layout.NodeID} leaf + * @returns {Trickle} + */ - return { - ...next, - layout: { - ...next.layout, - tail, - levelCutoffs, - leafCount: leafCount + 1, - lastID, - }, - } +const pushLeaf = (layout, leaf) => { + return { + ...layout, + tail: layout.tail.append(leaf), + leafCount: layout.leafCount + 1, } } @@ -206,7 +193,10 @@ const sealToLevel = ({ nodes: input, leaves, layout }, depth) => { let { lastID } = layout const nodes = [...input] - let tail = new TrickleNode(/** @type {TrickleNode} */ (layout.tail)) + let tail = new TrickleNode({ + .../** @type {TrickleNode} */ (layout.tail), + id: ++lastID, + }) while (tail.depth >= depth) { const parent = /** @type {TrickleNode} */ (tail.parent) @@ -218,11 +208,12 @@ const sealToLevel = ({ nodes: input, leaves, layout }, depth) => { tail = new TrickleNode({ ...parent, + id: ++lastID, directLeaves: [...parent.directLeaves, node.id], }) } - return { layout: { ...layout, tail }, nodes, leaves } + return { layout: { ...layout, lastID, tail }, nodes, leaves } } /** @@ -286,4 +277,16 @@ class TrickleNode { get children() { return this.directLeaves } + + /** + * + * @param {Layout.NodeID} leaf + */ + + append(leaf) { + return new TrickleNode({ + ...this, + directLeaves: [...this.directLeaves, leaf], + }) + } } diff --git a/src/file/writer.js b/src/file/writer.js index 23dba77..44ca44e 100644 --- a/src/file/writer.js +++ b/src/file/writer.js @@ -227,6 +227,7 @@ export const close = state => { state.layout, chunks ) + const { root, nodes: rest } = state.config.fileLayout.close( layout, state.metadata diff --git a/test/file.spec.js b/test/file.spec.js index f49e117..b073c78 100644 --- a/test/file.spec.js +++ b/test/file.spec.js @@ -1,8 +1,15 @@ /* eslint-env mocha */ -import { expect, assert } from "chai" -import { encodeUTF8, File, CID, hashrecur, collect } from "./util.js" -import * as unixfs from "../src/lib.js" +import { assert } from "chai" +import { + encodeUTF8, + CID, + hashrecur, + collect, + writeFile, + encodeCar, +} from "./util.js" +import * as UnixFS from "../src/lib.js" import * as FileImporter from "../src/file.js" import * as Trickle from "../src/file/layout/trickle.js" import * as Balanced from "../src/file/layout/balanced.js" @@ -10,7 +17,6 @@ import * as FixedSize from "../src/file/chunker/fixed.js" import * as Rabin from "../src/file/chunker/rabin.new.js" import * as API from "../src/file/api.js" import * as RawLeaf from "multiformats/codecs/raw" -import * as UnixFS from "../src/unixfs.js" import { sha256 } from "multiformats/hashes/sha2" const CHUNK_SIZE = 262144 @@ -215,4 +221,38 @@ describe("test file", () => { dagByteLength: 524738, }) }) + + it("trickle with several levels deep", async function () { + this.timeout(30000) + const chunkSize = 128 + const maxLeaves = 4 + const leafCount = 42 + + const content = hashrecur({ byteLength: chunkSize * leafCount }) + + const { writer, ...importer } = FileImporter.createImporter( + {}, + FileImporter.configure({ + chunker: FixedSize.withMaxChunkSize(chunkSize), + fileLayout: Trickle.configure({ maxDirectLeaves: maxLeaves }), + fileChunkEncoder: FileImporter.UnixFSRawLeaf, + }) + ) + + const collector = collect(importer.blocks) + + for await (const slice of content) { + writer.write(slice) + } + const link = await writer.close() + const blocks = await collector + + assert.deepEqual(link, { + cid: CID.parse( + "bafybeieyaff3xepdv5r56bnhgxbxpjy6pzvxqpc6abjtkk4f46ylwop5ga" + ), + contentByteLength: chunkSize * leafCount, + dagByteLength: 8411, + }) + }) }) diff --git a/test/util.js b/test/util.js index 28abf30..90ec911 100644 --- a/test/util.js +++ b/test/util.js @@ -2,6 +2,7 @@ import { File } from "@web-std/file" import { CID } from "multiformats" import fetch from "@web-std/fetch" import { sha256 } from "multiformats/hashes/sha2" +import { CarWriter } from "@ipld/car" const utf8Encoder = new TextEncoder() @@ -39,19 +40,80 @@ export async function* hashrecur({ } /** - * @param {ReadableStream} blockQueue + * @typedef {import('../src/file/api').Block} Block + * + * @param {ReadableStream} blockQueue + * @param {Block[]} [blocks] + */ +export const collect = async (blockQueue, blocks = []) => { + for await (const block of iterate(blockQueue)) { + blocks.push(block) + } + return blocks +} + +/** + * @template T + * @param {ReadableStream} stream */ -export const collect = async blockQueue => { - const blocks = [] - const reader = blockQueue.getReader() +export const iterate = async function* (stream) { + const reader = stream.getReader() while (true) { const next = await reader.read() if (next.done) { - return blocks + return } else { - blocks.push(next.value) + yield next.value } } } +/** + * @template T, O + * @param {AsyncIterable} source + * @param {{write(value:T): unknown, close():O|Promise}} writer + * @returns {Promise} + */ +const pipe = async (source, writer) => { + for await (const item of source) { + writer.write(item) + } + return await writer.close() +} + +/** + * @param {ReadableStream} blocks + */ +export const encodeCar = blocks => { + const { writer, out } = CarWriter.create([CID.parse("bafkqaaa")]) + pipe(iterate(blocks), { + write: block => + writer.put({ + cid: /** @type {CID} */ (block.cid), + bytes: block.bytes, + }), + close: () => writer.close(), + }) + + return out +} + +/** + * @param {string|URL} target + * @param {AsyncIterable} content + * @returns {Promise} + */ +export const writeFile = async (target, content) => { + const path = "fs" + const { createWriteStream } = await import("fs") + const file = createWriteStream(target) + for await (const chunk of content) { + file.write(chunk) + } + + return await new Promise((resolve, reject) => + file.close(error => (error ? reject(error) : resolve(undefined))) + ) +} + export { File, CID, fetch }