diff --git a/nats-base-client/databuffer.ts b/nats-base-client/databuffer.ts index 5734d588..a94851bf 100644 --- a/nats-base-client/databuffer.ts +++ b/nats-base-client/databuffer.ts @@ -67,6 +67,17 @@ export class DataBuffer { } } + shift(): Uint8Array { + if (this.buffers.length) { + const a = this.buffers.shift(); + if (a) { + this.byteLength -= a.length; + return a; + } + } + return new Uint8Array(0); + } + drain(n?: number): Uint8Array { if (this.buffers.length) { this.pack(); diff --git a/nats-base-client/headers.ts b/nats-base-client/headers.ts index 44163e2d..feeaa834 100644 --- a/nats-base-client/headers.ts +++ b/nats-base-client/headers.ts @@ -284,4 +284,20 @@ export class MsgHdrsImpl implements MsgHdrs { get status(): string { return `${this.code} ${this.description}`.trim(); } + + toRecord(): Record { + const data = {} as Record; + this.keys().forEach((v) => { + data[v] = this.values(v); + }); + return data; + } + + static fromRecord(r: Record): MsgHdrs { + const h = new MsgHdrsImpl(); + for (const k in r) { + h.headers.set(k, r[k]); + } + return h; + } } diff --git a/nats-base-client/jsclient.ts b/nats-base-client/jsclient.ts index e2e43f42..6349a042 100644 --- a/nats-base-client/jsclient.ts +++ b/nats-base-client/jsclient.ts @@ -13,7 +13,14 @@ * limitations under the License. */ -import type { ConsumerOptsBuilder, KV, KvOptions, Views } from "./types.ts"; +import type { + ConsumerOptsBuilder, + KV, + KvOptions, + ObjectStore, + ObjectStoreOptions, + Views, +} from "./types.ts"; import { AckPolicy, ConsumerAPI, @@ -72,6 +79,7 @@ import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts"; import { Bucket } from "./kv.ts"; import { NatsConnectionImpl } from "./nats.ts"; import { Feature } from "./semver.ts"; +import { ObjectStoreImpl } from "./objectstore.ts"; export interface JetStreamSubscriptionInfoable { info: JetStreamSubscriptionInfo | null; @@ -89,7 +97,6 @@ class ViewsImpl implements Views { js: JetStreamClientImpl; constructor(js: JetStreamClientImpl) { this.js = js; - jetstreamPreview(this.js.nc); } kv(name: string, opts: Partial = {}): Promise { if (opts.bindOnly) { @@ -97,6 +104,13 @@ class ViewsImpl implements Views { } return Bucket.create(this.js, name, opts); } + os( + name: string, + opts: Partial = {}, + ): Promise { + jetstreamPreview(this.js.nc); + return ObjectStoreImpl.create(this.js, name, opts); + } } export class JetStreamClientImpl extends BaseApiClient @@ -833,11 +847,11 @@ const jetstreamPreview = (() => { const { lang } = nci?.protocol?.transport; if (lang) { console.log( - `\u001B[33m >> jetstream's materialized views functionality in ${lang} is beta functionality \u001B[0m`, + `\u001B[33m >> jetstream's materialized views object store functionality in ${lang} is beta functionality \u001B[0m`, ); } else { console.log( - `\u001B[33m >> jetstream's materialized views functionality is beta functionality \u001B[0m`, + `\u001B[33m >> jetstream's materialized views object store functionality is beta functionality \u001B[0m`, ); } } diff --git a/nats-base-client/nuid.ts b/nats-base-client/nuid.ts index d62e9767..36570f90 100644 --- a/nats-base-client/nuid.ts +++ b/nats-base-client/nuid.ts @@ -24,28 +24,18 @@ const minInc = 33; const maxInc = 333; const totalLen = preLen + seqLen; -const cryptoObj = initCrypto(); - -function initCrypto() { - let cryptoObj = null; - if (typeof globalThis !== "undefined") { - if ( - "crypto" in globalThis && globalThis.crypto.getRandomValues !== undefined - ) { - cryptoObj = globalThis.crypto; - } +function _getRandomValues(a: Uint8Array) { + for (let i = 0; i < a.length; i++) { + a[i] = Math.floor(Math.random() * 255); } - if (!cryptoObj) { - // shim it - cryptoObj = { - getRandomValues: function (array: Uint8Array) { - for (let i = 0; i < array.length; i++) { - array[i] = Math.floor(Math.random() * 255); - } - }, - }; +} + +function fillRandom(a: Uint8Array) { + if (globalThis?.crypto?.getRandomValues) { + globalThis.crypto.getRandomValues(a); + } else { + _getRandomValues(a); } - return cryptoObj; } /** @@ -92,7 +82,7 @@ export class Nuid { */ private setPre() { const cbuf = new Uint8Array(preLen); - cryptoObj.getRandomValues(cbuf); + fillRandom(cbuf); for (let i = 0; i < preLen; i++) { const di = cbuf[i] % base; this.buf[i] = digits.charCodeAt(di); diff --git a/nats-base-client/objectstore.ts b/nats-base-client/objectstore.ts new file mode 100644 index 00000000..dd1e44c7 --- /dev/null +++ b/nats-base-client/objectstore.ts @@ -0,0 +1,729 @@ +/* + * Copyright 2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + DiscardPolicy, + JetStreamClient, + JetStreamManager, + JetStreamOptions, + JsHeaders, + JsMsg, + ObjectInfo, + ObjectResult, + ObjectStore, + ObjectStoreInfo, + ObjectStoreMeta, + ObjectStoreMetaOptions, + ObjectStoreOptions, + PubAck, + PurgeResponse, + StorageType, + StreamConfig, + StreamInfo, + StreamInfoRequestOptions, +} from "./types.ts"; +import { validateBucket, validateKey } from "./kv.ts"; +import { JSONCodec } from "./codec.ts"; +import { nuid } from "./nuid.ts"; +import { deferred } from "./util.ts"; +import { JetStreamClientImpl } from "./jsclient.ts"; +import { DataBuffer } from "./databuffer.ts"; +import { headers, MsgHdrs, MsgHdrsImpl } from "./headers.ts"; +import { consumerOpts } from "./jsconsumeropts.ts"; +import { NatsError } from "./mod.ts"; +import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts"; +import { SHA256 } from "./sha256.js"; + +export function objectStoreStreamName(bucket: string): string { + validateBucket(bucket); + return `OBJ_${bucket}`; +} + +export function objectStoreBucketName(stream: string): string { + if (stream.startsWith("OBJ_")) { + return stream.substring(4); + } + return stream; +} + +export class ObjectStoreInfoImpl implements ObjectStoreInfo { + si: StreamInfo; + backingStore: string; + + constructor(si: StreamInfo) { + this.si = si; + this.backingStore = "JetStream"; + } + get bucket(): string { + return objectStoreBucketName(this.si.config.name); + } + get description(): string { + return this.si.config.description ?? ""; + } + get ttl(): number { + return this.si.config.max_age; + } + get storage(): StorageType { + return this.si.config.storage; + } + get replicas(): number { + return this.si.config.num_replicas; + } + get sealed(): boolean { + return this.si.config.sealed; + } + get size(): number { + return this.si.state.bytes; + } + get streamInfo(): StreamInfo { + return this.si; + } +} + +type ServerObjectStoreMeta = { + name: string; + description?: string; + headers?: Record; + options?: ObjectStoreMetaOptions; +}; + +type ServerObjectInfo = { + bucket: string; + nuid: string; + size: number; + chunks: number; + digest: string; + deleted?: boolean; + mtime: string; +} & ServerObjectStoreMeta; + +class ObjectInfoImpl implements ObjectInfo { + info: ServerObjectInfo; + hdrs!: MsgHdrs; + constructor(oi: ServerObjectInfo) { + this.info = oi; + } + get name(): string { + return this.info.name; + } + get description(): string { + return this.info.description ?? ""; + } + get headers(): MsgHdrs { + if (!this.hdrs) { + this.hdrs = MsgHdrsImpl.fromRecord(this.info.headers || {}); + } + return this.hdrs; + } + get options(): ObjectStoreMetaOptions | undefined { + return this.info.options; + } + get bucket(): string { + return this.info.bucket; + } + get chunks(): number { + return this.info.chunks; + } + get deleted(): boolean { + return this.info.deleted ?? false; + } + get digest(): string { + return this.info.digest; + } + get mtime(): string { + return this.info.mtime; + } + get nuid(): string { + return this.info.nuid; + } + get size(): number { + return this.info.size; + } +} + +function toServerObjectStoreMeta( + meta: Partial, +): ServerObjectStoreMeta { + const v = { + name: meta.name, + description: meta.description ?? "", + options: meta.options, + } as ServerObjectStoreMeta; + + if (meta.headers) { + const mhi = meta.headers as MsgHdrsImpl; + v.headers = mhi.toRecord(); + } + return v; +} + +function meta(oi: ObjectInfo): ObjectStoreMeta { + return { + name: oi.name, + description: oi.description, + headers: oi.headers, + options: oi.options, + }; +} + +function emptyReadableStream(): ReadableStream { + return new ReadableStream({ + pull(c) { + c.enqueue(new Uint8Array(0)); + c.close(); + }, + }); +} + +export class ObjectStoreImpl implements ObjectStore { + jsm: JetStreamManager; + js: JetStreamClient; + stream!: string; + name: string; + + constructor(name: string, jsm: JetStreamManager, js: JetStreamClient) { + this.name = name; + this.jsm = jsm; + this.js = js; + } + + _sanitizeName(name: string): { name: string; error?: Error } { + if (!name || name.length === 0) { + return { name, error: new Error("name cannot be empty") }; + } + // cannot use replaceAll - node until node 16 is min + // name = name.replaceAll(".", "_"); + // name = name.replaceAll(" ", "_"); + name = name.replace(/[\. ]/g, "_"); + + let error = undefined; + try { + validateKey(name); + } catch (err) { + error = err; + } + return { name, error }; + } + + async info(name: string): Promise { + const info = await this.rawInfo(name); + return info ? new ObjectInfoImpl(info) : null; + } + + async list(): Promise { + const buf: ObjectInfo[] = []; + const iter = await this.watch({ + ignoreDeletes: true, + includeHistory: true, + }); + for await (const info of iter) { + // watch will give a null when it has initialized + // for us that is the hint we are done + if (info === null) { + break; + } + buf.push(info); + } + return Promise.resolve(buf); + } + + async rawInfo(name: string): Promise { + const { name: obj, error } = this._sanitizeName(name); + if (error) { + return Promise.reject(error); + } + + const meta = `$O.${this.name}.M.${obj}`; + try { + const m = await this.jsm.streams.getMessage(this.stream, { + last_by_subj: meta, + }); + const jc = JSONCodec(); + const info = jc.decode(m.data) as ServerObjectInfo; + return info; + } catch (err) { + if (err.code === "404") { + return null; + } + return Promise.reject(err); + } + } + + async seal(): Promise { + let info = await this.jsm.streams.info(this.stream); + if (info === null) { + return Promise.reject(new Error("object store not found")); + } + info.config.sealed = true; + info = await this.jsm.streams.update(this.stream, info.config); + return Promise.resolve(new ObjectStoreInfoImpl(info)); + } + + async status( + opts?: Partial, + ): Promise { + const info = await this.jsm.streams.info(this.stream, opts); + if (info === null) { + return Promise.reject(new Error("object store not found")); + } + return Promise.resolve(new ObjectStoreInfoImpl(info)); + } + + destroy(): Promise { + return this.jsm.streams.delete(this.stream); + } + + async put( + meta: ObjectStoreMeta, + rs: ReadableStream | null, + ): Promise { + const jsi = this.js as JetStreamClientImpl; + const maxPayload = jsi.nc.info?.max_payload || 1024; + meta = meta || {} as ObjectStoreMeta; + meta.options = meta.options || {}; + let maxChunk = meta.options?.max_chunk_size || 128 * 1024; + maxChunk = maxChunk > maxPayload ? maxPayload : maxChunk; + meta.options.max_chunk_size = maxChunk; + + const old = await this.info(meta.name); + const { name: n, error } = this._sanitizeName(meta.name); + if (error) { + return Promise.reject(error); + } + + const id = nuid.next(); + const chunkSubj = this._chunkSubject(id); + const metaSubj = this._metaSubject(n); + + const info = Object.assign({ + bucket: this.name, + nuid: id, + size: 0, + chunks: 0, + }, toServerObjectStoreMeta(meta)) as ServerObjectInfo; + + const d = deferred(); + + const proms: Promise[] = []; + const db = new DataBuffer(); + try { + const reader = rs ? rs.getReader() : null; + const sha = new SHA256(); + + while (true) { + const { done, value } = reader + ? await reader.read() + : { done: true, value: undefined }; + if (done) { + // put any partial chunk in + if (db.size() > 0) { + const payload = db.drain(); + sha.update(payload); + info.chunks!++; + info.size! += payload.length; + info.mtime = new Date().toISOString(); + const digest = sha.digest("base64"); + const pad = digest.length % 3; + const padding = pad > 0 ? "=".repeat(pad) : ""; + info.digest = `sha-256=${digest}${padding}`; + info.deleted = false; + proms.push(this.js.publish(chunkSubj, payload)); + } + // trailing md for the object + const h = headers(); + h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject); + proms.push( + this.js.publish(metaSubj, JSONCodec().encode(info), { headers: h }), + ); + + // if we had this object trim it out + if (old) { + proms.push( + this.jsm.streams.purge(this.stream, { + filter: `$O.${this.name}.C.${old.nuid}`, + }), + ); + } + // wait for all the sends to complete + await Promise.all(proms); + d.resolve(new ObjectInfoImpl(info!)); + // stop + break; + } + if (value) { + db.fill(value); + while (db.size() > maxChunk) { + info.chunks!++; + info.size! += maxChunk; + const payload = db.drain(meta.options.max_chunk_size); + sha.update(payload); + proms.push( + this.js.publish(chunkSubj, payload), + ); + } + } + } + } catch (err) { + // we failed, remove any partials + await this.jsm.streams.purge(this.stream, { filter: chunkSubj }); + d.reject(err); + } + + return d; + } + + async get(name: string): Promise { + const info = await this.rawInfo(name); + if (info === null) { + return Promise.resolve(null); + } + + if (info.options && info.options.link) { + const ln = info.options.link.name || ""; + if (ln === "") { + throw new Error("link is a bucket"); + } + const os = await ObjectStoreImpl.create( + this.js, + info.options.link.bucket, + ); + return os.get(ln); + } + + const d = deferred(); + + const r: Partial = { + info: new ObjectInfoImpl(info), + error: d, + }; + if (info.size === 0) { + r.data = emptyReadableStream(); + d.resolve(null); + return Promise.resolve(r as ObjectResult); + } + + let controller: ReadableStreamDefaultController; + + const oc = consumerOpts(); + oc.orderedConsumer(); + const sha = new SHA256(); + const subj = `$O.${this.name}.C.${info.nuid}`; + const sub = await this.js.subscribe(subj, oc); + (async () => { + for await (const jm of sub) { + if (jm.data.length > 0) { + sha.update(jm.data); + controller!.enqueue(jm.data); + } + if (jm.info.pending === 0) { + const hash = sha.digest("base64"); + // go pads the hash - which should be multiple of 3 - otherwise pads with '=' + const pad = hash.length % 3; + const padding = pad > 0 ? "=".repeat(pad) : ""; + const digest = `sha-256=${hash}${padding}`; + if (digest !== info.digest) { + controller!.error( + new Error( + `received a corrupt object, digests do not match received: ${info.digest} calculated ${digest}`, + ), + ); + } else { + controller!.close(); + } + sub.unsubscribe(); + } + } + })() + .then(() => { + d.resolve(); + }) + .catch((err) => { + controller!.error(err); + d.reject(err); + }); + + r.data = new ReadableStream({ + start(c) { + controller = c; + }, + cancel() { + sub.unsubscribe(); + }, + }); + + return r as ObjectResult; + } + + linkStore(name: string, bucket: ObjectStore): Promise { + if (!(bucket instanceof ObjectStoreImpl)) { + return Promise.reject("bucket required"); + } + const osi = bucket as ObjectStoreImpl; + const { name: n, error } = this._sanitizeName(name); + if (error) { + return Promise.reject(error); + } + + const meta = { + name: n, + options: { link: { bucket: osi.name } }, + }; + return this.put(meta, null); + } + + async link(name: string, info: ObjectInfo): Promise { + if (info.deleted) { + return Promise.reject(new Error("object is deleted")); + } + const { name: n, error } = this._sanitizeName(name); + if (error) { + return Promise.reject(error); + } + + // same object store + if (this.name === info.bucket) { + const copy = Object.assign({}, meta(info)) as ObjectStoreMeta; + copy.name = n; + try { + await this.update(info.name, copy); + const ii = await this.info(n); + return ii!; + } catch (err) { + return Promise.reject(err); + } + } + const link = { bucket: info.bucket, name: info.name }; + const mm = { + name: n, + options: { link: link }, + } as ObjectStoreMeta; + return this.put(mm, null); + } + + async delete(name: string): Promise { + const info = await this.rawInfo(name); + if (info === null) { + return Promise.resolve({ purged: 0, success: false }); + } + info.deleted = true; + info.size = 0; + info.chunks = 0; + info.digest = ""; + + const jc = JSONCodec(); + const h = headers(); + h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject); + + await this.js.publish(this._metaSubject(info.name), jc.encode(info), { + headers: h, + }); + return this.jsm.streams.purge(this.stream, { + filter: this._chunkSubject(info.nuid), + }); + } + + async update( + name: string, + meta: Partial = {}, + ): Promise { + const info = await this.rawInfo(name); + if (info === null) { + return Promise.reject(new Error("object not found")); + } + // FIXME: Go's implementation doesn't seem correct - it possibly adds a new meta entry + // effectively making the object available under 2 names, but it doesn't remove the + // older one. + meta.name = meta.name ?? info.name; + const { name: n, error } = this._sanitizeName(meta.name); + if (error) { + return Promise.reject(error); + } + meta.name = n; + const ii = Object.assign({}, info, toServerObjectStoreMeta(meta!)); + const jc = JSONCodec(); + + return this.js.publish(this._metaSubject(ii.name), jc.encode(ii)); + } + + async watch(opts: Partial< + { + ignoreDeletes?: boolean; + includeHistory?: boolean; + } + > = {}): Promise> { + opts.includeHistory = opts.includeHistory ?? false; + opts.ignoreDeletes = opts.ignoreDeletes ?? false; + let initialized = false; + const qi = new QueuedIteratorImpl(); + const subj = this._metaSubjectAll(); + try { + await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj }); + } catch (err) { + if (err.code === "404") { + qi.push(null); + initialized = true; + } else { + qi.stop(err); + } + } + const jc = JSONCodec(); + const copts = consumerOpts(); + copts.orderedConsumer(); + if (opts.includeHistory) { + copts.deliverLastPerSubject(); + } else { + // FIXME: Go's implementation doesn't seem correct - if history is not desired + // the watch should only be giving notifications on new entries + initialized = true; + copts.deliverNew(); + } + copts.callback((err: NatsError | null, jm: JsMsg | null) => { + if (err) { + qi.stop(err); + return; + } + if (jm !== null) { + const oi = jc.decode(jm.data); + if (oi.deleted && opts.ignoreDeletes === true) { + // do nothing + } else { + qi.push(oi); + } + if (jm.info?.pending === 0 && !initialized) { + initialized = true; + qi.push(null); + } + } + }); + + const sub = await this.js.subscribe(subj, copts); + qi._data = sub; + qi.iterClosed.then(() => { + sub.unsubscribe(); + }); + sub.closed.then(() => { + qi.stop(); + }).catch((err) => { + qi.stop(err); + }); + + return qi; + } + + _chunkSubject(id: string) { + return `$O.${this.name}.C.${id}`; + } + + _metaSubject(n: string): string { + return `$O.${this.name}.M.${n}`; + } + + _metaSubjectAll(): string { + return `$O.${this.name}.M.>`; + } + + async init(opts: Partial = {}): Promise { + try { + this.stream = objectStoreStreamName(this.name); + } catch (err) { + return Promise.reject(err); + } + const sc = Object.assign({}, opts) as StreamConfig; + sc.name = this.stream; + sc.allow_rollup_hdrs = true; + sc.discard = DiscardPolicy.New; + sc.subjects = [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`]; + if (opts.placement) { + sc.placement = opts.placement; + } + + try { + await this.jsm.streams.info(sc.name); + } catch (err) { + if (err.message === "stream not found") { + await this.jsm.streams.add(sc); + } + } + } + + static async create( + js: JetStreamClient, + name: string, + opts: Partial = {}, + ): Promise { + // we may not have support in the environment + if (typeof crypto?.subtle?.digest !== "function") { + return Promise.reject( + new Error( + "objectstore: unable to calculate hashes - crypto.subtle.digest with sha256 support is required", + ), + ); + } + + const jsi = js as JetStreamClientImpl; + let jsopts = jsi.opts || {} as JetStreamOptions; + const to = jsopts.timeout || 2000; + jsopts = Object.assign(jsopts, { timeout: to }); + const jsm = await jsi.nc.jetstreamManager(jsopts); + const os = new ObjectStoreImpl(name, jsm, js); + await os.init(opts); + return Promise.resolve(os); + } +} + +class Base64Codec { + static encode(bytes: string | Uint8Array): string { + if (typeof bytes === "string") { + return btoa(bytes); + } + const a = Array.from(bytes); + return btoa(String.fromCharCode(...a)); + } + + static decode(s: string, binary = false): Uint8Array | string { + const bin = atob(s); + if (!binary) { + return bin; + } + const bytes = new Uint8Array(bin.length); + for (let i = 0; i < bin.length; i++) { + bytes[i] = bin.charCodeAt(i); + } + return bytes; + } +} + +class Base64UrlCodec { + static encode(bytes: string | Uint8Array): string { + return Base64UrlCodec.toB64URLEncoding(Base64Codec.encode(bytes)); + } + + static decode(s: string, binary = false): Uint8Array | string { + return Base64Codec.decode(Base64UrlCodec.fromB64URLEncoding(s), binary); + } + + static toB64URLEncoding(b64str: string): string { + b64str = b64str.replace(/=/g, ""); + b64str = b64str.replace(/\+/g, "-"); + return b64str.replace(/\//g, "_"); + } + + static fromB64URLEncoding(b64str: string): string { + // pads are % 4, but not necessary on decoding + b64str = b64str.replace(/_/g, "/"); + b64str = b64str.replace(/-/g, "+"); + return b64str; + } +} diff --git a/nats-base-client/sha256.js b/nats-base-client/sha256.js new file mode 100644 index 00000000..e862c05b --- /dev/null +++ b/nats-base-client/sha256.js @@ -0,0 +1,360 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file +// This code was bundled using `deno bundle` and it's not recommended to edit it manually + +// deno bundle https://deno.land/x/sha256@v1.0.2/mod.ts + +// The MIT License (MIT) +// +// Original work (c) Marco Paland (marco@paland.com) 2015-2018, PALANDesign Hannover, Germany +// +// Deno port Copyright (c) 2019 Noah Anabiik Schwarz +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +function getLengths(b64) { + const len = b64.length; + let validLen = b64.indexOf("="); + if (validLen === -1) { + validLen = len; + } + const placeHoldersLen = validLen === len ? 0 : 4 - validLen % 4; + return [ + validLen, + placeHoldersLen + ]; +} +function init(lookup, revLookup, urlsafe = false) { + function _byteLength(validLen, placeHoldersLen) { + return Math.floor((validLen + placeHoldersLen) * 3 / 4 - placeHoldersLen); + } + function tripletToBase64(num) { + return lookup[num >> 18 & 0x3f] + lookup[num >> 12 & 0x3f] + lookup[num >> 6 & 0x3f] + lookup[num & 0x3f]; + } + function encodeChunk(buf, start, end) { + const out = new Array((end - start) / 3); + for(let i = start, curTriplet = 0; i < end; i += 3){ + out[curTriplet++] = tripletToBase64((buf[i] << 16) + (buf[i + 1] << 8) + buf[i + 2]); + } + return out.join(""); + } + return { + byteLength (b64) { + return _byteLength.apply(null, getLengths(b64)); + }, + toUint8Array (b64) { + const [validLen, placeHoldersLen] = getLengths(b64); + const buf = new Uint8Array(_byteLength(validLen, placeHoldersLen)); + const len = placeHoldersLen ? validLen - 4 : validLen; + let tmp; + let curByte = 0; + let i; + for(i = 0; i < len; i += 4){ + tmp = revLookup[b64.charCodeAt(i)] << 18 | revLookup[b64.charCodeAt(i + 1)] << 12 | revLookup[b64.charCodeAt(i + 2)] << 6 | revLookup[b64.charCodeAt(i + 3)]; + buf[curByte++] = tmp >> 16 & 0xff; + buf[curByte++] = tmp >> 8 & 0xff; + buf[curByte++] = tmp & 0xff; + } + if (placeHoldersLen === 2) { + tmp = revLookup[b64.charCodeAt(i)] << 2 | revLookup[b64.charCodeAt(i + 1)] >> 4; + buf[curByte++] = tmp & 0xff; + } else if (placeHoldersLen === 1) { + tmp = revLookup[b64.charCodeAt(i)] << 10 | revLookup[b64.charCodeAt(i + 1)] << 4 | revLookup[b64.charCodeAt(i + 2)] >> 2; + buf[curByte++] = tmp >> 8 & 0xff; + buf[curByte++] = tmp & 0xff; + } + return buf; + }, + fromUint8Array (buf) { + const maxChunkLength = 16383; + const len = buf.length; + const extraBytes = len % 3; + const len2 = len - extraBytes; + const parts = new Array(Math.ceil(len2 / 16383) + (extraBytes ? 1 : 0)); + let curChunk = 0; + let chunkEnd; + for(let i = 0; i < len2; i += maxChunkLength){ + chunkEnd = i + maxChunkLength; + parts[curChunk++] = encodeChunk(buf, i, chunkEnd > len2 ? len2 : chunkEnd); + } + let tmp; + if (extraBytes === 1) { + tmp = buf[len2]; + parts[curChunk] = lookup[tmp >> 2] + lookup[tmp << 4 & 0x3f]; + if (!urlsafe) parts[curChunk] += "=="; + } else if (extraBytes === 2) { + tmp = buf[len2] << 8 | buf[len2 + 1] & 0xff; + parts[curChunk] = lookup[tmp >> 10] + lookup[tmp >> 4 & 0x3f] + lookup[tmp << 2 & 0x3f]; + if (!urlsafe) parts[curChunk] += "="; + } + return parts.join(""); + } + }; +} +const lookup = []; +const revLookup = []; +const code = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"; +for(let i = 0, l = code.length; i < l; ++i){ + lookup[i] = code[i]; + revLookup[code.charCodeAt(i)] = i; +} +const { byteLength , toUint8Array , fromUint8Array } = init(lookup, revLookup, true); +const decoder = new TextDecoder(); +const encoder = new TextEncoder(); +function toHexString(buf) { + return buf.reduce((hex, __byte)=>`${hex}${__byte < 16 ? "0" : ""}${__byte.toString(16)}`, ""); +} +function fromHexString(hex) { + const len = hex.length; + if (len % 2 || !/^[0-9a-fA-F]+$/.test(hex)) { + throw new TypeError("Invalid hex string."); + } + hex = hex.toLowerCase(); + const buf = new Uint8Array(Math.floor(len / 2)); + const end = len / 2; + for(let i = 0; i < end; ++i){ + buf[i] = parseInt(hex.substr(i * 2, 2), 16); + } + return buf; +} +function decode(buf, encoding = "utf8") { + if (/^utf-?8$/i.test(encoding)) { + return decoder.decode(buf); + } else if (/^base64$/i.test(encoding)) { + return fromUint8Array(buf); + } else if (/^hex(?:adecimal)?$/i.test(encoding)) { + return toHexString(buf); + } else { + throw new TypeError("Unsupported string encoding."); + } +} +function encode(str, encoding = "utf8") { + if (/^utf-?8$/i.test(encoding)) { + return encoder.encode(str); + } else if (/^base64$/i.test(encoding)) { + return toUint8Array(str); + } else if (/^hex(?:adecimal)?$/i.test(encoding)) { + return fromHexString(str); + } else { + throw new TypeError("Unsupported string encoding."); + } +} +const BYTES = 32; +class SHA256 { + hashSize = 32; + _buf; + _bufIdx; + _count; + _K; + _H; + _finalized; + constructor(){ + this._buf = new Uint8Array(64); + this._K = new Uint32Array([ + 0x428a2f98, + 0x71374491, + 0xb5c0fbcf, + 0xe9b5dba5, + 0x3956c25b, + 0x59f111f1, + 0x923f82a4, + 0xab1c5ed5, + 0xd807aa98, + 0x12835b01, + 0x243185be, + 0x550c7dc3, + 0x72be5d74, + 0x80deb1fe, + 0x9bdc06a7, + 0xc19bf174, + 0xe49b69c1, + 0xefbe4786, + 0x0fc19dc6, + 0x240ca1cc, + 0x2de92c6f, + 0x4a7484aa, + 0x5cb0a9dc, + 0x76f988da, + 0x983e5152, + 0xa831c66d, + 0xb00327c8, + 0xbf597fc7, + 0xc6e00bf3, + 0xd5a79147, + 0x06ca6351, + 0x14292967, + 0x27b70a85, + 0x2e1b2138, + 0x4d2c6dfc, + 0x53380d13, + 0x650a7354, + 0x766a0abb, + 0x81c2c92e, + 0x92722c85, + 0xa2bfe8a1, + 0xa81a664b, + 0xc24b8b70, + 0xc76c51a3, + 0xd192e819, + 0xd6990624, + 0xf40e3585, + 0x106aa070, + 0x19a4c116, + 0x1e376c08, + 0x2748774c, + 0x34b0bcb5, + 0x391c0cb3, + 0x4ed8aa4a, + 0x5b9cca4f, + 0x682e6ff3, + 0x748f82ee, + 0x78a5636f, + 0x84c87814, + 0x8cc70208, + 0x90befffa, + 0xa4506ceb, + 0xbef9a3f7, + 0xc67178f2 + ]); + this.init(); + } + init() { + this._H = new Uint32Array([ + 0x6a09e667, + 0xbb67ae85, + 0x3c6ef372, + 0xa54ff53a, + 0x510e527f, + 0x9b05688c, + 0x1f83d9ab, + 0x5be0cd19 + ]); + this._bufIdx = 0; + this._count = new Uint32Array(2); + this._buf.fill(0); + this._finalized = false; + return this; + } + update(msg, inputEncoding) { + if (msg === null) { + throw new TypeError("msg must be a string or Uint8Array."); + } else if (typeof msg === "string") { + msg = encode(msg, inputEncoding); + } + for(let i = 0, len = msg.length; i < len; i++){ + this._buf[this._bufIdx++] = msg[i]; + if (this._bufIdx === 64) { + this._transform(); + this._bufIdx = 0; + } + } + const c = this._count; + if ((c[0] += msg.length << 3) < msg.length << 3) { + c[1]++; + } + c[1] += msg.length >>> 29; + return this; + } + digest(outputEncoding) { + if (this._finalized) { + throw new Error("digest has already been called."); + } + this._finalized = true; + const b = this._buf; + let idx = this._bufIdx; + b[idx++] = 0x80; + while(idx !== 56){ + if (idx === 64) { + this._transform(); + idx = 0; + } + b[idx++] = 0; + } + const c = this._count; + b[56] = c[1] >>> 24 & 0xff; + b[57] = c[1] >>> 16 & 0xff; + b[58] = c[1] >>> 8 & 0xff; + b[59] = c[1] >>> 0 & 0xff; + b[60] = c[0] >>> 24 & 0xff; + b[61] = c[0] >>> 16 & 0xff; + b[62] = c[0] >>> 8 & 0xff; + b[63] = c[0] >>> 0 & 0xff; + this._transform(); + const hash = new Uint8Array(32); + for(let i = 0; i < 8; i++){ + hash[(i << 2) + 0] = this._H[i] >>> 24 & 0xff; + hash[(i << 2) + 1] = this._H[i] >>> 16 & 0xff; + hash[(i << 2) + 2] = this._H[i] >>> 8 & 0xff; + hash[(i << 2) + 3] = this._H[i] >>> 0 & 0xff; + } + this.init(); + return outputEncoding ? decode(hash, outputEncoding) : hash; + } + _transform() { + const h = this._H; + let h0 = h[0]; + let h1 = h[1]; + let h2 = h[2]; + let h3 = h[3]; + let h4 = h[4]; + let h5 = h[5]; + let h6 = h[6]; + let h7 = h[7]; + const w = new Uint32Array(16); + let i; + for(i = 0; i < 16; i++){ + w[i] = this._buf[(i << 2) + 3] | this._buf[(i << 2) + 2] << 8 | this._buf[(i << 2) + 1] << 16 | this._buf[i << 2] << 24; + } + for(i = 0; i < 64; i++){ + let tmp; + if (i < 16) { + tmp = w[i]; + } else { + let a = w[i + 1 & 15]; + let b = w[i + 14 & 15]; + tmp = w[i & 15] = (a >>> 7 ^ a >>> 18 ^ a >>> 3 ^ a << 25 ^ a << 14) + (b >>> 17 ^ b >>> 19 ^ b >>> 10 ^ b << 15 ^ b << 13) + w[i & 15] + w[i + 9 & 15] | 0; + } + tmp = tmp + h7 + (h4 >>> 6 ^ h4 >>> 11 ^ h4 >>> 25 ^ h4 << 26 ^ h4 << 21 ^ h4 << 7) + (h6 ^ h4 & (h5 ^ h6)) + this._K[i] | 0; + h7 = h6; + h6 = h5; + h5 = h4; + h4 = h3 + tmp; + h3 = h2; + h2 = h1; + h1 = h0; + h0 = tmp + (h1 & h2 ^ h3 & (h1 ^ h2)) + (h1 >>> 2 ^ h1 >>> 13 ^ h1 >>> 22 ^ h1 << 30 ^ h1 << 19 ^ h1 << 10) | 0; + } + h[0] = h[0] + h0 | 0; + h[1] = h[1] + h1 | 0; + h[2] = h[2] + h2 | 0; + h[3] = h[3] + h3 | 0; + h[4] = h[4] + h4 | 0; + h[5] = h[5] + h5 | 0; + h[6] = h[6] + h6 | 0; + h[7] = h[7] + h7 | 0; + } +} +function sha256(msg, inputEncoding, outputEncoding) { + return new SHA256().update(msg, inputEncoding).digest(outputEncoding); +} +export { BYTES as BYTES }; +export { SHA256 as SHA256 }; +export { sha256 as sha256 }; + + diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 61ea6665..c71cb62c 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -909,6 +909,10 @@ export interface Views { * @param opts - optional options to configure the KV and stream backing */ kv: (name: string, opts?: Partial) => Promise; + os: ( + name: string, + opts?: Partial, + ) => Promise; } // FIXME: pulls must limit to maxAcksInFlight @@ -2556,6 +2560,91 @@ export interface KvPutOptions { previousSeq: number; } +export type ObjectStoreLink = { + // name of other object store + bucket: string; + // link to single object, when empty this means the whole store + name?: string; +}; + +export type ObjectStoreMetaOptions = { + link?: ObjectStoreLink; + max_chunk_size?: number; +}; + +export type ObjectStoreMeta = { + name: string; + description?: string; + headers?: MsgHdrs; + options?: ObjectStoreMetaOptions; +}; + +export interface ObjectInfo extends ObjectStoreMeta { + bucket: string; + nuid: string; + size: number; + chunks: number; + digest: string; + deleted: boolean; + mtime: string; +} + +export interface ObjectLink { + bucket: string; + name?: string; +} + +export type ObjectStoreInfo = { + bucket: string; + description: string; + ttl: Nanos; + storage: StorageType; + replicas: number; + sealed: boolean; + size: number; + backingStore: string; +}; + +export type ObjectStoreOptions = { + description?: string; + ttl?: Nanos; + storage: StorageType; + replicas: number; + "max_bytes": number; + placement: Placement; +}; + +export type ObjectResult = { + info: ObjectInfo; + data: ReadableStream; + error: Promise; +}; + +export interface ObjectStore { + info(name: string): Promise; + list(): Promise; + get(name: string): Promise; + put( + meta: ObjectStoreMeta, + rs: ReadableStream, + ): Promise; + delete(name: string): Promise; + link(name: string, meta: ObjectInfo): Promise; + linkStore(name: string, bucket: ObjectStore): Promise; + watch( + opts?: Partial< + { + ignoreDeletes?: boolean; + includeHistory?: boolean; + } + >, + ): Promise>; + seal(): Promise; + status(opts?: Partial): Promise; + update(name: string, meta: Partial): Promise; + destroy(): Promise; +} + export type callbackFn = () => void; export enum DirectMsgHeaders { diff --git a/tests/objectstore_test.ts b/tests/objectstore_test.ts new file mode 100644 index 00000000..ecdb03b2 --- /dev/null +++ b/tests/objectstore_test.ts @@ -0,0 +1,645 @@ +/* + * Copyright 2022 The NATS Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts"; +import { notCompatible } from "./helpers/mod.ts"; +import { ObjectStoreInfoImpl } from "../nats-base-client/objectstore.ts"; +import { + assert, + assertEquals, + assertExists, + equal, +} from "https://deno.land/std@0.95.0/testing/asserts.ts"; +import { DataBuffer } from "../nats-base-client/databuffer.ts"; +import { crypto } from "https://deno.land/std@0.136.0/crypto/mod.ts"; +import { headers, StorageType, StringCodec } from "../nats-base-client/mod.ts"; +import { assertRejects } from "https://deno.land/std@0.125.0/testing/asserts.ts"; +import { equals } from "https://deno.land/std@0.111.0/bytes/mod.ts"; +import { ObjectInfo, ObjectStoreMeta } from "../nats-base-client/types.ts"; + +function readableStreamFrom(data: Uint8Array): ReadableStream { + return new ReadableStream({ + pull(controller) { + controller.enqueue(data); + controller.close(); + }, + }); +} + +async function fromReadableStream( + rs: ReadableStream, +): Promise { + const buf = new DataBuffer(); + const reader = rs.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) { + return buf.drain(); + } + if (value && value.length) { + buf.fill(value); + } + } +} + +function makeData(n: number): Uint8Array { + const data = new Uint8Array(n); + let index = 0; + let bytes = n; + while (true) { + if (bytes === 0) { + break; + } + const len = bytes > 65536 ? 65536 : bytes; + bytes -= len; + const buf = new Uint8Array(len); + crypto.getRandomValues(buf); + data.set(buf, index); + index += buf.length; + } + return data; +} + +Deno.test("objectstore - basics", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + + const blob = new Uint8Array(65536); + crypto.getRandomValues(blob); + + const js = nc.jetstream(); + const os = await js.views.os("OBJS", { description: "testing" }); + + const oi = await os.put({ name: "BLOB" }, readableStreamFrom(blob)); + assertEquals(oi.bucket, "OBJS"); + assertEquals(oi.nuid.length, 22); + assertEquals(oi.name, "BLOB"); + // assert(1000 > (Date.now() - millis(oi.mtime))); + + const jsm = await nc.jetstreamManager(); + const si = await jsm.streams.info("OBJ_OBJS"); + assertExists(si); + + const osi = await os.seal(); + assertEquals(osi.sealed, true); + assert(osi.size > blob.length); + assertEquals(osi.storage, StorageType.File); + assertEquals(osi.description, "testing"); + + let or = await os.get("foo"); + assertEquals(or, null); + + or = await os.get("BLOB"); + assertExists(or); + const read = await fromReadableStream(or!.data); + equal(read, blob); + + assertEquals(await os.destroy(), true); + await assertRejects( + async () => { + await jsm.streams.info("OBJ_OBJS"); + }, + Error, + "stream not found", + ); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - default status", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { description: "testing" }); + const blob = new Uint8Array(65536); + crypto.getRandomValues(blob); + await os.put({ name: "BLOB" }, readableStreamFrom(blob)); + + const status = await os.status(); + assertEquals(status.backingStore, "JetStream"); + assertEquals(status.bucket, "test"); + const si = status as ObjectStoreInfoImpl; + assertEquals(si.si.config.name, "OBJ_test"); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - chunked content", async () => { + const { ns, nc } = await setup(jetstreamServerConf({ + jetstream: { + max_memory_store: 10 * 1024 * 1024 + 33, + }, + }, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { storage: StorageType.Memory }); + + const data = makeData(nc.info!.max_payload * 3); + await os.put( + { name: "blob", options: { max_chunk_size: nc.info!.max_payload } }, + readableStreamFrom(data), + ); + + const d = await os.get("blob"); + const vv = await fromReadableStream(d!.data); + equals(vv, data); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - multi content", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { storage: StorageType.Memory }); + + const a = makeData(128); + await os.put( + { name: "a.js", options: { max_chunk_size: 1 } }, + readableStreamFrom(a), + ); + const sc = StringCodec(); + const b = sc.encode("hello world from object store"); + await os.put( + { name: "b.js", options: { max_chunk_size: nc.info!.max_payload } }, + readableStreamFrom(b), + ); + + let d = await os.get("a.js"); + let vv = await fromReadableStream(d!.data); + equals(vv, a); + + d = await os.get("b.js"); + vv = await fromReadableStream(d!.data); + equals(vv, b); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - delete markers", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { storage: StorageType.Memory }); + + const a = makeData(128); + await os.put( + { name: "a", options: { max_chunk_size: 10 } }, + readableStreamFrom(a), + ); + + const p = await os.delete("a"); + assertEquals(p.purged, 13); + + const info = await os.info("a"); + assertExists(info); + assertEquals(info!.deleted, true); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - multi with delete", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { storage: StorageType.Memory }); + + const sc = StringCodec(); + + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("a!")), + ); + + const si = await os.status({ subjects_filter: ">" }) as ObjectStoreInfoImpl; + await os.put( + { name: "b", options: { max_chunk_size: nc.info!.max_payload } }, + readableStreamFrom(sc.encode("b!")), + ); + + await os.get("b"); + await os.delete("b"); + + const s2 = await os.status({ subjects_filter: ">" }) as ObjectStoreInfoImpl; + // should have the tumbstone for the deleted subject + assertEquals(s2.si.state.messages, si.si.state.messages + 1); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - object names", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { storage: StorageType.Memory }); + const sc = StringCodec(); + await os.put({ name: "blob.txt" }, readableStreamFrom(sc.encode("A"))); + await os.put({ name: "foo bar" }, readableStreamFrom(sc.encode("A"))); + await os.put({ name: " " }, readableStreamFrom(sc.encode("A"))); + + await assertRejects(async () => { + await os.put({ name: "*" }, readableStreamFrom(sc.encode("A"))); + }); + await assertRejects(async () => { + await os.put({ name: ">" }, readableStreamFrom(sc.encode("A"))); + }); + await assertRejects(async () => { + await os.put({ name: "" }, readableStreamFrom(sc.encode("A"))); + }); + await cleanup(ns, nc); +}); + +Deno.test("objectstore - metadata", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test", { storage: StorageType.Memory }); + const sc = StringCodec(); + + await os.put({ name: "a" }, readableStreamFrom(sc.encode("A"))); + + // rename a + let meta = { name: "b" } as ObjectStoreMeta; + await os.update("a", meta); + let info = await os.info("b"); + assertExists(info); + assertEquals(info!.name, "b"); + + // add some headers + meta = {} as ObjectStoreMeta; + meta.headers = headers(); + meta.headers.set("color", "red"); + await os.update("b", meta); + + info = await os.info("b"); + assertExists(info); + assertEquals(info!.headers?.get("color"), "red"); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - empty entry", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("empty"); + + const oi = await os.put( + { name: "empty" }, + readableStreamFrom(new Uint8Array(0)), + ); + assertEquals(oi.nuid.length, 22); + assertEquals(oi.name, "empty"); + + const or = await os.get("empty"); + assert(or !== null); + assertEquals(await or.error, null); + const v = await fromReadableStream(or.data); + assertEquals(v.length, 0); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - list", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + let infos = await os.list(); + assertEquals(infos.length, 0); + + await os.put( + { name: "a" }, + readableStreamFrom(new Uint8Array(0)), + ); + + infos = await os.list(); + assertEquals(infos.length, 1); + assertEquals(infos[0].name, "a"); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - watch initially empty", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const buf: ObjectInfo[] = []; + const iter = await os.watch({ includeHistory: true }); + const done = (async () => { + for await (const info of iter) { + if (info === null) { + assertEquals(buf.length, 0); + } else { + buf.push(info); + if (buf.length === 3) { + break; + } + } + } + })(); + const infos = await os.list(); + assertEquals(infos.length, 0); + + const sc = StringCodec(); + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("a")), + ); + + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("aa")), + ); + + await os.put({ name: "b" }, readableStreamFrom(sc.encode("b"))); + + await done; + + assertEquals(buf.length, 3); + assertEquals(buf[0].name, "a"); + assertEquals(buf[0].size, 1); + assertEquals(buf[1].name, "a"); + assertEquals(buf[1].size, 2); + assertEquals(buf[2].name, "b"); + assertEquals(buf[2].size, 1); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - watch skip history", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const sc = StringCodec(); + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("a")), + ); + + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("aa")), + ); + + const buf: ObjectInfo[] = []; + const iter = await os.watch({ includeHistory: false }); + const done = (async () => { + for await (const info of iter) { + if (info === null) { + assertEquals(buf.length, 1); + } else { + buf.push(info); + if (buf.length === 1) { + break; + } + } + } + })(); + + await os.put({ name: "c" }, readableStreamFrom(sc.encode("c"))); + + await done; + + assertEquals(buf.length, 1); + assertEquals(buf[0].name, "c"); + assertEquals(buf[0].size, 1); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - watch history", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const sc = StringCodec(); + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("a")), + ); + + await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("aa")), + ); + + const buf: ObjectInfo[] = []; + const iter = await os.watch({ includeHistory: true }); + const done = (async () => { + for await (const info of iter) { + if (info === null) { + assertEquals(buf.length, 1); + } else { + buf.push(info); + if (buf.length === 2) { + break; + } + } + } + })(); + + await os.put({ name: "c" }, readableStreamFrom(sc.encode("c"))); + + await done; + + assertEquals(buf.length, 2); + assertEquals(buf[0].name, "a"); + assertEquals(buf[0].size, 2); + assertEquals(buf[1].name, "c"); + assertEquals(buf[1].size, 1); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - self link", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const sc = StringCodec(); + const src = await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("a")), + ); + const oi = await os.link("ref", src); + assertEquals(oi.options?.link, undefined); + assertEquals(oi.nuid, src.nuid); + + const a = await os.list(); + assertEquals(a.length, 2); + assertEquals(a[0].name, "a"); + assertEquals(a[1].name, "ref"); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - external link", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const sc = StringCodec(); + const src = await os.put( + { name: "a" }, + readableStreamFrom(sc.encode("a")), + ); + + const os2 = await js.views.os("another"); + const io = await os2.link("ref", src); + assertExists(io.options?.link); + assertEquals(io.options?.link?.bucket, "test"); + assertEquals(io.options?.link?.name, "a"); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - store link", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const os2 = await js.views.os("another"); + const si = await os2.linkStore("src", os); + assertExists(si.options?.link); + assertEquals(si.options?.link?.bucket, "test"); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - max chunk is max payload", async () => { + const { ns, nc } = await setup(jetstreamServerConf({ + max_payload: 8 * 1024, + }, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + assertEquals(nc.info?.max_payload, 8 * 1024); + + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const rs = readableStreamFrom(makeData(32 * 1024)); + + const info = await os.put({ name: "t" }, rs); + assertEquals(info.size, 32 * 1024); + assertEquals(info.chunks, 4); + assertEquals(info.options?.max_chunk_size, 8 * 1024); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - default chunk is 128k", async () => { + const { ns, nc } = await setup(jetstreamServerConf({ + max_payload: 1024 * 1024, + }, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + assertEquals(nc.info?.max_payload, 1024 * 1024); + + const js = nc.jetstream(); + const os = await js.views.os("test"); + + const rs = readableStreamFrom(makeData(129 * 1024)); + + const info = await os.put({ name: "t" }, rs); + assertEquals(info.size, 129 * 1024); + assertEquals(info.chunks, 2); + assertEquals(info.options?.max_chunk_size, 128 * 1024); + + await cleanup(ns, nc); +}); + +Deno.test("objectstore - sanitize", async () => { + const { ns, nc } = await setup(jetstreamServerConf({ + max_payload: 1024 * 1024, + }, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const js = nc.jetstream(); + const os = await js.views.os("test"); + await os.put({ name: "has.dots.here" }, readableStreamFrom(makeData(1))); + await os.put( + { name: "the spaces are here" }, + readableStreamFrom(makeData(1)), + ); + + const info = await os.status({ subjects_filter: ">" }) as ObjectStoreInfoImpl; + assertEquals(info.si.state?.subjects!["$O.test.M.has_dots_here"], 1); + assertEquals(info.si.state.subjects!["$O.test.M.the_spaces_are_here"], 1); + + await cleanup(ns, nc); +}); + +// Deno.test("objectstore - compat", async () => { +// const nc = await connect(); +// const js = nc.jetstream(); +// const os = await js.views.os("test"); +// console.log(await os.status({ subjects_filter: ">" })); +// +// const a = await os.list(); +// console.log(a); +// +// const rs = await os.get("./main.go"); +// const data = await fromReadableStream(rs!.data); +// const sc = StringCodec(); +// console.log(sc.decode(data)); +// +// await os.put({ name: "hello" }, readableStreamFrom(sc.encode("hello world"))); +// +// await nc.close(); +// });