Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] flow control/protocol messages now handled when they would be dispatched. #208

Merged
merged 3 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 16 additions & 25 deletions nats-base-client/jsclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import {
validateStreamName,
} from "./jsutil.ts";
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { toJsMsg } from "./jsmsg.ts";
import { JsMsgImpl, toJsMsg } from "./jsmsg.ts";
import {
MsgAdapter,
TypedSubscription,
Expand Down Expand Up @@ -179,6 +179,8 @@ export class JetStreamClientImpl extends BaseApiClient
const qi = new QueuedIteratorImpl<JsMsg>();
const wants = args.batch;
let received = 0;
// FIXME: this looks weird, we want to stop the iterator
// but doing it from a dispatchedFn...
qi.dispatchedFn = (m: JsMsg | null) => {
if (m) {
received++;
Expand Down Expand Up @@ -384,6 +386,15 @@ export class JetStreamClientImpl extends BaseApiClient
if (jsi.callbackFn) {
so.callback = jsi.callbackFn;
}
so.protocolFilterFn = (jm): boolean => {
const jsmi = jm as JsMsgImpl;
if (isFlowControlMsg(jsmi.msg)) {
// FIXME: ordered consumer needs to work on this
jsmi.msg.respond();
return false;
}
return true;
};
if (!jsi.mack) {
so.dispatchedFn = autoAckJsMsg;
}
Expand Down Expand Up @@ -503,18 +514,8 @@ function cbMsgAdapter(
if (err) {
return [err, null];
}
if (isFlowControlMsg(msg)) {
msg.respond();
return [null, null];
}
const jm = toJsMsg(msg);
try {
// this will throw if not a JsMsg
jm.info;
return [null, jm];
} catch (err) {
return [err, null];
}
// assuming that the protocolFilterFn is set!
return [null, toJsMsg(msg)];
}

function iterMsgAdapter(
Expand All @@ -538,18 +539,8 @@ function iterMsgAdapter(
return [ne, null];
}
}
if (isFlowControlMsg(msg)) {
msg.respond();
return [null, null];
}
const jm = toJsMsg(msg);
try {
// this will throw if not a JsMsg
jm.info;
return [null, jm];
} catch (err) {
return [err, null];
}
// assuming that the protocolFilterFn is set
return [null, toJsMsg(msg)];
}

function autoAckJsMsg(data: JsMsg | null) {
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export function parseInfo(s: string): DeliveryInfo {
return di;
}

class JsMsgImpl implements JsMsg {
export class JsMsgImpl implements JsMsg {
msg: Msg;
di?: DeliveryInfo;
didAck: boolean;
Expand Down
19 changes: 14 additions & 5 deletions nats-base-client/queued_iterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface Dispatcher<T> {
push(v: T): void;
}

export type ProtocolFilterFn<T> = (data: T | null) => boolean;
export type DispatchedFn<T> = (data: T | null) => void;

export interface QueuedIterator<T> extends Dispatcher<T> {
Expand All @@ -32,12 +33,14 @@ export interface QueuedIterator<T> extends Dispatcher<T> {
export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
inflight: number;
processed: number;
received: number; // this is updated by the protocol
// FIXME: this is updated by the protocol
received: number;
protected noIterator: boolean;
iterClosed: Deferred<void>;
protected done: boolean;
private signal: Deferred<void>;
private yields: T[];
protocolFilterFn?: ProtocolFilterFn<T>;
dispatchedFn?: DispatchedFn<T>;
private err?: Error;

Expand Down Expand Up @@ -80,10 +83,16 @@ export class QueuedIteratorImpl<T> implements QueuedIterator<T> {
this.inflight = yields.length;
this.yields = [];
for (let i = 0; i < yields.length; i++) {
this.processed++;
yield yields[i];
if (this.dispatchedFn && yields[i]) {
this.dispatchedFn(yields[i]);
// only pass messages that pass the filter
const ok = this.protocolFilterFn
? this.protocolFilterFn(yields[i])
: true;
if (ok) {
this.processed++;
yield yields[i];
if (this.dispatchedFn && yields[i]) {
this.dispatchedFn(yields[i]);
}
}
this.inflight--;
}
Expand Down
27 changes: 21 additions & 6 deletions nats-base-client/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { DispatchedFn, QueuedIteratorImpl } from "./queued_iterator.ts";
import {
DispatchedFn,
ProtocolFilterFn,
QueuedIteratorImpl,
} from "./queued_iterator.ts";
import type { Base, Msg, Subscription, SubscriptionOptions } from "./types.ts";
import { Deferred, deferred, extend, Timeout, timeout } from "./util.ts";
import { ErrorCode, NatsError } from "./error.ts";
Expand Down Expand Up @@ -69,16 +73,27 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

setDispatchedFn(cb: DispatchedFn<Msg>) {
// user specified callback
setPrePostHandlers(
opts: {
protocolFilterFn?: ProtocolFilterFn<Msg>;
dispatchedFn?: DispatchedFn<Msg>;
},
) {
if (this.noIterator) {
const uc = this.callback;
const filter = opts.protocolFilterFn ? opts.protocolFilterFn : () => {
return true;
};
const dispatched = opts.dispatchedFn ? opts.dispatchedFn : () => {};
this.callback = (err: NatsError | null, msg: Msg) => {
uc(err, msg);
cb(msg);
if (filter(msg)) {
uc(err, msg);
dispatched(msg);
}
};
} else {
this.dispatchedFn = cb;
this.protocolFilterFn = opts.protocolFilterFn;
this.dispatchedFn = opts.dispatchedFn;
}
}

Expand Down
16 changes: 12 additions & 4 deletions nats-base-client/typedsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/

import { Deferred, deferred } from "./util.ts";
import type { DispatchedFn } from "./queued_iterator.ts";
import type { DispatchedFn, ProtocolFilterFn } from "./queued_iterator.ts";
import type {
Msg,
NatsConnection,
Expand Down Expand Up @@ -48,6 +48,7 @@ export type TypedCallback<T> = (err: NatsError | null, msg: T | null) => void;
export interface TypedSubscriptionOptions<T> extends SubOpts<T> {
adapter: MsgAdapter<T>;
callback?: TypedCallback<T>;
protocolFilterFn?: ProtocolFilterFn<T>;
dispatchedFn?: DispatchedFn<T>;
cleanupFn?: (sub: Subscription, info?: unknown) => void;
}
Expand Down Expand Up @@ -94,6 +95,10 @@ export class TypedSubscription<T> extends QueuedIteratorImpl<T>
}
this.noIterator = typeof opts.callback === "function";

if (opts.protocolFilterFn) {
checkFn(opts.protocolFilterFn, "filterFn");
this.protocolFilterFn = opts.protocolFilterFn;
}
if (opts.dispatchedFn) {
checkFn(opts.dispatchedFn, "dispatchedFn");
this.dispatchedFn = opts.dispatchedFn;
Expand All @@ -109,9 +114,12 @@ export class TypedSubscription<T> extends QueuedIteratorImpl<T>
const uh = opts.callback;
callback = (err: NatsError | null, msg: Msg) => {
const [jer, tm] = this.adapter(err, msg);
uh(jer, tm);
if (this.dispatchedFn && tm) {
this.dispatchedFn(tm);
const ok = this.protocolFilterFn ? this.protocolFilterFn(tm) : true;
if (ok) {
uh(jer, tm);
if (this.dispatchedFn && tm) {
this.dispatchedFn(tm);
}
}
};
}
Expand Down
101 changes: 92 additions & 9 deletions tests/sub_extensions_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
assert,
assertEquals,
} from "https://deno.land/std@0.95.0/testing/asserts.ts";
import { createInbox, Msg } from "../src/mod.ts";
import { createInbox, Msg, StringCodec } from "../src/mod.ts";
import {
deferred,
SubscriptionImpl,
Expand Down Expand Up @@ -104,10 +104,13 @@ Deno.test("extensions - dispatched called on callback", async () => {
const subj = createInbox();
const sub = nc.subscribe(subj, { callback: () => {} }) as SubscriptionImpl;
let count = 0;
sub.setDispatchedFn((msg: Msg | null) => {
if (msg) {
count++;
}

sub.setPrePostHandlers({
dispatchedFn: (msg: Msg | null) => {
if (msg) {
count++;
}
},
});
nc.publish(subj);
await nc.flush();
Expand All @@ -120,10 +123,12 @@ Deno.test("extensions - dispatched called on iterator", async () => {
const subj = createInbox();
const sub = nc.subscribe(subj, { max: 1 }) as SubscriptionImpl;
let count = 0;
sub.setDispatchedFn((msg: Msg | null) => {
if (msg) {
count++;
}
sub.setPrePostHandlers({
dispatchedFn: (msg: Msg | null) => {
if (msg) {
count++;
}
},
});
const done = (async () => {
for await (const _m of sub) {
Expand All @@ -136,3 +141,81 @@ Deno.test("extensions - dispatched called on iterator", async () => {
assertEquals(count, 1);
await cleanup(ns, nc);
});

Deno.test("extensions - filter called on callback", async () => {
const { ns, nc } = await setup();
const subj = createInbox();
const sub = nc.subscribe(subj, {
max: 3,
callback: (err, m) => {
processed.push(sc.decode(m.data));
},
}) as SubscriptionImpl;
const sc = StringCodec();
const all: string[] = [];
const processed: string[] = [];
const dispatched: string[] = [];
sub.setPrePostHandlers({
protocolFilterFn: (msg: Msg | null): boolean => {
if (msg) {
const d = sc.decode(msg.data);
all.push(d);
return d.startsWith("A");
}
return false;
},
dispatchedFn: (msg: Msg | null): void => {
dispatched.push(msg ? sc.decode(msg.data) : "");
},
});

nc.publish(subj, sc.encode("A"));
nc.publish(subj, sc.encode("B"));
nc.publish(subj, sc.encode("C"));
await sub.closed;

assertEquals(processed, ["A"]);
assertEquals(dispatched, ["A"]);
assertEquals(all, ["A", "B", "C"]);

await cleanup(ns, nc);
});

Deno.test("extensions - filter called on iterator", async () => {
const { ns, nc } = await setup();
const subj = createInbox();
const sub = nc.subscribe(subj, { max: 3 }) as SubscriptionImpl;
const sc = StringCodec();
const all: string[] = [];
const processed: string[] = [];
const dispatched: string[] = [];
sub.setPrePostHandlers({
protocolFilterFn: (msg: Msg | null): boolean => {
if (msg) {
const d = sc.decode(msg.data);
all.push(d);
return d.startsWith("A");
}
return false;
},
dispatchedFn: (msg: Msg | null): void => {
dispatched.push(msg ? sc.decode(msg.data) : "");
},
});
const done = (async () => {
for await (const m of sub) {
processed.push(sc.decode(m.data));
}
})();

nc.publish(subj, sc.encode("A"));
nc.publish(subj, sc.encode("B"));
nc.publish(subj, sc.encode("C"));
await done;

assertEquals(processed, ["A"]);
assertEquals(dispatched, ["A"]);
assertEquals(all, ["A", "B", "C"]);

await cleanup(ns, nc);
});