Skip to content

Commit

Permalink
made tests faster
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Mar 1, 2023
1 parent cefcd4c commit dace717
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 66 deletions.
14 changes: 8 additions & 6 deletions tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1170,9 +1170,8 @@ Deno.test("jetstream - fetch one - breaks after expires", async () => {
durable_name: "me",
ack_policy: AckPolicy.Explicit,
});
nc.publish(subj);

const js = nc.jetstream();
await js.publish(subj);

const sw = time();
const batch = js.fetch(stream, "me", {
Expand Down Expand Up @@ -1920,7 +1919,8 @@ Deno.test("jetstream - idle heartbeats", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));

const { stream, subj } = await initStream(nc);
nc.publish(subj);
const js = nc.jetstream();
await js.publish(subj);
const jsm = await nc.jetstreamManager();
const inbox = createInbox();
await jsm.consumers.add(stream, {
Expand All @@ -1947,10 +1947,12 @@ Deno.test("jetstream - flow control", async () => {
const { ns, nc } = await setup(jetstreamServerConf({}, true));
const { stream, subj } = await initStream(nc);
const data = new Uint8Array(1024 * 100);
const js = nc.jetstream();
const proms = [];
for (let i = 0; i < 2000; i++) {
nc.publish(subj, data);
proms.push(js.publish(subj, data));
}
await nc.flush();
await Promise.all(proms);

const jsm = await nc.jetstreamManager();
const inbox = createInbox();
Expand Down Expand Up @@ -2666,7 +2668,7 @@ Deno.test("jetstream - rollup subject", async () => {

const h = headers();
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);
await js.publish(`${stream}.A`, jc.encode({ value: 42 }), {
await js.publish(`${stream}.A`, jc.encode({ value: 0 }), {
headers: h,
});

Expand Down
141 changes: 81 additions & 60 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ Deno.test("jsm - add stream", async () => {
fn(lister[0]);

// add some data
nc.publish(name, Empty);
await nc.jetstream().publish(name, Empty);
si = await jsm.streams.info(name);
lister = await jsm.streams.list().next();
fn(lister[0]);
Expand Down Expand Up @@ -321,7 +321,7 @@ Deno.test("jsm - stream purge", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();

nc.publish(subj, Empty);
await nc.jetstream().publish(subj, Empty);

let si = await jsm.streams.info(stream);
assertEquals(si.state.messages, 1);
Expand All @@ -341,15 +341,18 @@ Deno.test("jsm - purge by sequence", async () => {
{ name: stream, subjects: [`${stream}.*`] },
);

nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
const js = nc.jetstream();
await Promise.all([
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
]);

const pi = await jsm.streams.purge(stream, { seq: 4 });
assertEquals(pi.purged, 3);
Expand All @@ -367,15 +370,18 @@ Deno.test("jsm - purge by filtered sequence", async () => {
{ name: stream, subjects: [`${stream}.*`] },
);

nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
const js = nc.jetstream();
await Promise.all([
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
]);

const pi = await jsm.streams.purge(stream, { seq: 4, filter: `${stream}.b` });
assertEquals(pi.purged, 1);
Expand All @@ -394,15 +400,18 @@ Deno.test("jsm - purge by subject", async () => {
{ name: stream, subjects: [`${stream}.*`] },
);

nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
const js = nc.jetstream();
await Promise.all([
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
]);

const pi = await jsm.streams.purge(stream, { filter: `${stream}.b` });
assertEquals(pi.purged, 3);
Expand All @@ -421,15 +430,18 @@ Deno.test("jsm - purge by subject", async () => {
{ name: stream, subjects: [`${stream}.*`] },
);

nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
const js = nc.jetstream();
await Promise.all([
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
]);

const pi = await jsm.streams.purge(stream, { filter: `${stream}.b` });
assertEquals(pi.purged, 3);
Expand All @@ -448,15 +460,19 @@ Deno.test("jsm - purge keep", async () => {
{ name: stream, subjects: [`${stream}.*`] },
);

nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
const js = nc.jetstream();

await Promise.all([
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
]);

const pi = await jsm.streams.purge(stream, { keep: 1 });
assertEquals(pi.purged, 8);
Expand All @@ -475,15 +491,18 @@ Deno.test("jsm - purge filtered keep", async () => {
{ name: stream, subjects: [`${stream}.*`] },
);

nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
nc.publish(`${stream}.a`);
nc.publish(`${stream}.b`);
nc.publish(`${stream}.c`);
const js = nc.jetstream();
await Promise.all([
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
js.publish(`${stream}.a`),
js.publish(`${stream}.b`),
js.publish(`${stream}.c`),
]);

let pi = await jsm.streams.purge(stream, { keep: 1, filter: `${stream}.a` });
assertEquals(pi.purged, 2);
Expand Down Expand Up @@ -517,7 +536,7 @@ Deno.test("jsm - stream delete", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();

nc.publish(subj, Empty);
await nc.jetstream().publish(subj, Empty);
await jsm.streams.delete(stream);
await assertRejects(
async () => {
Expand All @@ -534,7 +553,7 @@ Deno.test("jsm - stream delete message", async () => {
const { stream, subj } = await initStream(nc);
const jsm = await nc.jetstreamManager();

nc.publish(subj, Empty);
await nc.jetstream().publish(subj, Empty);

let si = await jsm.streams.info(stream);
assertEquals(si.state.messages, 1);
Expand Down Expand Up @@ -703,8 +722,10 @@ Deno.test("jsm - get message", async () => {
const jc = JSONCodec();
const h = headers();
h.set("xxx", "a");
nc.publish(subj, jc.encode(1), { headers: h });
nc.publish(subj, jc.encode(2));

const js = nc.jetstream();
await js.publish(subj, jc.encode(1), { headers: h });
await js.publish(subj, jc.encode(2));

const jsm = await nc.jetstreamManager();
let sm = await jsm.streams.getMessage(stream, { seq: 1 });
Expand Down

0 comments on commit dace717

Please sign in to comment.