Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] objectstore (wip) #339

Merged
merged 10 commits into from
Aug 10, 2022
11 changes: 11 additions & 0 deletions nats-base-client/databuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ export class DataBuffer {
}
}

shift(): Uint8Array {
if (this.buffers.length) {
const a = this.buffers.shift();
if (a) {
this.byteLength -= a.length;
return a;
}
}
return new Uint8Array(0);
}

drain(n?: number): Uint8Array {
if (this.buffers.length) {
this.pack();
Expand Down
16 changes: 16 additions & 0 deletions nats-base-client/headers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,20 @@ export class MsgHdrsImpl implements MsgHdrs {
get status(): string {
return `${this.code} ${this.description}`.trim();
}

toRecord(): Record<string, string[]> {
const data = {} as Record<string, string[]>;
this.keys().forEach((v) => {
data[v] = this.values(v);
});
return data;
}

static fromRecord(r: Record<string, string[]>): MsgHdrs {
const h = new MsgHdrsImpl();
for (const k in r) {
h.headers.set(k, r[k]);
}
return h;
}
}
22 changes: 18 additions & 4 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
* limitations under the License.
*/

import type { ConsumerOptsBuilder, KV, KvOptions, Views } from "./types.ts";
import type {
ConsumerOptsBuilder,
KV,
KvOptions,
ObjectStore,
ObjectStoreOptions,
Views,
} from "./types.ts";
import {
AckPolicy,
ConsumerAPI,
Expand Down Expand Up @@ -72,6 +79,7 @@ import { consumerOpts, isConsumerOptsBuilder } from "./jsconsumeropts.ts";
import { Bucket } from "./kv.ts";
import { NatsConnectionImpl } from "./nats.ts";
import { Feature } from "./semver.ts";
import { ObjectStoreImpl } from "./objectstore.ts";

export interface JetStreamSubscriptionInfoable {
info: JetStreamSubscriptionInfo | null;
Expand All @@ -89,14 +97,20 @@ class ViewsImpl implements Views {
js: JetStreamClientImpl;
constructor(js: JetStreamClientImpl) {
this.js = js;
jetstreamPreview(this.js.nc);
}
kv(name: string, opts: Partial<KvOptions> = {}): Promise<KV> {
if (opts.bindOnly) {
return Bucket.bind(this.js, name);
}
return Bucket.create(this.js, name, opts);
}
os(
name: string,
opts: Partial<ObjectStoreOptions> = {},
): Promise<ObjectStore> {
jetstreamPreview(this.js.nc);
return ObjectStoreImpl.create(this.js, name, opts);
}
}

export class JetStreamClientImpl extends BaseApiClient
Expand Down Expand Up @@ -833,11 +847,11 @@ const jetstreamPreview = (() => {
const { lang } = nci?.protocol?.transport;
if (lang) {
console.log(
`\u001B[33m >> jetstream's materialized views functionality in ${lang} is beta functionality \u001B[0m`,
`\u001B[33m >> jetstream's materialized views object store functionality in ${lang} is beta functionality \u001B[0m`,
);
} else {
console.log(
`\u001B[33m >> jetstream's materialized views functionality is beta functionality \u001B[0m`,
`\u001B[33m >> jetstream's materialized views object store functionality is beta functionality \u001B[0m`,
);
}
}
Expand Down
32 changes: 11 additions & 21 deletions nats-base-client/nuid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,18 @@ const minInc = 33;
const maxInc = 333;
const totalLen = preLen + seqLen;

const cryptoObj = initCrypto();

function initCrypto() {
let cryptoObj = null;
if (typeof globalThis !== "undefined") {
if (
"crypto" in globalThis && globalThis.crypto.getRandomValues !== undefined
) {
cryptoObj = globalThis.crypto;
}
function _getRandomValues(a: Uint8Array) {
for (let i = 0; i < a.length; i++) {
a[i] = Math.floor(Math.random() * 255);
}
if (!cryptoObj) {
// shim it
cryptoObj = {
getRandomValues: function (array: Uint8Array) {
for (let i = 0; i < array.length; i++) {
array[i] = Math.floor(Math.random() * 255);
}
},
};
}

function fillRandom(a: Uint8Array) {
if (globalThis?.crypto?.getRandomValues) {
globalThis.crypto.getRandomValues(a);
} else {
_getRandomValues(a);
}
return cryptoObj;
}

/**
Expand Down Expand Up @@ -92,7 +82,7 @@ export class Nuid {
*/
private setPre() {
const cbuf = new Uint8Array(preLen);
cryptoObj.getRandomValues(cbuf);
fillRandom(cbuf);
for (let i = 0; i < preLen; i++) {
const di = cbuf[i] % base;
this.buf[i] = digits.charCodeAt(di);
Expand Down
Loading