diff --git a/package-lock.json b/package-lock.json index 27cf7fca..34f87001 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2340,6 +2340,12 @@ "integrity": "sha512-6gOkRe7OIioWAXfnO/2lFiv+SJichKVSys1mSsgyrYHSEjk8Ctv4tSR/Odvnu+HWlH2C8j53dahU03XmQdd5fA==", "dev": true }, + "@types/promise-timeout": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/@types/promise-timeout/-/promise-timeout-1.3.0.tgz", + "integrity": "sha512-AtVKSZUtpBoZ4SshXJk5JcTXJllinHKKx615lsRNJUsbbFlI0AI8drlnoiQ+PNvjkeoF9Y8fJUh6UO2khsIBZw==", + "dev": true + }, "@types/redis": { "version": "2.8.28", "resolved": "https://registry.npmjs.org/@types/redis/-/redis-2.8.28.tgz", @@ -12275,6 +12281,11 @@ "retry": "^0.12.0" } }, + "promise-timeout": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/promise-timeout/-/promise-timeout-1.3.0.tgz", + "integrity": "sha512-5yANTE0tmi5++POym6OgtFmwfDvOXABD9oj/jLQr5GPEyuNEb7jH4wbbANJceJid49jwhi1RddxnhnEAb/doqg==" + }, "prompts": { "version": "2.4.0", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.4.0.tgz", diff --git a/package.json b/package.json index 8166e99d..0d85e78c 100644 --- a/package.json +++ b/package.json @@ -95,6 +95,7 @@ "javascript-state-machine": "^3.1.0", "openapi-backend": "^3.9.0", "path": "^0.12.7", + "promise-timeout": "^1.3.0", "redis": "^3.0.2", "ts-node": "^9.1.1", "tsconfig-paths": "^3.9.0", @@ -105,6 +106,7 @@ "@commitlint/cli": "^11.0.0", "@commitlint/config-conventional": "^11.0.0", "@mojaloop/api-snippets": "^12.0.4", + "@types/promise-timeout": "^1.3.0", "@types/redis-mock": "^0.17.0", "@typescript-eslint/eslint-plugin": "^4.15.1", "@typescript-eslint/parser": "^4.15.1", diff --git a/src/shared/deferred-job.ts b/src/shared/deferred-job.ts new file mode 100644 index 00000000..0076607a --- /dev/null +++ b/src/shared/deferred-job.ts @@ -0,0 +1,206 @@ +/***** + License + -------------- + Copyright © 2020 Mojaloop Foundation + The Mojaloop files are made available by the Mojaloop Foundation under the Apache License, Version 2.0 (the "License") + and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed + on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and limitations under the License. + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + - Paweł Marzec + -------------- + ******/ + +/** + * deferredJob is a workflow to + * - setup pub/sub one time subscription to channel + * - initiate the workflow start by jobInitiator callback + * - consume published message by jobListener callback + * - wait for workflow to fulfill till timeout reached + */ + +import { timeout as prTimeout } from 'promise-timeout' +import { PubSub, Message } from '~/shared/pub-sub' + +// re-export TimeoutError so client will not be bothered about promise-timeout +export { TimeoutError } from 'promise-timeout' + +// function responsible for initiate the flow which should result, somewhere in the future, +// in publishing message to the queue +// parameter to deferredJob(...).init(jobInitiator) +export type JobInitiator = (channel: string, sid: number) => Promise; + +// function responsible for consuming the message +// parameter to deferredJob(...).init().job(jobListener) +export type JobListener = (message: Message) => Promise; + +// minimal mvp validation for JobInitiator +export class InitiatorRequired extends Error { + public channel: string + + constructor (channel: string) { + super(`'init' expects JobInitiator value for channel: '${channel}'`) + this.channel = channel + } + + // validation logic + static throwIfInvalid (channel: string, jobInitiator: JobInitiator): void { + if (typeof jobInitiator !== 'function') { + throw new InitiatorRequired(channel) + } + } +} + +// minimal mvp validation for JobListener +export class ListenerRequired extends Error { + public channel: string + + constructor (channel: string) { + super(`'job' expects JobListener value for channel: '${channel}'`) + this.channel = channel + } + + // validation logic + static throwIfInvalid (channel: string, jobListener: JobListener): void { + if (typeof jobListener !== 'function') { + throw new ListenerRequired(channel) + } + } +} + +// minimal mvp validation for timeout +export class PositiveTimeoutRequired extends Error { + public channel: string + + constructor (channel: string) { + super(`'wait' expects to be positive number for channel: '${channel}'`) + this.channel = channel + } + + // validation logic + static throwIfInvalid (channel: string, timeout: number): void { + if (timeout <= 0) { + throw new PositiveTimeoutRequired(channel) + } + } +} + +// async method which returns promise resolved when JobListener consume the Message +// this method invokes JobInitiator and setup promise timeout +// throws TimeoutError if Message isn't published or JobListener doesn't finish Message consumption in time +// https://www.npmjs.com/package/promise-timeout +export interface DeferredWait { + wait: (timeout: number) => Promise +} + +// method to setup JobListener +// returns interface with next possible step method - DeferredWait +export interface DeferredJob { + job: (jobListener: JobListener) => DeferredWait +} + +// only two methods are allowed on fresh result from deferredJob function +// these two methods reflects two possible flows +// - init method -> setups JobInitiator and returns interface to setupDeferredJob +// which will effects in DeferredWait interface at end +// - trigger method -> used to publish message to the channel + +export interface DeferredInitOrTrigger { + init: (jobInitiator: JobInitiator) => DeferredJob + trigger: (message: Message) => Promise +} + +// deferredJob +export default function deferredJob (cache: PubSub, channel: string): DeferredInitOrTrigger { + return { + + // initialize the deferred job + init: (jobInitiator: JobInitiator) => { + // mvp validation for jobInitiator + InitiatorRequired.throwIfInvalid(channel, jobInitiator) + return { + job: (jobListener: JobListener) => { + // mvp validation for jobListener + ListenerRequired.throwIfInvalid(channel, jobListener) + return { + wait: async (timeout = 2000): Promise => { + // mvp validation for timeout + PositiveTimeoutRequired.throwIfInvalid(channel, timeout) + + // cache subscription id + let sid = 0 + // cache un-subscription wrapper + const unsubscribe = (): void => { + // unsubscribe only if elements needed are valid + if (sid && cache && channel) { + cache.unsubscribe(channel, sid) + // protect against multiple un-subscription + sid = 0 + } + } + + // eslint-disable-next-line no-async-promise-executor + const promise = new Promise(async (resolve, reject) => { + try { + // subscribe to the channel to execute the jobListener when the message arrive + sid = await cache.subscribe(channel, async (_channel, message: Message) => { + // consume message + try { + // unsubscribe first to be sure the jobListener will be executed only once + // and system resources are preserved + await unsubscribe() + + // invoke deferred job to consume received message + await jobListener(message) + } catch (err) { + return reject(err) + } + + // done + resolve() + }) + + // invoke the async task which should effects in the future + // by publishing the message to channel via trigger method + // so the jobListener will be invoked + await jobInitiator(channel, sid) + } catch (err) { + // unsubscribe from channel in case of any error + await unsubscribe() + reject(err) + } + }) + + // ensure the whole process will finish in specified timeout + // throws error if timeout happens + return prTimeout(promise, timeout) + .catch(async (err) => { + await unsubscribe() + throw err + }) + } + } + } + } + }, + + // trigger the deferred job + trigger: async (message: Message): Promise => { + return cache.publish(channel, message) + } + } +} diff --git a/test/unit/shared/deferred-job.test.ts b/test/unit/shared/deferred-job.test.ts new file mode 100644 index 00000000..00814dec --- /dev/null +++ b/test/unit/shared/deferred-job.test.ts @@ -0,0 +1,180 @@ +/***** + License + -------------- + Copyright © 2020 Mojaloop Foundation + The Mojaloop files are made available by the Mojaloop Foundation under the Apache License, Version 2.0 (the 'License') + and you may not use these files except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, the Mojaloop files are distributed + on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and limitations under the License. + Contributors + -------------- + This is the official list of the Mojaloop project contributors for this file. + Names of the original copyright holders (individuals or organizations) + should be listed with a '*' in the first column. People who have + contributed from an organization can be listed under the organization + that actually holds the copyright for their contributions (see the + Gates Foundation organization for an example). Those individuals should have + their names indented and be marked with a '-'. Email address can be added + optionally within square brackets . + * Gates Foundation + - Name Surname + + - Paweł Marzec + -------------- + ******/ + +import deferredJob, { JobInitiator, JobListener, PositiveTimeoutRequired, TimeoutError } from '~/shared/deferred-job' +import mockLogger from '../mockLogger' +import { Message, NotificationCallback, PubSub } from '~/shared/pub-sub' +import { + RedisConnectionConfig +} from '~/shared/redis-connection' +import { uuid } from 'uuidv4' +jest.mock('redis') + +describe('deferredJob', () => { + test('module layout', () => { + expect(typeof deferredJob).toEqual('function') + }) + + describe('workflow: deferredJob -> init -> job -> wait', () => { + const pubSubConfig: RedisConnectionConfig = { + port: 6789, + host: 'localhost', + logger: mockLogger() + } + let pubSub: PubSub + let spySubscribe: jest.SpyInstance + let spyUnsubscribe: jest.SpyInstance + let spyPublish: jest.SpyInstance + const channel = uuid() + const publishTimeoutInMs = 50 + beforeEach(async () => { + pubSub = new PubSub(pubSubConfig) + await pubSub.connect() + let notifyCb: NotificationCallback + spySubscribe = jest.spyOn(pubSub, 'subscribe') + .mockImplementation((_channel: string, cb: NotificationCallback) => { + // store callback to be used in `publish` + notifyCb = cb + return 1 // hardcoded sid + }) + spyUnsubscribe = jest.spyOn(pubSub, 'unsubscribe') + .mockImplementation(() => true) // true returned when unsubscribe done + spyPublish = jest.spyOn(pubSub, 'publish') + .mockImplementationOnce((channel: string, message: Message) => { + // invoke stored callback to simulate + setTimeout(() => notifyCb(channel, message, 1), publishTimeoutInMs) + return Promise.resolve() + }) + }) + + afterEach(async () => { + await pubSub.disconnect() + }) + + test('happy flow', async (done) => { + const jobInitiator = jest.fn(() => Promise.resolve()) + const jobListener = jest.fn(() => Promise.resolve()) + const initOrTrigger = deferredJob(pubSub, channel) + + // check workflow layout + expect(typeof initOrTrigger.init).toEqual('function') + expect(typeof initOrTrigger.trigger).toEqual('function') + + const dj = initOrTrigger.init(jobInitiator) + expect(typeof dj.job).toEqual('function') + + const dw = dj.job(jobListener) + expect(typeof dw.wait).toEqual('function') + + // wait phase - execution + dw.wait(publishTimeoutInMs + 10).then(() => { + expect(spyPublish).toHaveBeenCalledWith(channel, { the: 'message' }) + expect(spyUnsubscribe).toHaveBeenCalledWith(channel, 1) + done() + }) + expect(spySubscribe).toHaveBeenCalledWith(channel, expect.any(Function)) + await initOrTrigger.trigger({ the: 'message' }) + }) + + test('timeout', async (done) => { + const jobInitiator = jest.fn(() => Promise.resolve()) + const jobListener = jest.fn(() => Promise.resolve()) + + const dw = deferredJob(pubSub, channel) + .init(jobInitiator) + .job(jobListener) + + // wait phase - set timeout before publish will happen + dw.wait(publishTimeoutInMs - 10).catch((err) => { + expect(err).toBeInstanceOf(TimeoutError) + expect(spyPublish).toHaveBeenCalledWith(channel, { the: 'message' }) + expect(spyUnsubscribe).toHaveBeenCalledWith(channel, 1) + done() + }) + expect(spySubscribe).toHaveBeenCalledWith(channel, expect.any(Function)) + await deferredJob(pubSub, channel).trigger({ the: 'message' }) + }) + + test('exception from jobInitiator', (done) => { + const jobInitiator = jest.fn(() => { throw new Error('job-initiator throws') }) + const jobListener = jest.fn(() => Promise.resolve()) + + const dw = deferredJob(pubSub, channel) + .init(jobInitiator) + .job(jobListener) + + // wait phase - set timeout before publish will happen + dw.wait(publishTimeoutInMs + 10).catch((err) => { + expect(err.message).toEqual('job-initiator throws') + expect(spyPublish).not.toHaveBeenCalled() + expect(spyUnsubscribe).toHaveBeenCalledWith(channel, 1) + done() + }) + expect(spySubscribe).toHaveBeenCalledWith(channel, expect.any(Function)) + }) + + test('exception from jobListener', async (done) => { + const jobInitiator = jest.fn(() => Promise.resolve()) + const jobListener = jest.fn(() => { throw new Error('job-listener throws') }) + + const dw = deferredJob(pubSub, channel) + .init(jobInitiator) + .job(jobListener) + + // wait phase - set timeout before publish will happen + // testing default argument for wait + dw.wait(undefined as unknown as number).catch((err) => { + expect(err.message).toEqual('job-listener throws') + expect(spySubscribe).toHaveBeenCalledWith(channel, expect.any(Function)) + expect(spyUnsubscribe).toHaveBeenCalledWith(channel, 1) + done() + }) + expect(spySubscribe).toHaveBeenCalledWith(channel, expect.any(Function)) + await deferredJob(pubSub, channel).trigger({ the: 'message' }) + }) + + test('input validation', () => { + const jobInitiator = jest.fn(() => Promise.resolve()) + const jobListener = jest.fn(() => Promise.resolve()) + + expect(() => deferredJob(pubSub, channel) + .init(null as unknown as JobInitiator) + ).toThrowError() + + expect(() => deferredJob(pubSub, channel) + .init(jobInitiator) + .job(null as unknown as JobListener) + ).toThrowError() + + expect(deferredJob(pubSub, channel) + .init(jobInitiator) + .job(jobListener) + .wait(-1) + ).rejects.toBeInstanceOf(PositiveTimeoutRequired) + }) + }) +})