From 046f4df9c8fdc1a3be78f3061c87964aba873995 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 13 Oct 2021 16:07:34 -0500 Subject: [PATCH 1/3] [FIX] flow control/protocol messages now handled when they would be dispatched. The queued iterator has a filterFn that tests the inbound message to see if it should be presented to the callback/iterator. This enables the processing of the flow control messages when the user would have seen it, rather than when the protocol injected it into the subscription. --- nats-base-client/jsclient.ts | 41 +++++-------- nats-base-client/jsmsg.ts | 2 +- nats-base-client/queued_iterator.ts | 13 +++- nats-base-client/subscription.ts | 24 ++++++-- nats-base-client/typedsub.ts | 12 +++- tests/sub_extensions_test.ts | 93 ++++++++++++++++++++++++++--- 6 files changed, 139 insertions(+), 46 deletions(-) diff --git a/nats-base-client/jsclient.ts b/nats-base-client/jsclient.ts index 26ad1838..4cae6e91 100644 --- a/nats-base-client/jsclient.ts +++ b/nats-base-client/jsclient.ts @@ -47,7 +47,7 @@ import { validateStreamName, } from "./jsutil.ts"; import { ConsumerAPIImpl } from "./jsmconsumer_api.ts"; -import { toJsMsg } from "./jsmsg.ts"; +import { JsMsgImpl, toJsMsg } from "./jsmsg.ts"; import { MsgAdapter, TypedSubscription, @@ -179,6 +179,8 @@ export class JetStreamClientImpl extends BaseApiClient const qi = new QueuedIteratorImpl(); const wants = args.batch; let received = 0; + // FIXME: this looks weird, we want to stop the iterator + // but doing it from a dispatchedFn... qi.dispatchedFn = (m: JsMsg | null) => { if (m) { received++; @@ -384,6 +386,15 @@ export class JetStreamClientImpl extends BaseApiClient if (jsi.callbackFn) { so.callback = jsi.callbackFn; } + so.filterFn = (jm): boolean => { + const jsmi = jm as JsMsgImpl; + if (isFlowControlMsg(jsmi.msg)) { + // FIXME: ordered consumer needs to work on this + jsmi.msg.respond(); + return false; + } + return true; + }; if (!jsi.mack) { so.dispatchedFn = autoAckJsMsg; } @@ -503,18 +514,8 @@ function cbMsgAdapter( if (err) { return [err, null]; } - if (isFlowControlMsg(msg)) { - msg.respond(); - return [null, null]; - } - const jm = toJsMsg(msg); - try { - // this will throw if not a JsMsg - jm.info; - return [null, jm]; - } catch (err) { - return [err, null]; - } + // assuming that the filterFn is set! + return [null, toJsMsg(msg)]; } function iterMsgAdapter( @@ -538,18 +539,8 @@ function iterMsgAdapter( return [ne, null]; } } - if (isFlowControlMsg(msg)) { - msg.respond(); - return [null, null]; - } - const jm = toJsMsg(msg); - try { - // this will throw if not a JsMsg - jm.info; - return [null, jm]; - } catch (err) { - return [err, null]; - } + // assuming that the filterFn is set + return [null, toJsMsg(msg)]; } function autoAckJsMsg(data: JsMsg | null) { diff --git a/nats-base-client/jsmsg.ts b/nats-base-client/jsmsg.ts index e7364c15..18ff0d09 100644 --- a/nats-base-client/jsmsg.ts +++ b/nats-base-client/jsmsg.ts @@ -68,7 +68,7 @@ export function parseInfo(s: string): DeliveryInfo { return di; } -class JsMsgImpl implements JsMsg { +export class JsMsgImpl implements JsMsg { msg: Msg; di?: DeliveryInfo; didAck: boolean; diff --git a/nats-base-client/queued_iterator.ts b/nats-base-client/queued_iterator.ts index 26b76367..9858b19a 100644 --- a/nats-base-client/queued_iterator.ts +++ b/nats-base-client/queued_iterator.ts @@ -19,6 +19,7 @@ export interface Dispatcher { push(v: T): void; } +export type FilterFn = (data: T | null) => boolean; export type DispatchedFn = (data: T | null) => void; export interface QueuedIterator extends Dispatcher { @@ -32,12 +33,14 @@ export interface QueuedIterator extends Dispatcher { export class QueuedIteratorImpl implements QueuedIterator { inflight: number; processed: number; - received: number; // this is updated by the protocol + // FIXME: this is updated by the protocol + received: number; protected noIterator: boolean; iterClosed: Deferred; protected done: boolean; private signal: Deferred; private yields: T[]; + filterFn?: FilterFn; dispatchedFn?: DispatchedFn; private err?: Error; @@ -80,8 +83,12 @@ export class QueuedIteratorImpl implements QueuedIterator { this.inflight = yields.length; this.yields = []; for (let i = 0; i < yields.length; i++) { - this.processed++; - yield yields[i]; + // only pass messages that pass the filter + const ok = this.filterFn ? this.filterFn(yields[i]) : true; + if (ok) { + this.processed++; + yield yields[i]; + } if (this.dispatchedFn && yields[i]) { this.dispatchedFn(yields[i]); } diff --git a/nats-base-client/subscription.ts b/nats-base-client/subscription.ts index 61b413b9..ffd7cbd0 100644 --- a/nats-base-client/subscription.ts +++ b/nats-base-client/subscription.ts @@ -12,7 +12,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { DispatchedFn, QueuedIteratorImpl } from "./queued_iterator.ts"; +import { + DispatchedFn, + FilterFn, + QueuedIteratorImpl, +} from "./queued_iterator.ts"; import type { Base, Msg, Subscription, SubscriptionOptions } from "./types.ts"; import { Deferred, deferred, extend, Timeout, timeout } from "./util.ts"; import { ErrorCode, NatsError } from "./error.ts"; @@ -69,16 +73,24 @@ export class SubscriptionImpl extends QueuedIteratorImpl } } - setDispatchedFn(cb: DispatchedFn) { - // user specified callback + setPrePostHandlers( + opts: { filterFn?: FilterFn; dispatchedFn?: DispatchedFn }, + ) { if (this.noIterator) { const uc = this.callback; + const filter = opts.filterFn ? opts.filterFn : () => { + return true; + }; + const dispatched = opts.dispatchedFn ? opts.dispatchedFn : () => {}; this.callback = (err: NatsError | null, msg: Msg) => { - uc(err, msg); - cb(msg); + if (filter(msg)) { + uc(err, msg); + } + dispatched(msg); }; } else { - this.dispatchedFn = cb; + this.filterFn = opts.filterFn; + this.dispatchedFn = opts.dispatchedFn; } } diff --git a/nats-base-client/typedsub.ts b/nats-base-client/typedsub.ts index 6f89feb8..2207958a 100644 --- a/nats-base-client/typedsub.ts +++ b/nats-base-client/typedsub.ts @@ -14,7 +14,7 @@ */ import { Deferred, deferred } from "./util.ts"; -import type { DispatchedFn } from "./queued_iterator.ts"; +import type { DispatchedFn, FilterFn } from "./queued_iterator.ts"; import type { Msg, NatsConnection, @@ -48,6 +48,7 @@ export type TypedCallback = (err: NatsError | null, msg: T | null) => void; export interface TypedSubscriptionOptions extends SubOpts { adapter: MsgAdapter; callback?: TypedCallback; + filterFn?: FilterFn; dispatchedFn?: DispatchedFn; cleanupFn?: (sub: Subscription, info?: unknown) => void; } @@ -94,6 +95,10 @@ export class TypedSubscription extends QueuedIteratorImpl } this.noIterator = typeof opts.callback === "function"; + if (opts.filterFn) { + checkFn(opts.dispatchedFn, "dispatchedFn"); + this.filterFn = opts.filterFn; + } if (opts.dispatchedFn) { checkFn(opts.dispatchedFn, "dispatchedFn"); this.dispatchedFn = opts.dispatchedFn; @@ -109,7 +114,10 @@ export class TypedSubscription extends QueuedIteratorImpl const uh = opts.callback; callback = (err: NatsError | null, msg: Msg) => { const [jer, tm] = this.adapter(err, msg); - uh(jer, tm); + const ok = this.filterFn ? this.filterFn(tm) : true; + if (ok) { + uh(jer, tm); + } if (this.dispatchedFn && tm) { this.dispatchedFn(tm); } diff --git a/tests/sub_extensions_test.ts b/tests/sub_extensions_test.ts index fd1c6a6e..9718e64c 100644 --- a/tests/sub_extensions_test.ts +++ b/tests/sub_extensions_test.ts @@ -16,7 +16,7 @@ import { assert, assertEquals, } from "https://deno.land/std@0.95.0/testing/asserts.ts"; -import { createInbox, Msg } from "../src/mod.ts"; +import { createInbox, Msg, StringCodec } from "../src/mod.ts"; import { deferred, SubscriptionImpl, @@ -104,10 +104,13 @@ Deno.test("extensions - dispatched called on callback", async () => { const subj = createInbox(); const sub = nc.subscribe(subj, { callback: () => {} }) as SubscriptionImpl; let count = 0; - sub.setDispatchedFn((msg: Msg | null) => { - if (msg) { - count++; - } + + sub.setPrePostHandlers({ + dispatchedFn: (msg: Msg | null) => { + if (msg) { + count++; + } + }, }); nc.publish(subj); await nc.flush(); @@ -120,10 +123,12 @@ Deno.test("extensions - dispatched called on iterator", async () => { const subj = createInbox(); const sub = nc.subscribe(subj, { max: 1 }) as SubscriptionImpl; let count = 0; - sub.setDispatchedFn((msg: Msg | null) => { - if (msg) { - count++; - } + sub.setPrePostHandlers({ + dispatchedFn: (msg: Msg | null) => { + if (msg) { + count++; + } + }, }); const done = (async () => { for await (const _m of sub) { @@ -136,3 +141,73 @@ Deno.test("extensions - dispatched called on iterator", async () => { assertEquals(count, 1); await cleanup(ns, nc); }); + +Deno.test("extensions - filter called on callback", async () => { + const { ns, nc } = await setup(); + const subj = createInbox(); + const sub = nc.subscribe(subj, { + max: 3, + callback: (err, m) => { + filtered.push(sc.decode(m.data)); + }, + }) as SubscriptionImpl; + const sc = StringCodec(); + const all: string[] = []; + const filtered: string[] = []; + sub.setPrePostHandlers({ + filterFn: (msg: Msg | null): boolean => { + if (msg) { + return sc.decode(msg.data).startsWith("A"); + } + return false; + }, + dispatchedFn: (msg: Msg | null): void => { + all.push(msg ? sc.decode(msg.data) : ""); + }, + }); + + nc.publish(subj, sc.encode("A")); + nc.publish(subj, sc.encode("B")); + nc.publish(subj, sc.encode("C")); + await sub.closed; + + assertEquals(filtered, ["A"]); + assertEquals(all, ["A", "B", "C"]); + + await cleanup(ns, nc); +}); + +Deno.test("extensions - filter called on iterator", async () => { + const { ns, nc } = await setup(); + const subj = createInbox(); + const sub = nc.subscribe(subj, { max: 3 }) as SubscriptionImpl; + const sc = StringCodec(); + const all: string[] = []; + const filtered: string[] = []; + sub.setPrePostHandlers({ + filterFn: (msg: Msg | null): boolean => { + if (msg) { + return sc.decode(msg.data).startsWith("A"); + } + return false; + }, + dispatchedFn: (msg: Msg | null): void => { + all.push(msg ? sc.decode(msg.data) : ""); + }, + }); + const done = (async () => { + for await (const m of sub) { + filtered.push(sc.decode(m.data)); + } + })(); + + nc.publish(subj, sc.encode("A")); + nc.publish(subj, sc.encode("B")); + nc.publish(subj, sc.encode("C")); + await done; + + assertEquals(filtered, ["A"]); + assertEquals(all, ["A", "B", "C"]); + + await cleanup(ns, nc); +}); From 105f57330b26e582393cbfbf2c401d504d82734e Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 13 Oct 2021 16:12:18 -0500 Subject: [PATCH 2/3] c/p error --- nats-base-client/typedsub.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nats-base-client/typedsub.ts b/nats-base-client/typedsub.ts index 2207958a..91e8eb62 100644 --- a/nats-base-client/typedsub.ts +++ b/nats-base-client/typedsub.ts @@ -96,7 +96,7 @@ export class TypedSubscription extends QueuedIteratorImpl this.noIterator = typeof opts.callback === "function"; if (opts.filterFn) { - checkFn(opts.dispatchedFn, "dispatchedFn"); + checkFn(opts.filterFn, "filterFn"); this.filterFn = opts.filterFn; } if (opts.dispatchedFn) { From 6117a74f04023b2f1f60b61ef4b380b42bda9fcf Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 13 Oct 2021 16:59:50 -0500 Subject: [PATCH 3/3] changes as per discussions with ivan. Changed the dispatchedFn to only be called if the protocolFilterFn allows the message to be presented. --- nats-base-client/jsclient.ts | 6 +++--- nats-base-client/queued_iterator.ts | 14 +++++++------ nats-base-client/subscription.ts | 13 +++++++----- nats-base-client/typedsub.ts | 18 ++++++++-------- tests/sub_extensions_test.ts | 32 ++++++++++++++++++----------- 5 files changed, 48 insertions(+), 35 deletions(-) diff --git a/nats-base-client/jsclient.ts b/nats-base-client/jsclient.ts index 4cae6e91..5baae2ee 100644 --- a/nats-base-client/jsclient.ts +++ b/nats-base-client/jsclient.ts @@ -386,7 +386,7 @@ export class JetStreamClientImpl extends BaseApiClient if (jsi.callbackFn) { so.callback = jsi.callbackFn; } - so.filterFn = (jm): boolean => { + so.protocolFilterFn = (jm): boolean => { const jsmi = jm as JsMsgImpl; if (isFlowControlMsg(jsmi.msg)) { // FIXME: ordered consumer needs to work on this @@ -514,7 +514,7 @@ function cbMsgAdapter( if (err) { return [err, null]; } - // assuming that the filterFn is set! + // assuming that the protocolFilterFn is set! return [null, toJsMsg(msg)]; } @@ -539,7 +539,7 @@ function iterMsgAdapter( return [ne, null]; } } - // assuming that the filterFn is set + // assuming that the protocolFilterFn is set return [null, toJsMsg(msg)]; } diff --git a/nats-base-client/queued_iterator.ts b/nats-base-client/queued_iterator.ts index 9858b19a..fbd3115a 100644 --- a/nats-base-client/queued_iterator.ts +++ b/nats-base-client/queued_iterator.ts @@ -19,7 +19,7 @@ export interface Dispatcher { push(v: T): void; } -export type FilterFn = (data: T | null) => boolean; +export type ProtocolFilterFn = (data: T | null) => boolean; export type DispatchedFn = (data: T | null) => void; export interface QueuedIterator extends Dispatcher { @@ -40,7 +40,7 @@ export class QueuedIteratorImpl implements QueuedIterator { protected done: boolean; private signal: Deferred; private yields: T[]; - filterFn?: FilterFn; + protocolFilterFn?: ProtocolFilterFn; dispatchedFn?: DispatchedFn; private err?: Error; @@ -84,13 +84,15 @@ export class QueuedIteratorImpl implements QueuedIterator { this.yields = []; for (let i = 0; i < yields.length; i++) { // only pass messages that pass the filter - const ok = this.filterFn ? this.filterFn(yields[i]) : true; + const ok = this.protocolFilterFn + ? this.protocolFilterFn(yields[i]) + : true; if (ok) { this.processed++; yield yields[i]; - } - if (this.dispatchedFn && yields[i]) { - this.dispatchedFn(yields[i]); + if (this.dispatchedFn && yields[i]) { + this.dispatchedFn(yields[i]); + } } this.inflight--; } diff --git a/nats-base-client/subscription.ts b/nats-base-client/subscription.ts index ffd7cbd0..82cd2d79 100644 --- a/nats-base-client/subscription.ts +++ b/nats-base-client/subscription.ts @@ -14,7 +14,7 @@ */ import { DispatchedFn, - FilterFn, + ProtocolFilterFn, QueuedIteratorImpl, } from "./queued_iterator.ts"; import type { Base, Msg, Subscription, SubscriptionOptions } from "./types.ts"; @@ -74,22 +74,25 @@ export class SubscriptionImpl extends QueuedIteratorImpl } setPrePostHandlers( - opts: { filterFn?: FilterFn; dispatchedFn?: DispatchedFn }, + opts: { + protocolFilterFn?: ProtocolFilterFn; + dispatchedFn?: DispatchedFn; + }, ) { if (this.noIterator) { const uc = this.callback; - const filter = opts.filterFn ? opts.filterFn : () => { + const filter = opts.protocolFilterFn ? opts.protocolFilterFn : () => { return true; }; const dispatched = opts.dispatchedFn ? opts.dispatchedFn : () => {}; this.callback = (err: NatsError | null, msg: Msg) => { if (filter(msg)) { uc(err, msg); + dispatched(msg); } - dispatched(msg); }; } else { - this.filterFn = opts.filterFn; + this.protocolFilterFn = opts.protocolFilterFn; this.dispatchedFn = opts.dispatchedFn; } } diff --git a/nats-base-client/typedsub.ts b/nats-base-client/typedsub.ts index 91e8eb62..06c2ca6f 100644 --- a/nats-base-client/typedsub.ts +++ b/nats-base-client/typedsub.ts @@ -14,7 +14,7 @@ */ import { Deferred, deferred } from "./util.ts"; -import type { DispatchedFn, FilterFn } from "./queued_iterator.ts"; +import type { DispatchedFn, ProtocolFilterFn } from "./queued_iterator.ts"; import type { Msg, NatsConnection, @@ -48,7 +48,7 @@ export type TypedCallback = (err: NatsError | null, msg: T | null) => void; export interface TypedSubscriptionOptions extends SubOpts { adapter: MsgAdapter; callback?: TypedCallback; - filterFn?: FilterFn; + protocolFilterFn?: ProtocolFilterFn; dispatchedFn?: DispatchedFn; cleanupFn?: (sub: Subscription, info?: unknown) => void; } @@ -95,9 +95,9 @@ export class TypedSubscription extends QueuedIteratorImpl } this.noIterator = typeof opts.callback === "function"; - if (opts.filterFn) { - checkFn(opts.filterFn, "filterFn"); - this.filterFn = opts.filterFn; + if (opts.protocolFilterFn) { + checkFn(opts.protocolFilterFn, "filterFn"); + this.protocolFilterFn = opts.protocolFilterFn; } if (opts.dispatchedFn) { checkFn(opts.dispatchedFn, "dispatchedFn"); @@ -114,12 +114,12 @@ export class TypedSubscription extends QueuedIteratorImpl const uh = opts.callback; callback = (err: NatsError | null, msg: Msg) => { const [jer, tm] = this.adapter(err, msg); - const ok = this.filterFn ? this.filterFn(tm) : true; + const ok = this.protocolFilterFn ? this.protocolFilterFn(tm) : true; if (ok) { uh(jer, tm); - } - if (this.dispatchedFn && tm) { - this.dispatchedFn(tm); + if (this.dispatchedFn && tm) { + this.dispatchedFn(tm); + } } }; } diff --git a/tests/sub_extensions_test.ts b/tests/sub_extensions_test.ts index 9718e64c..33315fdb 100644 --- a/tests/sub_extensions_test.ts +++ b/tests/sub_extensions_test.ts @@ -148,21 +148,24 @@ Deno.test("extensions - filter called on callback", async () => { const sub = nc.subscribe(subj, { max: 3, callback: (err, m) => { - filtered.push(sc.decode(m.data)); + processed.push(sc.decode(m.data)); }, }) as SubscriptionImpl; const sc = StringCodec(); const all: string[] = []; - const filtered: string[] = []; + const processed: string[] = []; + const dispatched: string[] = []; sub.setPrePostHandlers({ - filterFn: (msg: Msg | null): boolean => { + protocolFilterFn: (msg: Msg | null): boolean => { if (msg) { - return sc.decode(msg.data).startsWith("A"); + const d = sc.decode(msg.data); + all.push(d); + return d.startsWith("A"); } return false; }, dispatchedFn: (msg: Msg | null): void => { - all.push(msg ? sc.decode(msg.data) : ""); + dispatched.push(msg ? sc.decode(msg.data) : ""); }, }); @@ -171,7 +174,8 @@ Deno.test("extensions - filter called on callback", async () => { nc.publish(subj, sc.encode("C")); await sub.closed; - assertEquals(filtered, ["A"]); + assertEquals(processed, ["A"]); + assertEquals(dispatched, ["A"]); assertEquals(all, ["A", "B", "C"]); await cleanup(ns, nc); @@ -183,21 +187,24 @@ Deno.test("extensions - filter called on iterator", async () => { const sub = nc.subscribe(subj, { max: 3 }) as SubscriptionImpl; const sc = StringCodec(); const all: string[] = []; - const filtered: string[] = []; + const processed: string[] = []; + const dispatched: string[] = []; sub.setPrePostHandlers({ - filterFn: (msg: Msg | null): boolean => { + protocolFilterFn: (msg: Msg | null): boolean => { if (msg) { - return sc.decode(msg.data).startsWith("A"); + const d = sc.decode(msg.data); + all.push(d); + return d.startsWith("A"); } return false; }, dispatchedFn: (msg: Msg | null): void => { - all.push(msg ? sc.decode(msg.data) : ""); + dispatched.push(msg ? sc.decode(msg.data) : ""); }, }); const done = (async () => { for await (const m of sub) { - filtered.push(sc.decode(m.data)); + processed.push(sc.decode(m.data)); } })(); @@ -206,7 +213,8 @@ Deno.test("extensions - filter called on iterator", async () => { nc.publish(subj, sc.encode("C")); await done; - assertEquals(filtered, ["A"]); + assertEquals(processed, ["A"]); + assertEquals(dispatched, ["A"]); assertEquals(all, ["A", "B", "C"]); await cleanup(ns, nc);