From 4af5a1b3fc73d7ba350e10d3dcaa26fd141d75af Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Mon, 16 May 2022 14:57:35 -0500 Subject: [PATCH 1/2] [FEAT] implements the ability in KV to request a specific revision for a key FIX #301 --- nats-base-client/kv.ts | 16 ++++++++++++---- nats-base-client/types.ts | 2 +- tests/jetstream_test.ts | 3 --- tests/kv_test.ts | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 45 insertions(+), 8 deletions(-) diff --git a/nats-base-client/kv.ts b/nats-base-client/kv.ts index 404dda34..4312f518 100644 --- a/nats-base-client/kv.ts +++ b/nats-base-client/kv.ts @@ -35,6 +35,7 @@ import { KvPutOptions, KvRemove, KvStatus, + MsgRequest, PurgeOpts, PurgeResponse, RetentionPolicy, @@ -367,13 +368,20 @@ export class Bucket implements KV, KvRemove { return pa.seq; } - async get(k: string): Promise { + async get( + k: string, + opts: { revision: number } = { revision: 0 }, + ): Promise { const ek = this.encodeKey(k); this.validateKey(ek); + + let arg: MsgRequest = { last_by_subj: this.fullKeyName(ek) }; + if (opts.revision !== 0) { + arg = { seq: opts.revision }; + } + try { - const sm = await this.jsm.streams.getMessage(this.bucketName(), { - last_by_subj: this.fullKeyName(ek), - }); + const sm = await this.jsm.streams.getMessage(this.bucketName(), arg); return this.smToEntry(k, sm); } catch (err) { if (err.message === "no message found") { diff --git a/nats-base-client/types.ts b/nats-base-client/types.ts index 004b83bc..e2dd2e63 100644 --- a/nats-base-client/types.ts +++ b/nats-base-client/types.ts @@ -914,7 +914,7 @@ export interface KvRemove { } export interface RoKV { - get(k: string): Promise; + get(k: string, opts?: { revision: number }): Promise; history(opts?: { key?: string }): Promise>; watch( opts?: { key?: string; headers_only?: boolean; initializedFn?: callbackFn }, diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 5632cba8..b84e3e0c 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -39,8 +39,6 @@ import { JsMsg, JsMsgCallback, JSONCodec, - millis, - Nanos, nanos, NatsConnectionImpl, NatsError, @@ -51,7 +49,6 @@ import { StringCodec, } from "../nats-base-client/internal_mod.ts"; import { - assertArrayIncludes, assertEquals, assertRejects, assertThrows, diff --git a/tests/kv_test.ts b/tests/kv_test.ts index cc8d8385..469a14db 100644 --- a/tests/kv_test.ts +++ b/tests/kv_test.ts @@ -1271,3 +1271,35 @@ Deno.test("kv - watch init callback exceptions terminate the iterator", async () assertEquals(err.message, "crash"); await cleanup(ns, nc); }); + +Deno.test("kv - get revision", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + const js = nc.jetstream(); + const sc = StringCodec(); + + const b = await js.views.kv("a", { history: 3 }) as Bucket; + await b.put("A", sc.encode("a")); + await b.put("A", sc.encode("b")); + await b.put("A", sc.encode("c")); + + async function check(value: string | null, revision = 0) { + const e = await b.get("A", { revision }); + if (value === null) { + assertEquals(e, null); + } else { + assertEquals(sc.decode(e!.value), value); + } + } + + await check("c"); + await check("a", 1); + await check("b", 2); + + await b.put("A", sc.encode("d")); + await check("d"); + await check(null, 1); + + await cleanup(ns, nc); +}); From 6bb73321442fc35c75e370b6b53fe08371ab9085 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Mon, 16 May 2022 17:08:53 -0500 Subject: [PATCH 2/2] [FEAT] added check when retrieving a key, but also specifying a sequence (new feature), that the key matched. If not, null is returned as the sequence is not referencing the expected key. [FIX] conversion from stored message captured the key as provided by the client, and returned an entry with the said key. --- nats-base-client/kv.ts | 20 ++++++++++++-------- tests/kv_test.ts | 26 +++++++++++++++----------- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/nats-base-client/kv.ts b/nats-base-client/kv.ts index 4312f518..cf600e6d 100644 --- a/nats-base-client/kv.ts +++ b/nats-base-client/kv.ts @@ -312,10 +312,10 @@ export class Bucket implements KV, KvRemove { return data.length; } - smToEntry(key: string, sm: StoredMsg): KvEntry { + smToEntry(sm: StoredMsg): KvEntry { return { bucket: this.bucket, - key: key, + key: sm.subject.substring(this.prefixLen), value: sm.data, delta: 0, created: sm.time, @@ -325,7 +325,7 @@ export class Bucket implements KV, KvRemove { }; } - jmToEntry(_k: string, jm: JsMsg): KvEntry { + jmToEntry(jm: JsMsg): KvEntry { const key = this.decodeKey(jm.subject.substring(this.prefixLen)); return { bucket: this.bucket, @@ -370,19 +370,23 @@ export class Bucket implements KV, KvRemove { async get( k: string, - opts: { revision: number } = { revision: 0 }, + opts?: { revision: number }, ): Promise { const ek = this.encodeKey(k); this.validateKey(ek); let arg: MsgRequest = { last_by_subj: this.fullKeyName(ek) }; - if (opts.revision !== 0) { + if (opts && opts.revision > 0) { arg = { seq: opts.revision }; } try { const sm = await this.jsm.streams.getMessage(this.bucketName(), arg); - return this.smToEntry(k, sm); + const ke = this.smToEntry(sm); + if (ke.key !== ek) { + return null; + } + return ke; } catch (err) { if (err.message === "no message found") { return null; @@ -476,7 +480,7 @@ export class Bucket implements KV, KvRemove { return; } if (jm) { - const e = this.jmToEntry(k, jm); + const e = this.jmToEntry(jm); qi.push(e); qi.received++; //@ts-ignore - function will be removed @@ -553,7 +557,7 @@ export class Bucket implements KV, KvRemove { return; } if (jm) { - const e = this.jmToEntry(k, jm); + const e = this.jmToEntry(jm); qi.push(e); qi.received++; diff --git a/tests/kv_test.ts b/tests/kv_test.ts index 469a14db..c1d9145e 100644 --- a/tests/kv_test.ts +++ b/tests/kv_test.ts @@ -1279,13 +1279,10 @@ Deno.test("kv - get revision", async () => { const js = nc.jetstream(); const sc = StringCodec(); - const b = await js.views.kv("a", { history: 3 }) as Bucket; - await b.put("A", sc.encode("a")); - await b.put("A", sc.encode("b")); - await b.put("A", sc.encode("c")); + const b = await js.views.kv(nuid.next(), { history: 3 }) as Bucket; - async function check(value: string | null, revision = 0) { - const e = await b.get("A", { revision }); + async function check(key: string, value: string | null, revision = 0) { + const e = await b.get(key, { revision }); if (value === null) { assertEquals(e, null); } else { @@ -1293,13 +1290,20 @@ Deno.test("kv - get revision", async () => { } } - await check("c"); - await check("a", 1); - await check("b", 2); + await b.put("A", sc.encode("a")); + await b.put("A", sc.encode("b")); + await b.put("A", sc.encode("c")); + + // expect null, as sequence 1, holds "A" + await check("B", null, 1); + + await check("A", "c"); + await check("A", "a", 1); + await check("A", "b", 2); await b.put("A", sc.encode("d")); - await check("d"); - await check(null, 1); + await check("A", "d"); + await check("A", null, 1); await cleanup(ns, nc); });