Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iterator fixes #190

Merged
merged 4 commits into from
Aug 31, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 30 additions & 15 deletions nats-base-client/ekv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface EncodedEntry<T> {

export interface EncodedRoKV<T> {
get(k: string): Promise<EncodedEntry<T> | null>;
history(k: string): Promise<QueuedIterator<EncodedEntry<T>>>;
history(opts?: { key?: string }): Promise<QueuedIterator<EncodedEntry<T>>>;
watch(opts?: { key?: string }): Promise<QueuedIterator<EncodedEntry<T>>>;
close(): Promise<void>;
status(): Promise<KvStatus>;
Expand Down Expand Up @@ -72,27 +72,42 @@ export class EncodedBucket<T> implements EncodedKV<T> {
return this.bucket.delete(k);
}

async toEncodedIter(
src: QueuedIterator<Entry>,
async history(
opts: { key?: string } = {},
): Promise<QueuedIterator<EncodedEntry<T>>> {
const iter = new QueuedIteratorImpl<EncodedEntry<T>>();
await (async () => {
for await (const e of src) {
iter.push(this.toEncodedEntry(e));
const qi = new QueuedIteratorImpl<EncodedEntry<T>>();
const iter = await this.bucket.history(opts);
(async () => {
for await (const e of iter) {
qi.received++;
qi.push(this.toEncodedEntry(e));
}
})();
iter.stop();
return iter;
}

async history(k: string): Promise<QueuedIterator<EncodedEntry<T>>> {
return this.toEncodedIter(await this.bucket.history(k));
})().then(() => {
qi.stop();
}).catch((err) => {
qi.stop(err);
});
return qi;
}

async watch(
opts: { key?: string } = {},
): Promise<QueuedIterator<EncodedEntry<T>>> {
return this.toEncodedIter(await this.bucket.watch(opts));
const qi = new QueuedIteratorImpl<EncodedEntry<T>>();
const iter = await this.bucket.watch(opts);
(async () => {
for await (const e of iter) {
qi.received++;
qi.push(this.toEncodedEntry(e));
}
})().then(() => {
qi.stop();
iter.stop();
}).catch((err) => {
qi.stop(err);
iter.stop();
});
return qi;
}

keys(): Promise<string[]> {
Expand Down
121 changes: 96 additions & 25 deletions nats-base-client/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ const kvPrefix = "KV_";
const kvSubjectPrefix = "$KV";

const validKeyRe = /^[-/=.\w]+$/;
const validSearchKey = /^[-/=.>*\w]+$/;
const validBucketRe = /^[-\w]+$/;

export interface RoKV {
get(k: string): Promise<Entry | null>;
history(k: string): Promise<QueuedIterator<Entry>>;
history(opts?: { key?: string }): Promise<QueuedIterator<Entry>>;
watch(opts?: { key?: string }): Promise<QueuedIterator<Entry>>;
close(): Promise<void>;
status(): Promise<KvStatus>;
Expand All @@ -171,6 +172,37 @@ export function validateKey(k: string) {
}
}

export function validateSearchKey(k: string) {
if (k.startsWith(".") || k.endsWith(".") || !validSearchKey.test(k)) {
throw new Error(`invalid key: ${k}`);
}
}

export function hasWildcards(k: string) {
if (k.startsWith(".") || k.endsWith(".")) {
throw new Error(`invalid key: ${k}`);
}
const chunks = k.split(".");

let hasWildcards = false;
for (let i = 0; i < chunks.length; i++) {
switch (chunks[i]) {
case "*":
hasWildcards = true;
break;
case ">":
if (i !== chunks.length - 1) {
throw new Error(`invalid key: ${k}`);
}
hasWildcards = true;
break;
default:
// continue
}
}
return hasWildcards;
}

// this exported for tests
export function validateBucket(name: string) {
if (!validBucketRe.test(name)) {
Expand All @@ -184,12 +216,14 @@ export class Bucket implements KV {
stream!: string;
bucket: string;
codec!: KvCodecs;
_prefixLen: number;

constructor(bucket: string, jsm: JetStreamManager, js: JetStreamClient) {
validateBucket(bucket);
this.jsm = jsm;
this.js = js;
this.bucket = bucket;
this._prefixLen = 0;
}

static async create(
Expand Down Expand Up @@ -236,13 +270,20 @@ export class Bucket implements KV {
}

subjectForBucket(): string {
return `${kvSubjectPrefix}.${this.bucket}.*`;
return `${kvSubjectPrefix}.${this.bucket}.>`;
}

subjectForKey(k: string): string {
return `${kvSubjectPrefix}.${this.bucket}.${k}`;
}

get prefixLen(): number {
if (this._prefixLen === 0) {
this._prefixLen = `${kvSubjectPrefix}.${this.bucket}.`.length;
}
return this._prefixLen;
}

encodeKey(key: string): string {
const chunks: string[] = [];
for (const t of key.split(".")) {
Expand All @@ -259,8 +300,28 @@ export class Bucket implements KV {
return chunks.join(".");
}

decodeKey(ekey: string): string {
const chunks: string[] = [];
for (const t of ekey.split(".")) {
switch (t) {
case ">":
case "*":
chunks.push(t);
break;
default:
chunks.push(this.codec.key.decode(t));
break;
}
}
return chunks.join(".");
}

validateKey = validateKey;

validateSearchKey = validateSearchKey;

hasWildcards = hasWildcards;

close(): Promise<void> {
return Promise.resolve();
}
Expand All @@ -279,8 +340,7 @@ export class Bucket implements KV {
}

jmToEntry(k: string, jm: JsMsg): Entry {
const chunks = jm.subject.split(".");
const key = this.codec.key.decode(chunks[chunks.length - 1]);
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
const e = {
bucket: this.bucket,
key: key,
Expand All @@ -291,7 +351,7 @@ export class Bucket implements KV {
operation: jm.headers?.get(kvOperationHdr) === "DEL" ? "DEL" : "PUT",
} as Entry;

if (k !== "*") {
if (k !== ">") {
e.delta = jm.info.pending;
}
return e;
Expand Down Expand Up @@ -345,28 +405,28 @@ export class Bucket implements KV {
await this.js.publish(this.subjectForKey(ek), Empty, { headers: h });
}

consumerOn(k: string, lastOnly = false): Promise<ConsumerInfo> {
consumerOn(k: string, history = false): Promise<ConsumerInfo> {
const ek = this.encodeKey(k);
if (k !== "*") {
this.validateKey(ek);
}
this.validateSearchKey(k);

const ji = this.js as JetStreamClientImpl;
const nc = ji.nc;
const inbox = createInbox(nc.options.inboxPrefix);
const opts: Partial<ConsumerConfig> = {
"deliver_subject": inbox,
"deliver_policy": lastOnly
? DeliverPolicy.LastPerSubject
: DeliverPolicy.All,
"deliver_policy": history
? DeliverPolicy.All
: DeliverPolicy.LastPerSubject,
"ack_policy": AckPolicy.Explicit,
"filter_subject": this.subjectForKey(ek),
"flow_control": k === "*",
"flow_control": true,
};
return this.jsm.consumers.add(this.stream, opts);
}

async history(k: string): Promise<QueuedIterator<Entry>> {
const ci = await this.consumerOn(k);
async history(opts: { key?: string } = {}): Promise<QueuedIterator<Entry>> {
const k = opts.key ?? ">";
const ci = await this.consumerOn(k, true);
const max = ci.num_pending;
const qi = new QueuedIteratorImpl<Entry>();
if (max === 0) {
Expand All @@ -392,14 +452,18 @@ export class Bucket implements KV {
}
qi.received++;
const jm = toJsMsg(msg);
qi.push(this.jmToEntry(k, jm));
const e = this.jmToEntry(k, jm);
qi.push(e);
jm.ack();
if (qi.received === max) {
sub.unsubscribe();
}
}
},
});
qi.iterClosed.then(() => {
sub.unsubscribe();
});
sub.closed.then(() => {
qi.stop();
}).catch((err) => {
Expand All @@ -410,13 +474,14 @@ export class Bucket implements KV {
}

async watch(opts: { key?: string } = {}): Promise<QueuedIterator<Entry>> {
const k = opts.key ?? "*";
const ci = await this.consumerOn(k, k !== "*");
const k = opts.key ?? ">";
const ci = await this.consumerOn(k, false);
const qi = new QueuedIteratorImpl<Entry>();

const ji = this.jsm as JetStreamManagerImpl;
const nc = ji.nc;
const subj = ci.config.deliver_subject!;

const sub = nc.subscribe(subj, {
callback: (err, msg) => {
if (err === null) {
Expand All @@ -438,6 +503,9 @@ export class Bucket implements KV {
}
},
});
qi.iterClosed.then(() => {
sub.unsubscribe();
});
sub.closed.then(() => {
qi.stop();
}).catch((err) => {
Expand All @@ -449,8 +517,11 @@ export class Bucket implements KV {

async keys(): Promise<string[]> {
const d = deferred<string[]>();
const s: string[] = [];
const ci = await this.consumerOn("*", true);
const keys: string[] = [];
const ci = await this.consumerOn(">", false);
if (ci.num_pending === 0) {
return Promise.resolve(keys);
}
const ji = this.jsm as JetStreamManagerImpl;
const nc = ji.nc;
const subj = ci.config.deliver_subject!;
Expand All @@ -461,21 +532,21 @@ export class Bucket implements KV {
if (err) {
sub.unsubscribe();
d.reject(err);
} else if (isFlowControlMsg(m)) {
} else if (isFlowControlMsg(m) || isHeartbeatMsg(m)) {
m.respond();
} else {
const chunks = m.subject.split(".");
s.push(this.codec.key.decode(chunks[chunks.length - 1]));
const jm = toJsMsg(m);
const key = this.decodeKey(jm.subject.substring(this.prefixLen));
keys.push(key);
m.respond();
const info = parseInfo(m.reply!);
if (info.pending === 0) {
sub.unsubscribe();
d.resolve(s);
d.resolve(keys);
}
}
}
})();

return d;
}

Expand Down
Loading