From 8513d540754dde91c9b254e79613dce9229d86ff Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 19 Oct 2021 16:57:25 -0500 Subject: [PATCH] Kv changes (#213) [CHANGE] updated KV implementation to use ordered consumers, headers_only for history, watches, and keys. [CHANGE] updated API to match changes in the ADR. --- nats-base-client/kv.ts | 319 ++++++++++++++++++-------------------- nats-base-client/types.ts | 13 +- tests/jetstream_test.ts | 4 +- tests/kv_test.ts | 87 ++++++++--- 4 files changed, 227 insertions(+), 196 deletions(-) diff --git a/nats-base-client/kv.ts b/nats-base-client/kv.ts index 38cfabfa..7c9db42e 100644 --- a/nats-base-client/kv.ts +++ b/nats-base-client/kv.ts @@ -23,6 +23,7 @@ import { JetStreamClient, JetStreamManager, JetStreamPublishOptions, + JsHeaders, JsMsg, KV, KvCodec, @@ -41,20 +42,12 @@ import { StreamConfig, } from "./types.ts"; import { JetStreamClientImpl } from "./jsclient.ts"; -import { JetStreamManagerImpl } from "./jsm.ts"; -import { - checkJsError, - isFlowControlMsg, - isHeartbeatMsg, - millis, - nanos, -} from "./jsutil.ts"; -import { isNatsError } from "./error.ts"; +import { millis, nanos } from "./jsutil.ts"; import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts"; import { deferred } from "./util.ts"; -import { parseInfo, toJsMsg } from "./jsmsg.ts"; import { headers } from "./headers.ts"; import { createInbox } from "./protocol.ts"; +import { consumerOpts } from "./mod.ts"; export function Base64KeyCodec(): KvCodec { return { @@ -99,7 +92,8 @@ export function defaultBucketOpts(): Partial { }; } -export const kvOriginClusterHdr = "KV-Origin-Cluster"; +type OperationType = "PUT" | "DEL" | "PURGE"; + export const kvOperationHdr = "KV-Operation"; const kvPrefix = "KV_"; const kvSubjectPrefix = "$KV"; @@ -198,6 +192,7 @@ export class Bucket implements KV, KvRemove { if (bo.ttl) { sc.max_age = nanos(bo.ttl); } + sc.allow_rollup_hdrs = true; try { await this.jsm.streams.info(sc.name); @@ -276,9 +271,8 @@ export class Bucket implements KV, KvRemove { value: sm.data, delta: 0, created: sm.time, - seq: sm.seq, - origin_cluster: sm.header.get(kvOriginClusterHdr), - operation: sm.header.get(kvOperationHdr) === "DEL" ? "DEL" : "PUT", + revision: sm.seq, + operation: sm.header.get(kvOperationHdr) as OperationType || "PUT", }; } @@ -289,9 +283,8 @@ export class Bucket implements KV, KvRemove { key: key, value: jm.data, created: new Date(millis(jm.info.timestampNanos)), - seq: jm.seq, - origin_cluster: jm.headers?.get(kvOriginClusterHdr), - operation: jm.headers?.get(kvOperationHdr) === "DEL" ? "DEL" : "PUT", + revision: jm.seq, + operation: jm.headers?.get(kvOperationHdr) as OperationType || "PUT", } as KvEntry; if (k !== ">") { @@ -300,6 +293,17 @@ export class Bucket implements KV, KvRemove { return e; } + create(k: string, data: Uint8Array): Promise { + return this.put(k, data, { previousSeq: 0 }); + } + + update(k: string, data: Uint8Array, version: number): Promise { + if (version <= 0) { + throw new Error("version must be greater than 0"); + } + return this.put(k, data, { previousSeq: version }); + } + async put( k: string, data: Uint8Array, @@ -308,14 +312,11 @@ export class Bucket implements KV, KvRemove { const ek = this.encodeKey(k); this.validateKey(ek); - const ji = this.js as JetStreamClientImpl; - const cluster = ji.nc.info?.cluster ?? ""; - const h = headers(); - h.set(kvOriginClusterHdr, cluster); - const o = { headers: h } as JetStreamPublishOptions; - if (opts.previousSeq) { - o.expect = {}; - o.expect.lastSubjectSequence = opts.previousSeq; + const o = {} as JetStreamPublishOptions; + if (opts.previousSeq !== undefined) { + const h = headers(); + o.headers = h; + h.set("Nats-Expected-Last-Subject-Sequence", `${opts.previousSeq}`); } const pa = await this.js.publish(this.subjectForKey(ek), data, o); return pa.seq; @@ -337,20 +338,17 @@ export class Bucket implements KV, KvRemove { } } - async _delete(k: string): Promise { - const ek = this.encodeKey(k); - this.validateKey(ek); - const ji = this.js as JetStreamClientImpl; - const cluster = ji.nc.info?.cluster ?? ""; - const h = headers(); - h.set(kvOriginClusterHdr, cluster); - h.set(kvOperationHdr, "DEL"); - await this.js.publish(this.subjectForKey(ek), Empty, { headers: h }); + purge(k: string): Promise { + return this._deleteOrPurge(k, "PURGE"); + } + + delete(k: string): Promise { + return this._deleteOrPurge(k, "DEL"); } - async delete(k: string): Promise { + async _deleteOrPurge(k: string, op: "DEL" | "PURGE") { if (!this.hasWildcards(k)) { - return this._delete(k); + return this._doDeleteOrPurge(k, op); } const keys = await this.keys(k); if (keys.length === 0) { @@ -359,7 +357,7 @@ export class Bucket implements KV, KvRemove { const d = deferred(); const buf: Promise[] = []; for (const k of keys) { - buf.push(this._delete(k)); + buf.push(this._doDeleteOrPurge(k, op)); } Promise.all(buf) .then(() => { @@ -372,99 +370,83 @@ export class Bucket implements KV, KvRemove { return d; } - consumerOn(k: string, history = false): Promise { + async _doDeleteOrPurge(k: string, op: "DEL" | "PURGE"): Promise { + const ek = this.encodeKey(k); + this.validateKey(ek); + const h = headers(); + h.set(kvOperationHdr, op); + if (op === "PURGE") { + h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject); + } + await this.js.publish(this.subjectForKey(ek), Empty, { headers: h }); + } + + _buildCC( + k: string, + history = false, + opts: Partial = {}, + ): Partial { const ek = this.encodeKey(k); this.validateSearchKey(k); - const ji = this.js as JetStreamClientImpl; - const nc = ji.nc; - const inbox = createInbox(nc.options.inboxPrefix); - const opts: Partial = { - "deliver_subject": inbox, + return Object.assign({ "deliver_policy": history ? DeliverPolicy.All : DeliverPolicy.LastPerSubject, - "ack_policy": AckPolicy.Explicit, + "ack_policy": AckPolicy.None, "filter_subject": this.subjectForKey(ek), "flow_control": true, - "idle_heartbeat": nanos(60 * 1000), - }; - return this.jsm.consumers.add(this.stream, opts); + "idle_heartbeat": nanos(5 * 1000), + }, opts) as Partial; } - async remove(k: string): Promise { - const ci = await this.consumerOn(k, true); - if (ci.num_pending === 0) { - await this.jsm.consumers.delete(this.stream, ci.name); - return; - } else { - const ji = this.js as JetStreamClientImpl; - const nc = ji.nc; - const buf: Promise[] = []; - const sub = nc.subscribe(ci.config.deliver_subject!, { - callback: (err, msg) => { - if (err === null) { - err = checkJsError(msg); - } - if (err) { - sub.unsubscribe(); - return; - } - if (isFlowControlMsg(msg) || isHeartbeatMsg(msg)) { - msg.respond(); - return; - } - const jm = toJsMsg(msg); - buf.push(this.jsm.streams.deleteMessage(this.stream, jm.seq)); - if (jm.info.pending === 0) { - sub.unsubscribe(); - } - jm.ack(); - }, - }); - if (buf.length) { - await Promise.all(buf); - } - await sub.closed; - } + /** + * @deprecated + */ + consumerOn(k: string, history = false): Promise { + return this.jsm.consumers.add( + this.stream, + this._buildCC(k, history, { + ack_policy: AckPolicy.Explicit, + deliver_subject: createInbox(), + }), + ); + } + + remove(k: string): Promise { + return this.purge(k); } async history(opts: { key?: string } = {}): Promise> { const k = opts.key ?? ">"; - const ci = await this.consumerOn(k, true); - const max = ci.num_pending; const qi = new QueuedIteratorImpl(); - if (max === 0) { - qi.stop(); - return qi; - } - const ji = this.jsm as JetStreamManagerImpl; - const nc = ji.nc; - const subj = ci.config.deliver_subject!; - const sub = nc.subscribe(subj, { - callback: (err, msg) => { - if (err === null) { - err = checkJsError(msg); - } - if (err) { - if (isNatsError(err)) { - qi.stop(err); - } - } else { - if (isFlowControlMsg(msg) || isHeartbeatMsg(msg)) { - msg.respond(); - return; - } - qi.received++; - const jm = toJsMsg(msg); - const e = this.jmToEntry(k, jm); - qi.push(e); - jm.ack(); - if (qi.received === max) { - sub.unsubscribe(); - } + const done = deferred(); + const cc = this._buildCC(k, true); + const subj = cc.filter_subject!; + const copts = consumerOpts(cc); + copts.orderedConsumer(); + copts.callback((err, jm) => { + if (err) { + // sub done + qi.stop(err); + return; + } + if (jm) { + const e = this.jmToEntry(k, jm); + qi.push(e); + qi.received++; + if (jm.info.pending === 0) { + done.resolve(); } - }, + } + }); + + const sub = await this.js.subscribe(subj, copts); + done.then(() => { + sub.unsubscribe(); + }); + done.catch((_err) => { + sub.unsubscribe(); }); qi.iterClosed.then(() => { sub.unsubscribe(); @@ -475,39 +457,37 @@ export class Bucket implements KV, KvRemove { qi.stop(err); }); + this.jsm.streams.getMessage(this.stream, { + "last_by_subj": subj, + }).catch(() => { + // we don't have a value for this + done.resolve(); + }); + return qi; } async watch(opts: { key?: string } = {}): Promise> { const k = opts.key ?? ">"; - const ci = await this.consumerOn(k, false); const qi = new QueuedIteratorImpl(); - - const ji = this.jsm as JetStreamManagerImpl; - const nc = ji.nc; - const subj = ci.config.deliver_subject!; - - const sub = nc.subscribe(subj, { - callback: (err, msg) => { - if (err === null) { - err = checkJsError(msg); - } - if (err) { - if (isNatsError(err)) { - qi.stop(err); - } - } else { - if (isFlowControlMsg(msg) || isHeartbeatMsg(msg)) { - msg.respond(); - return; - } - qi.received++; - const jm = toJsMsg(msg); - qi.push(this.jmToEntry(k, jm)); - jm.ack(); - } - }, + const cc = this._buildCC(k, false); + const subj = cc.filter_subject!; + const copts = consumerOpts(cc); + copts.orderedConsumer(); + copts.callback((err, jm) => { + if (err) { + // sub done + qi.stop(err); + return; + } + if (jm) { + const e = this.jmToEntry(k, jm); + qi.push(e); + qi.received++; + } }); + + const sub = await this.js.subscribe(subj, copts); qi.iterClosed.then(() => { sub.unsubscribe(); }); @@ -523,41 +503,42 @@ export class Bucket implements KV, KvRemove { async keys(k = ">"): Promise { const d = deferred(); const keys: string[] = []; - const ci = await this.consumerOn(k, false); - if (ci.num_pending === 0) { - return Promise.resolve(keys); - } - const ji = this.jsm as JetStreamManagerImpl; - const nc = ji.nc; - const subj = ci.config.deliver_subject!; - const sub = nc.subscribe(subj); - await (async () => { - for await (const m of sub) { - const err = checkJsError(m); - if (err) { + const cc = this._buildCC(k, false, { headers_only: true }); + const subj = cc.filter_subject!; + const copts = consumerOpts(cc); + copts.orderedConsumer(); + + const sub = await this.js.subscribe(subj, copts); + (async () => { + for await (const jm of sub) { + const op = jm.headers?.get(kvOperationHdr); + if (op !== "DEL" && op !== "PURGE") { + const key = this.decodeKey(jm.subject.substring(this.prefixLen)); + keys.push(key); + } + if (jm.info.pending === 0) { sub.unsubscribe(); - d.reject(err); - } else if (isFlowControlMsg(m) || isHeartbeatMsg(m)) { - m.respond(); - } else { - const jm = toJsMsg(m); - if (jm.headers?.get(kvOperationHdr) !== "DEL") { - const key = this.decodeKey(jm.subject.substring(this.prefixLen)); - keys.push(key); - } - m.respond(); - const info = parseInfo(m.reply!); - if (info.pending === 0) { - sub.unsubscribe(); - d.resolve(keys); - } } } - })(); + })() + .then(() => { + d.resolve(keys); + }) + .catch((err) => { + d.reject(err); + }); + + this.jsm.streams.getMessage(this.stream, { + "last_by_subj": subj, + }).catch(() => { + // we don't have a value for this + sub.unsubscribe(); + }); + return d; } - purge(opts?: PurgeOpts): Promise { + purgeBucket(opts?: PurgeOpts): Promise { return this.jsm.streams.purge(this.bucketName(), opts); } diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 09458187..e429f850 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -813,10 +813,9 @@ export interface KvEntry { key: string; value: Uint8Array; created: Date; - seq: number; + revision: number; delta?: number; - "origin_cluster"?: string; - operation: "PUT" | "DEL"; + operation: "PUT" | "DEL" | "PURGE"; } export interface KvCodec { @@ -834,7 +833,6 @@ export interface KvStatus { values: number; history: number; ttl: Nanos; - cluster?: string; backingStore: StorageType; } @@ -851,6 +849,9 @@ export interface KvOptions { codec: KvCodecs; } +/** + * @deprecated use purge(k) + */ export interface KvRemove { remove(k: string): Promise; } @@ -865,13 +866,15 @@ export interface RoKV { } export interface KV extends RoKV { + create(k: string, data: Uint8Array): Promise; + update(k: string, data: Uint8Array, version: number): Promise; put( k: string, data: Uint8Array, opts?: Partial, ): Promise; delete(k: string): Promise; - purge(opts?: PurgeOpts): Promise; + purge(k: string): Promise; destroy(): Promise; } diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 88ffc18e..d500dec8 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -2458,7 +2458,7 @@ Deno.test("jetstream - rollup all", async () => { const opts = consumerOpts(); opts.manualAck(); opts.deliverTo(createInbox()); - opts.callback((err, jm) => { + opts.callback((_err, jm) => { assert(jm); assertEquals(jm.subject, `${stream}.summary`); const obj = jc.decode(jm.data) as Record; @@ -2589,7 +2589,7 @@ Deno.test("jetstream - headers only", async () => { opts.deliverTo(createInbox()); opts.headersOnly(); opts.manualAck(); - opts.callback((err, jm) => { + opts.callback((_err, jm) => { assert(jm); assert(jm.headers); const size = parseInt(jm.headers.get(JsHeaders.MessageSizeHdr), 10); diff --git a/tests/kv_test.ts b/tests/kv_test.ts index ec975968..4707a83d 100644 --- a/tests/kv_test.ts +++ b/tests/kv_test.ts @@ -27,6 +27,7 @@ import { assertArrayIncludes, assertEquals, assertThrows, + assertThrowsAsync, } from "https://deno.land/std@0.95.0/testing/asserts.ts"; import { KvEntry } from "../nats-base-client/types.ts"; @@ -120,7 +121,7 @@ Deno.test("kv - bucket name validation", () => { Deno.test("kv - init creates stream", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const jsm = await nc.jetstreamManager(); @@ -170,7 +171,7 @@ async function crud(bucket: Bucket): Promise { assertEquals(buf[1], "bye"); assertEquals(buf[2], ""); - const pr = await bucket.purge(); + const pr = await bucket.purgeBucket(); assertEquals(pr.purged, 3); assert(pr.success); @@ -185,7 +186,7 @@ Deno.test("kv - crud", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const n = nuid.next(); @@ -197,7 +198,7 @@ Deno.test("kv - codec crud", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const jsm = await nc.jetstreamManager(); @@ -220,7 +221,7 @@ Deno.test("kv - history", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const n = nuid.next(); @@ -243,7 +244,7 @@ Deno.test("kv - cleanups/empty", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const n = nuid.next(); @@ -268,7 +269,7 @@ Deno.test("kv - history cleanup", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const n = nuid.next(); @@ -297,7 +298,7 @@ Deno.test("kv - bucket watch", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const sc = StringCodec(); @@ -378,7 +379,7 @@ Deno.test("kv - key watch", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const bucket = await Bucket.create(nc, nuid.next()) as Bucket; @@ -395,7 +396,7 @@ Deno.test("kv - codec key watch", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const bucket = await Bucket.create(nc, nuid.next(), { @@ -434,7 +435,7 @@ Deno.test("kv - keys", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const b = await Bucket.create(nc, nuid.next()); @@ -451,7 +452,7 @@ Deno.test("kv - codec keys", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const b = await Bucket.create(nc, nuid.next(), { @@ -474,7 +475,7 @@ Deno.test("kv - ttl", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } @@ -501,7 +502,7 @@ Deno.test("kv - no ttl", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const sc = StringCodec(); @@ -524,7 +525,7 @@ Deno.test("kv - complex key", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const sc = StringCodec(); @@ -569,7 +570,7 @@ Deno.test("kv - remove key", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const sc = StringCodec(); @@ -580,12 +581,14 @@ Deno.test("kv - remove key", async () => { assert(v); assertEquals(sc.decode(v.value), "ab"); - await b.remove("a.b"); + await b.purge("a.b"); v = await b.get("a.b"); - assertEquals(v, null); + assert(v); + assertEquals(v.operation, "PURGE"); const status = await b.status(); - assertEquals(status.values, 0); + // the purged value + assertEquals(status.values, 1); await cleanup(ns, nc); }); @@ -594,7 +597,7 @@ Deno.test("kv - remove subkey", async () => { const { ns, nc } = await setup( jetstreamServerConf({}, true), ); - if (await notCompatible(ns, nc)) { + if (await notCompatible(ns, nc, "2.6.3")) { return; } const b = await Bucket.create(nc, nuid.next()) as Bucket; @@ -613,3 +616,47 @@ Deno.test("kv - remove subkey", async () => { await cleanup(ns, nc); }); + +Deno.test("kv - create key", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const b = await Bucket.create(nc, nuid.next()) as Bucket; + const sc = StringCodec(); + await b.create("a", Empty); + await assertThrowsAsync( + async () => { + await b.create("a", sc.encode("a")); + }, + Error, + "wrong last sequence: 1", + ); + + await cleanup(ns, nc); +}); + +Deno.test("kv - update key", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + const b = await Bucket.create(nc, nuid.next()) as Bucket; + const sc = StringCodec(); + const seq = await b.create("a", Empty); + await assertThrowsAsync( + async () => { + await b.update("a", sc.encode("a"), 100); + }, + Error, + "wrong last sequence: 1", + ); + + await b.update("a", sc.encode("b"), seq); + + await cleanup(ns, nc); +});