From 4d177d825f7bc241e0906a1b2890cad93f22d8a6 Mon Sep 17 00:00:00 2001 From: Trevor Manz Date: Sun, 14 Jan 2024 13:54:22 -0500 Subject: [PATCH] feat: add `withConsolidated` store helper (#119) --- .changeset/popular-glasses-nail.md | 28 ++++ package.json | 3 +- packages/core/__tests__/consolidated.test.ts | 26 ++- packages/core/src/consolidated.ts | 164 +++++++++---------- packages/core/src/errors.ts | 8 +- packages/core/src/index.ts | 1 - packages/core/src/open.ts | 25 ++- packages/indexing/src/get.ts | 36 ++-- packages/indexing/src/set.ts | 12 +- packages/zarrita/index.test.ts | 1 - 10 files changed, 163 insertions(+), 141 deletions(-) create mode 100644 .changeset/popular-glasses-nail.md diff --git a/.changeset/popular-glasses-nail.md b/.changeset/popular-glasses-nail.md new file mode 100644 index 00000000..7af63918 --- /dev/null +++ b/.changeset/popular-glasses-nail.md @@ -0,0 +1,28 @@ +--- +"@zarrita/indexing": minor +"zarrita": minor +"@zarrita/core": minor +--- + +feat: Add `withConsolidated` store utility + +**BREAKING**: Replaces [`openConsolidated`](https://github.com/manzt/zarrita.js/pull/91) +to provide a consistent interface for accessing consolidated and non-consolidated stores. + +```javascript +import * as zarr from "zarrita"; + +// non-consolidated +let store = new zarr.FetchStore("https://localhost:8080/data.zarr"); +let grp = await zarr.open(store); // network request for .zgroup/.zattrs +let foo = await zarr.open(grp.resolve("/foo"), { kind: array }); // network request for .zarray/.zattrs + +// consolidated +let store = new zarr.FetchStore("https://localhost:8080/data.zarr"); +let consolidatedStore = await zarr.withConsolidated(store); // opens ./zmetadata +let contents = consolidatedStore.contents(); // [ {path: "/", kind: "group" }, { path: "/foo", kind: "array" }, ...] +let grp = await zarr.open(consolidatedStore); // no network request +let foo = await zarr.open(grp.resolve(contents[1].path), { + kind: contents[1].kind, +}); // no network request +``` diff --git a/package.json b/package.json index c9127fed..17cb27e0 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,8 @@ "test": "vitest --api", "fmt": "dprint fmt", "lint": "dprint check", - "publint": "pnpm --recursive --filter=\"./packages/**\" exec publint" + "publint": "pnpm --recursive --filter=\"./packages/**\" exec publint", + "errors": "vim -c \"copen\" -c \"cexpr system('npx build')\" -c \"wincmd p\"" }, "devDependencies": { "@changesets/cli": "^2.27.1", diff --git a/packages/core/__tests__/consolidated.test.ts b/packages/core/__tests__/consolidated.test.ts index 6f41ef86..e3831333 100644 --- a/packages/core/__tests__/consolidated.test.ts +++ b/packages/core/__tests__/consolidated.test.ts @@ -3,16 +3,18 @@ import * as path from "node:path"; import * as url from "node:url"; import { FileSystemStore } from "@zarrita/storage"; -import { openConsolidated } from "../src/consolidated.js"; +import { withConsolidated } from "../src/consolidated.js"; +import { open } from "../src/open.js"; +import { Array as ZarrArray } from "../src/hierarchy.js"; let __dirname = path.dirname(url.fileURLToPath(import.meta.url)); -describe("openConsolidated", () => { +describe("withConsolidated", () => { it("loads consolidated metadata", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let h = await openConsolidated(new FileSystemStore(root)); + let store = await withConsolidated(new FileSystemStore(root)); let map = new Map( - [...h.contents.values()].map((entry) => [entry.path, entry.kind]), + store.contents().map((x) => [x.path, x.kind]), ); expect(map).toMatchInlineSnapshot(` Map { @@ -49,8 +51,14 @@ describe("openConsolidated", () => { it("loads chunk data from underlying store", async () => { let root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let h = await openConsolidated(new FileSystemStore(root)); - let arr = h.open("/3d.chunked.mixed.i2.C", { kind: "array" }); + let store = await withConsolidated(new FileSystemStore(root)); + let entry = store.contents().find((x) => + x.path === "/3d.chunked.mixed.i2.C" + )!; + let grp = await open(store, { kind: "group" }); + let arr = await open(grp.resolve(entry.path), { kind: entry.kind }); + expect(arr).toBeInstanceOf(ZarrArray); + // @ts-expect-error - we know this is an array expect(await arr.getChunk([0, 0, 0])).toMatchInlineSnapshot(` { "data": Int16Array [ @@ -80,10 +88,10 @@ describe("openConsolidated", () => { it("loads and navigates from root", async () => { let path_root = path.join(__dirname, "../../../fixtures/v2/data.zarr"); - let h = await openConsolidated(new FileSystemStore(path_root)); - let grp = h.root(); + let store = await withConsolidated(new FileSystemStore(path_root)); + let grp = await open(store, { kind: "group" }); expect(grp.kind).toBe("group"); - let arr = h.open(grp.resolve("1d.chunked.i2"), { kind: "array" }); + let arr = await open(grp.resolve("1d.chunked.i2"), { kind: "array" }); expect(arr.kind).toBe("array"); }); }); diff --git a/packages/core/src/consolidated.ts b/packages/core/src/consolidated.ts index e0981d68..a5f039e9 100644 --- a/packages/core/src/consolidated.ts +++ b/packages/core/src/consolidated.ts @@ -1,24 +1,33 @@ -import type { AbsolutePath, Readable } from "@zarrita/storage"; - -import { Array, Group, Location } from "./hierarchy.js"; -import { - json_decode_object, - v2_to_v3_array_metadata, - v2_to_v3_group_metadata, -} from "./util.js"; -import type { ArrayMetadataV2, DataType, GroupMetadataV2 } from "./metadata.js"; -import { NodeNotFoundError } from "./errors.js"; +import { type AbsolutePath, type Readable } from "@zarrita/storage"; +import { json_decode_object, json_encode_object } from "./util.js"; +import { KeyError, NodeNotFoundError } from "./errors.js"; +import type { + ArrayMetadata, + ArrayMetadataV2, + Attributes, + GroupMetadata, + GroupMetadataV2, +} from "./metadata.js"; type ConsolidatedMetadata = { - metadata: Record; + metadata: Record; zarr_consolidated_format: 1; }; +type Listable = { + get: Store["get"]; + contents(): { path: AbsolutePath; kind: "array" | "group" }[]; +}; + async function get_consolidated_metadata( store: Readable, ): Promise { let bytes = await store.get("/.zmetadata"); - if (!bytes) throw new Error("No consolidated metadata found."); + if (!bytes) { + throw new NodeNotFoundError("v2 consolidated metadata", { + cause: new KeyError("/.zmetadata"), + }); + } let meta: ConsolidatedMetadata = json_decode_object(bytes); if (meta.zarr_consolidated_format !== 1) { throw new Error("Unsupported consolidated format."); @@ -26,77 +35,68 @@ async function get_consolidated_metadata( return meta; } -/** Proxies requests to the underlying store. */ -export async function openConsolidated( +type Metadata = + | ArrayMetadataV2 + | GroupMetadataV2 + | ArrayMetadata + | GroupMetadata + | Attributes; + +function is_meta_key(key: string): boolean { + return ( + key.endsWith(".zarray") || + key.endsWith(".zgroup") || + key.endsWith(".zattrs") || + key.endsWith("zarr.json") + ); +} + +function is_v3(meta: Metadata): meta is ArrayMetadata | GroupMetadata { + return "zarr_format" in meta && meta.zarr_format === 3; +} + +export async function withConsolidated( store: Store, -) { - let { metadata } = await get_consolidated_metadata(store); - let meta_nodes = Object - .entries(metadata) - .reduce( - (acc, [path, content]) => { - let parts = path.split("/"); - let file_name = parts.pop()!; - let key: AbsolutePath = `/${parts.join("/")}`; - if (!acc[key]) acc[key] = {}; - if (file_name === ".zarray") { - acc[key].meta = content; - } else if (file_name === ".zgroup") { - acc[key].meta = content; - } else if (file_name === ".zattrs") { - acc[key].attrs = content; - } - return acc; - }, - {} as Record< - AbsolutePath, - { - meta?: ArrayMetadataV2 | GroupMetadataV2; - attrs?: Record; +): Promise> { + let known_meta: Record = + await get_consolidated_metadata(store) + .then((meta) => { + let new_meta: Record = {}; + for (let [key, value] of Object.entries(meta.metadata)) { + new_meta[`/${key}`] = value; } - >, - ); - let nodes = new Map | Group>(); - for (let [path, { meta, attrs }] of Object.entries(meta_nodes)) { - if (!meta) throw new Error("missing metadata"); - let node: Array | Group; - if ("shape" in meta) { - let metadata = v2_to_v3_array_metadata(meta, attrs); - node = new Array(store, path as AbsolutePath, metadata); - } else { - let metadata = v2_to_v3_group_metadata(meta, attrs); - node = new Group(store, path as AbsolutePath, metadata); - } - nodes.set(path as AbsolutePath, node); - } - return new ConsolidatedHierarchy(nodes); -} + return new_meta; + }) + .catch(() => ({})); -class ConsolidatedHierarchy { - constructor( - public contents: Map | Group>, - ) {} - open( - where: AbsolutePath | Location, - options: { kind: "group" }, - ): Group; - open( - where: AbsolutePath | Location, - options: { kind: "array" }, - ): Array; - open( - where: AbsolutePath | Location, - ): Array | Group; - open( - where: AbsolutePath | Location, - options: { kind?: "array" | "group" } = {}, - ) { - let path = typeof where === "string" ? where : where.path; - let node = this.contents.get(path); - if (node && (!options.kind || options.kind == node.kind)) return node; - throw new NodeNotFoundError(path); - } - root() { - return this.open("/", { kind: "group" }); - } + return { + async get( + ...args: Parameters + ): Promise { + let [key, opts] = args; + if (known_meta[key]) { + return json_encode_object(known_meta[key]); + } + let maybe_bytes = await store.get(key, opts); + if (is_meta_key(key) && maybe_bytes) { + let meta = json_decode_object(maybe_bytes); + known_meta[key] = meta; + } + return maybe_bytes; + }, + contents(): { path: AbsolutePath; kind: "array" | "group" }[] { + let contents: { path: AbsolutePath; kind: "array" | "group" }[] = []; + for (let [key, value] of Object.entries(known_meta)) { + let parts = key.split("/"); + let filename = parts.pop()!; + let path = (parts.join("/") || "/") as AbsolutePath; + if (filename === ".zarray") contents.push({ path, kind: "array" }); + if (filename === ".zgroup") contents.push({ path, kind: "group" }); + if (is_v3(value)) { + contents.push({ path, kind: value.node_type }); + } + } + return contents; + }, + }; } diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index b994054e..51af94b9 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -1,13 +1,13 @@ export class NodeNotFoundError extends Error { - constructor(msg: string) { - super(msg); + constructor(context: string, options: { cause?: Error } = {}) { + super(`Node not found: ${context}`, options); this.name = "NodeNotFoundError"; } } export class KeyError extends Error { - constructor(msg: string) { - super(msg); + constructor(path: string) { + super(`Missing key: ${path}`); this.name = "KeyError"; } } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 83f1304a..f8d7e5d9 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -9,5 +9,4 @@ export { export { open } from "./open.js"; export { create } from "./create.js"; export { registry } from "./codecs.js"; -export { openConsolidated } from "./consolidated.js"; export type * from "./metadata.js"; diff --git a/packages/core/src/open.ts b/packages/core/src/open.ts index 0cb2b538..e00d6040 100644 --- a/packages/core/src/open.ts +++ b/packages/core/src/open.ts @@ -6,13 +6,14 @@ import type { GroupMetadata, } from "./metadata.js"; import { Array, Group, Location } from "./hierarchy.js"; -import { NodeNotFoundError } from "./errors.js"; +import { KeyError, NodeNotFoundError } from "./errors.js"; import { json_decode_object, v2_to_v3_array_metadata, v2_to_v3_group_metadata, } from "./util.js"; +let VERSION_COUNTER = create_version_counter(); function create_version_counter() { let version_counts = new WeakMap(); function get_counts(store: Readable) { @@ -30,7 +31,6 @@ function create_version_counter() { }, }; } -let VERSION_COUNTER = create_version_counter(); async function load_attrs( location: Location, @@ -77,7 +77,9 @@ async function open_array_v2( let { path } = location.resolve(".zarray"); let meta = await location.store.get(path); if (!meta) { - throw new NodeNotFoundError(path); + throw new NodeNotFoundError("v2 array", { + cause: new KeyError(path), + }); } VERSION_COUNTER.increment(location.store, "v2"); return new Array( @@ -94,7 +96,9 @@ async function open_group_v2( let { path } = location.resolve(".zgroup"); let meta = await location.store.get(path); if (!meta) { - throw new NodeNotFoundError(path); + throw new NodeNotFoundError("v2 group", { + cause: new KeyError(path), + }); } VERSION_COUNTER.increment(location.store, "v2"); return new Group( @@ -110,7 +114,9 @@ async function _open_v3( let { store, path } = location.resolve("zarr.json"); let meta = await location.store.get(path); if (!meta) { - throw new NodeNotFoundError(path); + throw new NodeNotFoundError("v3 array or group", { + cause: new KeyError(path), + }); } let meta_doc: ArrayMetadata | GroupMetadata = json_decode_object( meta, @@ -169,6 +175,11 @@ export function open( options: { kind: "array" }, ): Promise>; +export async function open( + location: Location | Store, + options: { kind?: "array" | "group" }, +): Promise | Group>; + export function open( location: Location | Store, ): Promise | Group>; @@ -181,8 +192,8 @@ export async function open( location: Location | Store, options: { kind?: "array" | "group" } = {}, ): Promise | Group> { - const store = "store" in location ? location.store : location; - const version_max = VERSION_COUNTER.version_max(store); + let store = "store" in location ? location.store : location; + let version_max = VERSION_COUNTER.version_max(store); // Use the open function for the version with the most successful opens. // Note that here we use the dot syntax to access the open functions // because this enables us to use vi.spyOn during testing. diff --git a/packages/indexing/src/get.ts b/packages/indexing/src/get.ts index e8a8fec5..33d7a9e5 100644 --- a/packages/indexing/src/get.ts +++ b/packages/indexing/src/get.ts @@ -8,7 +8,7 @@ import type { Slice, } from "./types.js"; -import { _internal_get_array_context, KeyError } from "@zarrita/core"; +import { _internal_get_array_context } from "@zarrita/core"; import { BasicIndexer } from "./indexer.js"; import { create_queue } from "./util.js"; @@ -36,41 +36,25 @@ export async function get< ): Promise< null extends Sel[number] ? Arr : Slice extends Sel[number] ? Arr : Scalar > { - const context = _internal_get_array_context(arr); - const indexer = new BasicIndexer({ + let context = _internal_get_array_context(arr); + let indexer = new BasicIndexer({ selection, shape: arr.shape, chunk_shape: arr.chunks, }); - const out = setter.prepare( + let out = setter.prepare( new context.TypedArray(indexer.shape.reduce((a, b) => a * b, 1)), indexer.shape, context.get_strides(indexer.shape, opts.order), ); - const queue = opts.create_queue?.() ?? create_queue(); + let queue = opts.create_queue?.() ?? create_queue(); for (const { chunk_coords, mapping } of indexer) { - queue.add(() => - arr.getChunk(chunk_coords, opts.opts) - .then(({ data, shape, stride }) => { - const chunk = setter.prepare(data, shape, stride); - setter.set_from_chunk(out, chunk, mapping); - }) - .catch((err) => { - // re-throw error if not a missing chunk - if (!(err instanceof KeyError)) throw err; - // KeyError, we need to fill the corresponding array - if (context.fill_value) { - setter.set_scalar( - out, - mapping - .map((m) => m.to) - .filter((s): s is Exclude => s !== null), - context.fill_value, - ); - } - }) - ); + queue.add(async () => { + let { data, shape, stride } = await arr.getChunk(chunk_coords, opts.opts); + let chunk = setter.prepare(data, shape, stride); + setter.set_from_chunk(out, chunk, mapping); + }); } await queue.onIdle(); diff --git a/packages/indexing/src/set.ts b/packages/indexing/src/set.ts index 6d2b5da1..318e547e 100644 --- a/packages/indexing/src/set.ts +++ b/packages/indexing/src/set.ts @@ -1,4 +1,4 @@ -import { _internal_get_array_context, KeyError } from "@zarrita/core"; +import { _internal_get_array_context } from "@zarrita/core"; import type { Mutable } from "@zarrita/storage"; import type { Array, Chunk, DataType, Scalar, TypedArray } from "@zarrita/core"; @@ -78,15 +78,7 @@ export async function set>( } } else { // partially replace the contents of this chunk - chunk_data = await arr.getChunk(chunk_coords) - .then(({ data }) => data) - .catch((err) => { - if (!(err instanceof KeyError)) throw err; - const empty = new context.TypedArray(chunk_size); - // @ts-expect-error - if (arr.fill_value) empty.fill(arr.fill_value); - return empty; - }); + chunk_data = await arr.getChunk(chunk_coords).then(({ data }) => data); const chunk = setter.prepare( chunk_data, diff --git a/packages/zarrita/index.test.ts b/packages/zarrita/index.test.ts index 5d52d544..677c3b6b 100644 --- a/packages/zarrita/index.test.ts +++ b/packages/zarrita/index.test.ts @@ -15,7 +15,6 @@ it("exports all the things", () => { "create": [Function], "get": [Function], "open": [Function], - "openConsolidated": [Function], "registry": Map { "blosc" => [Function], "gzip" => [Function],