Skip to content

Commit

Permalink
[FEAT] objectstore (wip) (#339)
Browse files Browse the repository at this point in the history
[FEAT] objectstore
[CHANGE] remove beta from kv notice, targetted object store instead.
[FIX] simplified the nuid "shim" for getRandomValues
[FIX] put an assertion for crypto.subtle on the objectstore create, as this is required and expected but may be missing from Node (specially 14)
  • Loading branch information
aricart authored Aug 10, 2022
1 parent a419d44 commit e07ec14
Show file tree
Hide file tree
Showing 8 changed files with 1,879 additions and 25 deletions.
11 changes: 11 additions & 0 deletions nats-base-client/databuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ export class DataBuffer {
}
}

shift(): Uint8Array {
if (this.buffers.length) {
const a = this.buffers.shift();
if (a) {
this.byteLength -= a.length;
return a;
}
}
return new Uint8Array(0);
}

drain(n?: number): Uint8Array {
if (this.buffers.length) {
this.pack();
Expand Down
16 changes: 16 additions & 0 deletions nats-base-client/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,20 @@ export class MsgHdrsImpl implements MsgHdrs {
get status(): string {
return `${this.code} ${this.description}`.trim();
}

toRecord(): Record<string, string[]> {
const data = {} as Record<string, string[]>;
this.keys().forEach((v) => {
data[v] = this.values(v);
});
return data;
}

static fromRecord(r: Record<string, string[]>): MsgHdrs {
const h = new MsgHdrsImpl();
for (const k in r) {
h.headers.set(k, r[k]);
}
return h;
}
}
22 changes: 18 additions & 4 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
* limitations under the License.
*/

import type { ConsumerOptsBuilder, KV, KvOptions, Views } from "./types.ts";
import type {
ConsumerOptsBuilder,
KV,
KvOptions,
ObjectStore,
ObjectStoreOptions,
Views,
} from "./types.ts";
import {
AckPolicy,
ConsumerAPI,
Expand Down Expand Up @@ -72,6 +79,7 @@ import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";
import { Bucket } from "./kv.ts";
import { NatsConnectionImpl } from "./nats.ts";
import { Feature } from "./semver.ts";
import { ObjectStoreImpl } from "./objectstore.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand All @@ -89,14 +97,20 @@ class ViewsImpl implements Views {
js: JetStreamClientImpl;
constructor(js: JetStreamClientImpl) {
this.js = js;
jetstreamPreview(this.js.nc);
}
kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
if (opts.bindOnly) {
return Bucket.bind(this.js, name);
}
return Bucket.create(this.js, name, opts);
}
os(
name: string,
opts: Partial<ObjectStoreOptions> = {},
): Promise<ObjectStore> {
jetstreamPreview(this.js.nc);
return ObjectStoreImpl.create(this.js, name, opts);
}
}

export class JetStreamClientImpl extends BaseApiClient
Expand Down Expand Up @@ -833,11 +847,11 @@ const jetstreamPreview = (() => {
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream's materialized views functionality in ${lang} is beta functionality \u001B[0m`,
`\u001B[33m >> jetstream's materialized views object store functionality in ${lang} is beta functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream's materialized views functionality is beta functionality \u001B[0m`,
`\u001B[33m >> jetstream's materialized views object store functionality is beta functionality \u001B[0m`,
);
}
}
Expand Down
32 changes: 11 additions & 21 deletions nats-base-client/nuid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,18 @@ const minInc = 33;
const maxInc = 333;
const totalLen = preLen + seqLen;

const cryptoObj = initCrypto();

function initCrypto() {
let cryptoObj = null;
if (typeof globalThis !== "undefined") {
if (
"crypto" in globalThis && globalThis.crypto.getRandomValues !== undefined
) {
cryptoObj = globalThis.crypto;
}
function _getRandomValues(a: Uint8Array) {
for (let i = 0; i < a.length; i++) {
a[i] = Math.floor(Math.random() * 255);
}
if (!cryptoObj) {
// shim it
cryptoObj = {
getRandomValues: function (array: Uint8Array) {
for (let i = 0; i < array.length; i++) {
array[i] = Math.floor(Math.random() * 255);
}
},
};
}

function fillRandom(a: Uint8Array) {
if (globalThis?.crypto?.getRandomValues) {
globalThis.crypto.getRandomValues(a);
} else {
_getRandomValues(a);
}
return cryptoObj;
}

/**
Expand Down Expand Up @@ -92,7 +82,7 @@ export class Nuid {
*/
private setPre() {
const cbuf = new Uint8Array(preLen);
cryptoObj.getRandomValues(cbuf);
fillRandom(cbuf);
for (let i = 0; i < preLen; i++) {
const di = cbuf[i] % base;
this.buf[i] = digits.charCodeAt(di);
Expand Down
Loading

0 comments on commit e07ec14

Please sign in to comment.