diff --git a/tests/jetstream_test.ts b/tests/jetstream_test.ts index 2a8ac980..addc79f1 100644 --- a/tests/jetstream_test.ts +++ b/tests/jetstream_test.ts @@ -1170,9 +1170,8 @@ Deno.test("jetstream - fetch one - breaks after expires", async () => { durable_name: "me", ack_policy: AckPolicy.Explicit, }); - nc.publish(subj); - const js = nc.jetstream(); + await js.publish(subj); const sw = time(); const batch = js.fetch(stream, "me", { @@ -1920,7 +1919,8 @@ Deno.test("jetstream - idle heartbeats", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); const { stream, subj } = await initStream(nc); - nc.publish(subj); + const js = nc.jetstream(); + await js.publish(subj); const jsm = await nc.jetstreamManager(); const inbox = createInbox(); await jsm.consumers.add(stream, { @@ -1947,10 +1947,12 @@ Deno.test("jetstream - flow control", async () => { const { ns, nc } = await setup(jetstreamServerConf({}, true)); const { stream, subj } = await initStream(nc); const data = new Uint8Array(1024 * 100); + const js = nc.jetstream(); + const proms = []; for (let i = 0; i < 2000; i++) { - nc.publish(subj, data); + proms.push(js.publish(subj, data)); } - await nc.flush(); + await Promise.all(proms); const jsm = await nc.jetstreamManager(); const inbox = createInbox(); @@ -2666,7 +2668,7 @@ Deno.test("jetstream - rollup subject", async () => { const h = headers(); h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject); - await js.publish(`${stream}.A`, jc.encode({ value: 42 }), { + await js.publish(`${stream}.A`, jc.encode({ value: 0 }), { headers: h, }); diff --git a/tests/jsm_test.ts b/tests/jsm_test.ts index 6d0808de..cc50d7f7 100644 --- a/tests/jsm_test.ts +++ b/tests/jsm_test.ts @@ -279,7 +279,7 @@ Deno.test("jsm - add stream", async () => { fn(lister[0]); // add some data - nc.publish(name, Empty); + await nc.jetstream().publish(name, Empty); si = await jsm.streams.info(name); lister = await jsm.streams.list().next(); fn(lister[0]); @@ -321,7 +321,7 @@ Deno.test("jsm - stream purge", async () => { const { stream, subj } = await initStream(nc); const jsm = await nc.jetstreamManager(); - nc.publish(subj, Empty); + await nc.jetstream().publish(subj, Empty); let si = await jsm.streams.info(stream); assertEquals(si.state.messages, 1); @@ -341,15 +341,18 @@ Deno.test("jsm - purge by sequence", async () => { { name: stream, subjects: [`${stream}.*`] }, ); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); + const js = nc.jetstream(); + await Promise.all([ + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + ]); const pi = await jsm.streams.purge(stream, { seq: 4 }); assertEquals(pi.purged, 3); @@ -367,15 +370,18 @@ Deno.test("jsm - purge by filtered sequence", async () => { { name: stream, subjects: [`${stream}.*`] }, ); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); + const js = nc.jetstream(); + await Promise.all([ + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + ]); const pi = await jsm.streams.purge(stream, { seq: 4, filter: `${stream}.b` }); assertEquals(pi.purged, 1); @@ -394,15 +400,18 @@ Deno.test("jsm - purge by subject", async () => { { name: stream, subjects: [`${stream}.*`] }, ); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); + const js = nc.jetstream(); + await Promise.all([ + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + ]); const pi = await jsm.streams.purge(stream, { filter: `${stream}.b` }); assertEquals(pi.purged, 3); @@ -421,15 +430,18 @@ Deno.test("jsm - purge by subject", async () => { { name: stream, subjects: [`${stream}.*`] }, ); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); + const js = nc.jetstream(); + await Promise.all([ + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + ]); const pi = await jsm.streams.purge(stream, { filter: `${stream}.b` }); assertEquals(pi.purged, 3); @@ -448,15 +460,19 @@ Deno.test("jsm - purge keep", async () => { { name: stream, subjects: [`${stream}.*`] }, ); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); + const js = nc.jetstream(); + + await Promise.all([ + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + ]); const pi = await jsm.streams.purge(stream, { keep: 1 }); assertEquals(pi.purged, 8); @@ -475,15 +491,18 @@ Deno.test("jsm - purge filtered keep", async () => { { name: stream, subjects: [`${stream}.*`] }, ); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); - nc.publish(`${stream}.a`); - nc.publish(`${stream}.b`); - nc.publish(`${stream}.c`); + const js = nc.jetstream(); + await Promise.all([ + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + js.publish(`${stream}.a`), + js.publish(`${stream}.b`), + js.publish(`${stream}.c`), + ]); let pi = await jsm.streams.purge(stream, { keep: 1, filter: `${stream}.a` }); assertEquals(pi.purged, 2); @@ -517,7 +536,7 @@ Deno.test("jsm - stream delete", async () => { const { stream, subj } = await initStream(nc); const jsm = await nc.jetstreamManager(); - nc.publish(subj, Empty); + await nc.jetstream().publish(subj, Empty); await jsm.streams.delete(stream); await assertRejects( async () => { @@ -534,7 +553,7 @@ Deno.test("jsm - stream delete message", async () => { const { stream, subj } = await initStream(nc); const jsm = await nc.jetstreamManager(); - nc.publish(subj, Empty); + await nc.jetstream().publish(subj, Empty); let si = await jsm.streams.info(stream); assertEquals(si.state.messages, 1); @@ -703,8 +722,10 @@ Deno.test("jsm - get message", async () => { const jc = JSONCodec(); const h = headers(); h.set("xxx", "a"); - nc.publish(subj, jc.encode(1), { headers: h }); - nc.publish(subj, jc.encode(2)); + + const js = nc.jetstream(); + await js.publish(subj, jc.encode(1), { headers: h }); + await js.publish(subj, jc.encode(2)); const jsm = await nc.jetstreamManager(); let sm = await jsm.streams.getMessage(stream, { seq: 1 });