Skip to content

Commit

Permalink
[FIX] flow control/protocol messages now handled when they would be d…
Browse files Browse the repository at this point in the history
…ispatched.

The queued iterator has a filterFn that tests the inbound message to see if it should be presented to the callback/iterator.
This enables the processing of the flow control messages when the user would have seen it, rather than when the protocol injected it into the subscription.
  • Loading branch information
aricart committed Oct 13, 2021
1 parent eb2c6b9 commit 046f4df
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 46 deletions.
41 changes: 16 additions & 25 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import {
validateStreamName,
} from "./jsutil.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { toJsMsg } from "./jsmsg.ts";
import { JsMsgImpl, toJsMsg } from "./jsmsg.ts";
import {
MsgAdapter,
TypedSubscription,
Expand Down Expand Up @@ -179,6 +179,8 @@ export class JetStreamClientImpl extends BaseApiClient
const qi = new QueuedIteratorImpl<JsMsg>();
const wants = args.batch;
let received = 0;
// FIXME: this looks weird, we want to stop the iterator
// but doing it from a dispatchedFn...
qi.dispatchedFn = (m: JsMsg | null) => {
if (m) {
received++;
Expand Down Expand Up @@ -384,6 +386,15 @@ export class JetStreamClientImpl extends BaseApiClient
if (jsi.callbackFn) {
so.callback = jsi.callbackFn;
}
so.filterFn = (jm): boolean => {
const jsmi = jm as JsMsgImpl;
if (isFlowControlMsg(jsmi.msg)) {
// FIXME: ordered consumer needs to work on this
jsmi.msg.respond();
return false;
}
return true;
};
if (!jsi.mack) {
so.dispatchedFn = autoAckJsMsg;
}
Expand Down Expand Up @@ -503,18 +514,8 @@ function cbMsgAdapter(
if (err) {
return [err, null];
}
if (isFlowControlMsg(msg)) {
msg.respond();
return [null, null];
}
const jm = toJsMsg(msg);
try {
// this will throw if not a JsMsg
jm.info;
return [null, jm];
} catch (err) {
return [err, null];
}
// assuming that the filterFn is set!
return [null, toJsMsg(msg)];
}

function iterMsgAdapter(
Expand All @@ -538,18 +539,8 @@ function iterMsgAdapter(
return [ne, null];
}
}
if (isFlowControlMsg(msg)) {
msg.respond();
return [null, null];
}
const jm = toJsMsg(msg);
try {
// this will throw if not a JsMsg
jm.info;
return [null, jm];
} catch (err) {
return [err, null];
}
// assuming that the filterFn is set
return [null, toJsMsg(msg)];
}

function autoAckJsMsg(data: JsMsg | null) {
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export function parseInfo(s: string): DeliveryInfo {
return di;
}

class JsMsgImpl implements JsMsg {
export class JsMsgImpl implements JsMsg {
msg: Msg;
di?: DeliveryInfo;
didAck: boolean;
Expand Down
13 changes: 10 additions & 3 deletions nats-base-client/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface Dispatcher<T> {
push(v: T): void;
}

export type FilterFn<T> = (data: T | null) => boolean;
export type DispatchedFn<T> = (data: T | null) => void;

export interface QueuedIterator<T> extends Dispatcher<T> {
Expand All @@ -32,12 +33,14 @@ export interface QueuedIterator<T> extends Dispatcher<T> {
export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
inflight: number;
processed: number;
received: number; // this is updated by the protocol
// FIXME: this is updated by the protocol
received: number;
protected noIterator: boolean;
iterClosed: Deferred<void>;
protected done: boolean;
private signal: Deferred<void>;
private yields: T[];
filterFn?: FilterFn<T>;
dispatchedFn?: DispatchedFn<T>;
private err?: Error;

Expand Down Expand Up @@ -80,8 +83,12 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
this.inflight = yields.length;
this.yields = [];
for (let i = 0; i < yields.length; i++) {
this.processed++;
yield yields[i];
// only pass messages that pass the filter
const ok = this.filterFn ? this.filterFn(yields[i]) : true;
if (ok) {
this.processed++;
yield yields[i];
}
if (this.dispatchedFn && yields[i]) {
this.dispatchedFn(yields[i]);
}
Expand Down
24 changes: 18 additions & 6 deletions nats-base-client/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { DispatchedFn, QueuedIteratorImpl } from "./queued_iterator.ts";
import {
DispatchedFn,
FilterFn,
QueuedIteratorImpl,
} from "./queued_iterator.ts";
import type { Base, Msg, Subscription, SubscriptionOptions } from "./types.ts";
import { Deferred, deferred, extend, Timeout, timeout } from "./util.ts";
import { ErrorCode, NatsError } from "./error.ts";
Expand Down Expand Up @@ -69,16 +73,24 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

setDispatchedFn(cb: DispatchedFn<Msg>) {
// user specified callback
setPrePostHandlers(
opts: { filterFn?: FilterFn<Msg>; dispatchedFn?: DispatchedFn<Msg> },
) {
if (this.noIterator) {
const uc = this.callback;
const filter = opts.filterFn ? opts.filterFn : () => {
return true;
};
const dispatched = opts.dispatchedFn ? opts.dispatchedFn : () => {};
this.callback = (err: NatsError | null, msg: Msg) => {
uc(err, msg);
cb(msg);
if (filter(msg)) {
uc(err, msg);
}
dispatched(msg);
};
} else {
this.dispatchedFn = cb;
this.filterFn = opts.filterFn;
this.dispatchedFn = opts.dispatchedFn;
}
}

Expand Down
12 changes: 10 additions & 2 deletions nats-base-client/typedsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/

import { Deferred, deferred } from "./util.ts";
import type { DispatchedFn } from "./queued_iterator.ts";
import type { DispatchedFn, FilterFn } from "./queued_iterator.ts";
import type {
Msg,
NatsConnection,
Expand Down Expand Up @@ -48,6 +48,7 @@ export type TypedCallback<T> = (err: NatsError | null, msg: T | null) => void;
export interface TypedSubscriptionOptions<T> extends SubOpts<T> {
adapter: MsgAdapter<T>;
callback?: TypedCallback<T>;
filterFn?: FilterFn<T>;
dispatchedFn?: DispatchedFn<T>;
cleanupFn?: (sub: Subscription, info?: unknown) => void;
}
Expand Down Expand Up @@ -94,6 +95,10 @@ export class TypedSubscription<T> extends QueuedIteratorImpl<T>
}
this.noIterator = typeof opts.callback === "function";

if (opts.filterFn) {
checkFn(opts.dispatchedFn, "dispatchedFn");
this.filterFn = opts.filterFn;
}
if (opts.dispatchedFn) {
checkFn(opts.dispatchedFn, "dispatchedFn");
this.dispatchedFn = opts.dispatchedFn;
Expand All @@ -109,7 +114,10 @@ export class TypedSubscription<T> extends QueuedIteratorImpl<T>
const uh = opts.callback;
callback = (err: NatsError | null, msg: Msg) => {
const [jer, tm] = this.adapter(err, msg);
uh(jer, tm);
const ok = this.filterFn ? this.filterFn(tm) : true;
if (ok) {
uh(jer, tm);
}
if (this.dispatchedFn && tm) {
this.dispatchedFn(tm);
}
Expand Down
93 changes: 84 additions & 9 deletions tests/sub_extensions_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
assert,
assertEquals,
} from "https://deno.land/std@0.95.0/testing/asserts.ts";
import { createInbox, Msg } from "../src/mod.ts";
import { createInbox, Msg, StringCodec } from "../src/mod.ts";
import {
deferred,
SubscriptionImpl,
Expand Down Expand Up @@ -104,10 +104,13 @@ Deno.test("extensions - dispatched called on callback", async () => {
const subj = createInbox();
const sub = nc.subscribe(subj, { callback: () => {} }) as SubscriptionImpl;
let count = 0;
sub.setDispatchedFn((msg: Msg | null) => {
if (msg) {
count++;
}

sub.setPrePostHandlers({
dispatchedFn: (msg: Msg | null) => {
if (msg) {
count++;
}
},
});
nc.publish(subj);
await nc.flush();
Expand All @@ -120,10 +123,12 @@ Deno.test("extensions - dispatched called on iterator", async () => {
const subj = createInbox();
const sub = nc.subscribe(subj, { max: 1 }) as SubscriptionImpl;
let count = 0;
sub.setDispatchedFn((msg: Msg | null) => {
if (msg) {
count++;
}
sub.setPrePostHandlers({
dispatchedFn: (msg: Msg | null) => {
if (msg) {
count++;
}
},
});
const done = (async () => {
for await (const _m of sub) {
Expand All @@ -136,3 +141,73 @@ Deno.test("extensions - dispatched called on iterator", async () => {
assertEquals(count, 1);
await cleanup(ns, nc);
});

Deno.test("extensions - filter called on callback", async () => {
const { ns, nc } = await setup();
const subj = createInbox();
const sub = nc.subscribe(subj, {
max: 3,
callback: (err, m) => {
filtered.push(sc.decode(m.data));
},
}) as SubscriptionImpl;
const sc = StringCodec();
const all: string[] = [];
const filtered: string[] = [];
sub.setPrePostHandlers({
filterFn: (msg: Msg | null): boolean => {
if (msg) {
return sc.decode(msg.data).startsWith("A");
}
return false;
},
dispatchedFn: (msg: Msg | null): void => {
all.push(msg ? sc.decode(msg.data) : "");
},
});

nc.publish(subj, sc.encode("A"));
nc.publish(subj, sc.encode("B"));
nc.publish(subj, sc.encode("C"));
await sub.closed;

assertEquals(filtered, ["A"]);
assertEquals(all, ["A", "B", "C"]);

await cleanup(ns, nc);
});

Deno.test("extensions - filter called on iterator", async () => {
const { ns, nc } = await setup();
const subj = createInbox();
const sub = nc.subscribe(subj, { max: 3 }) as SubscriptionImpl;
const sc = StringCodec();
const all: string[] = [];
const filtered: string[] = [];
sub.setPrePostHandlers({
filterFn: (msg: Msg | null): boolean => {
if (msg) {
return sc.decode(msg.data).startsWith("A");
}
return false;
},
dispatchedFn: (msg: Msg | null): void => {
all.push(msg ? sc.decode(msg.data) : "");
},
});
const done = (async () => {
for await (const m of sub) {
filtered.push(sc.decode(m.data));
}
})();

nc.publish(subj, sc.encode("A"));
nc.publish(subj, sc.encode("B"));
nc.publish(subj, sc.encode("C"));
await done;

assertEquals(filtered, ["A"]);
assertEquals(all, ["A", "B", "C"]);

await cleanup(ns, nc);
});

0 comments on commit 046f4df

Please sign in to comment.