From 26b7de2057247e551bce1119ba299e93c576d084 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Thu, 27 Aug 2020 22:22:02 +0300 Subject: [PATCH 01/10] transport browser rewrite --- packages/transport-browser/jest.config.js | 1 + packages/transport-browser/package.json | 3 +- .../src/Provider/Provider.ts | 26 --- .../src/Provider/ProviderClient.ts | 20 --- .../src/Provider/ProviderServer.ts | 18 --- packages/transport-browser/src/connection.ts | 148 ++++++++++++++++++ packages/transport-browser/src/index.ts | 38 ++++- .../tests/messageChannelMock.ts | 3 + .../transport-browser/tests/provider.spec.ts | 120 -------------- .../transport-browser/tests/transport.spec.ts | 29 ++++ yarn.lock | 12 +- 11 files changed, 226 insertions(+), 192 deletions(-) delete mode 100644 packages/transport-browser/src/Provider/Provider.ts delete mode 100644 packages/transport-browser/src/Provider/ProviderClient.ts delete mode 100644 packages/transport-browser/src/Provider/ProviderServer.ts create mode 100644 packages/transport-browser/src/connection.ts create mode 100644 packages/transport-browser/tests/messageChannelMock.ts delete mode 100644 packages/transport-browser/tests/provider.spec.ts create mode 100644 packages/transport-browser/tests/transport.spec.ts diff --git a/packages/transport-browser/jest.config.js b/packages/transport-browser/jest.config.js index a1fe48af..f8963bac 100644 --- a/packages/transport-browser/jest.config.js +++ b/packages/transport-browser/jest.config.js @@ -6,4 +6,5 @@ module.exports = { testPathIgnorePatterns: ['/es/', '/lib/', '/node_modules/'], moduleFileExtensions: ['ts', 'tsx', 'js'], moduleDirectories: ['node_modules', 'app/src'], + setupFilesAfterEnv: ['/tests/messageChannelMock.ts'], }; diff --git a/packages/transport-browser/package.json b/packages/transport-browser/package.json index beac2c3d..b15fb837 100644 --- a/packages/transport-browser/package.json +++ b/packages/transport-browser/package.json @@ -36,6 +36,7 @@ "@scalecube/utils": "^0.2.10", "rsocket-core": "^0.0.16", "rsocket-events-client": "^0.0.22", - "rsocket-events-server": "^0.0.22" + "rsocket-events-server": "^0.0.22", + "rxjs": "^6.4.0" } } diff --git a/packages/transport-browser/src/Provider/Provider.ts b/packages/transport-browser/src/Provider/Provider.ts deleted file mode 100644 index 04e578df..00000000 --- a/packages/transport-browser/src/Provider/Provider.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Provider } from '@scalecube/rsocket-adapter'; -import { clientFactory } from './ProviderClient'; -import { serverFactory } from './ProviderServer'; - -const serializers = { - data: { - deserialize: (data: any) => data, - serialize: (data: any) => data, - }, - metadata: { - deserialize: (data: any) => data, - serialize: (data: any) => data, - }, -}; - -export const clientProvider: Provider = { - providerFactory: clientFactory, - serializers, - factoryOptions: null, -}; - -export const serverProvider: Provider = { - providerFactory: serverFactory, - serializers, - factoryOptions: null, -}; diff --git a/packages/transport-browser/src/Provider/ProviderClient.ts b/packages/transport-browser/src/Provider/ProviderClient.ts deleted file mode 100644 index 1c551584..00000000 --- a/packages/transport-browser/src/Provider/ProviderClient.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Address } from '@scalecube/api'; -import { getFullAddress, validateAddress, constants } from '@scalecube/utils'; -// @ts-ignore -import RSocketEventsClient from 'rsocket-events-client'; -import { ProviderFactory } from '@scalecube/rsocket-adapter'; - -export const clientFactory: ProviderFactory = (options: { address: Address; factoryOptions?: any }) => { - const { address, factoryOptions } = options; - - validateAddress(address); - - const { protocol } = address; - - switch (protocol.toLowerCase()) { - case 'pm': - return new RSocketEventsClient({ address: getFullAddress(address) }); - default: - throw Error(constants.NOT_VALID_PROTOCOL); - } -}; diff --git a/packages/transport-browser/src/Provider/ProviderServer.ts b/packages/transport-browser/src/Provider/ProviderServer.ts deleted file mode 100644 index 98b71456..00000000 --- a/packages/transport-browser/src/Provider/ProviderServer.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Address } from '@scalecube/api'; -import { getFullAddress, validateAddress, constants } from '@scalecube/utils'; -// @ts-ignore -import RSocketEventsServer from 'rsocket-events-server'; -import { ProviderFactory } from '@scalecube/rsocket-adapter'; - -export const serverFactory: ProviderFactory = (options: { address: Address; factoryOptions?: any }) => { - const { address, factoryOptions } = options; - validateAddress(address); - - const { protocol } = address; - switch (protocol.toLowerCase()) { - case 'pm': - return new RSocketEventsServer({ address: getFullAddress(address) }); - default: - throw Error(constants.NOT_VALID_PROTOCOL); - } -}; diff --git a/packages/transport-browser/src/connection.ts b/packages/transport-browser/src/connection.ts new file mode 100644 index 00000000..47535fbe --- /dev/null +++ b/packages/transport-browser/src/connection.ts @@ -0,0 +1,148 @@ +import { connect, listen } from '@scalecube/addressable'; +import { AsyncModel, ServiceCall } from '@scalecube/api/lib/microservice'; +import { Observable } from 'rxjs'; + +export function createConnection() { + const connections: { [address: string]: Promise } = {}; + + function getConnection(addr: string) { + const address = addr + '/transport'; + connections[address] = connections[address] || connect(address); + return connections[address]; + } + function requestResponse(address: string, msg: any) { + return new Promise(async (resolve, reject) => { + const cid = Date.now() + Math.random(); + const con = await getConnection(address); + con.postMessage({ + header: { + cid, + asyncModel: 'requestResponse' as AsyncModel, + }, + msg, + }); + + const to = setTimeout(() => { + con.removeEventListener('message', listener); + reject('timeout'); + }, 5000); + const listener = (ev: { data: any }) => { + if (ev.data.header.cid === cid) { + con.removeEventListener('message', listener); + clearTimeout(to); + if (ev.data.header.error) { + reject(ev.data.header.error); + } + resolve(ev.data.msg); + } + }; + + con.addEventListener('message', listener); + }); + } + function requestStream(address: string, msg: any) { + return new Observable((obs) => { + const cid = Date.now() + Math.random(); + getConnection(address).then((con) => { + const to = setTimeout(() => { + con.removeEventListener('message', listener); + obs.error('timeout'); + }, 5000); + const listener = (ev: { data: any }) => { + if (ev.data.header.cid === cid) { + switch (ev.data.header.type) { + case 'NEXT': + obs.next(ev.data.msg); + break; + case 'COMPLETE': + con.removeEventListener('message', listener); + obs.complete(); + break; + case 'ERROR': + con.removeEventListener('message', listener); + obs.error(ev.data.header.error); + break; + case 'ACK': + clearTimeout(to); + break; + } + } + }; + con.addEventListener('message', listener); + con.postMessage({ + header: { + cid, + asyncModel: 'requestStream' as AsyncModel, + }, + msg, + }); + }); + }); + } + function server(address: string, serviceCall: ServiceCall) { + listen(address + '/transport', (msg, port) => { + console.log(address); + if (msg.data.header && msg.data.header.cid) { + switch (msg.data.header.asyncModel as AsyncModel) { + case 'requestResponse': { + serviceCall + .requestResponse(msg.data.msg) + .then((res) => + port.postMessage({ + header: { + cid: msg.data.header.cid, + }, + msg: res, + }) + ) + .catch((reason) => + port.postMessage({ + header: { + cid: msg.data.header.cid, + error: reason, + }, + }) + ); + break; + } + case 'requestStream': { + port.postMessage({ + header: { + cid: msg.data.header.cid, + type: 'ACK', + }, + }); + serviceCall.requestStream(msg.data.msg).subscribe( + (res) => + port.postMessage({ + header: { + cid: msg.data.header.cid, + type: 'NEXT', + }, + msg: res, + }), + (reason) => + port.postMessage({ + header: { + cid: msg.data.header.cid, + error: reason, + type: 'ERROR', + }, + }), + () => + port.postMessage({ + header: { + cid: msg.data.header.cid, + type: 'COMPLETE', + }, + }) + ); + break; + } + } + } + }); + } + + return { requestResponse, requestStream, server }; +} diff --git a/packages/transport-browser/src/index.ts b/packages/transport-browser/src/index.ts index d5e938df..416987fe 100644 --- a/packages/transport-browser/src/index.ts +++ b/packages/transport-browser/src/index.ts @@ -1,7 +1,33 @@ -import { setupServer, setupClient } from '@scalecube/rsocket-adapter'; -import { clientProvider, serverProvider } from './Provider/Provider'; +import { TransportApi } from '@scalecube/api'; +import { ClientTransportOptions, RequestHandler } from '@scalecube/api/lib/transport'; +import { Message } from '@scalecube/api/lib/microservice'; +import { from, Observable } from 'rxjs'; +import { createConnection } from './connection'; +import { getAddress, getFullAddress } from '@scalecube/utils'; -export const transport = { - clientTransport: setupClient(clientProvider), - serverTransport: setupServer(serverProvider), -}; +const obs = new Observable((o) => {}); + +function createTransport() { + const con = createConnection(); + + return { + clientTransport: { + start: (options: ClientTransportOptions): Promise => { + return Promise.resolve({ + requestResponse: (message) => con.requestResponse(getFullAddress(options.remoteAddress), message), + requestStream: (message) => con.requestStream(getFullAddress(options.remoteAddress), message), + }); + }, + destroy: () => {}, + }, + serverTransport: (options) => { + con.server(getFullAddress(options.localAddress), options.serviceCall); + + return () => { + console.log('server stop not impl'); + }; + }, + } as TransportApi.Transport; +} + +export const transport: TransportApi.Transport = createTransport(); diff --git a/packages/transport-browser/tests/messageChannelMock.ts b/packages/transport-browser/tests/messageChannelMock.ts new file mode 100644 index 00000000..fed2e270 --- /dev/null +++ b/packages/transport-browser/tests/messageChannelMock.ts @@ -0,0 +1,3 @@ +import { mockMessageChannel } from '@scalecube/utils'; + +mockMessageChannel(); diff --git a/packages/transport-browser/tests/provider.spec.ts b/packages/transport-browser/tests/provider.spec.ts deleted file mode 100644 index 1789ae59..00000000 --- a/packages/transport-browser/tests/provider.spec.ts +++ /dev/null @@ -1,120 +0,0 @@ -import { clientProvider, serverProvider } from '../src/Provider/Provider'; -import { constants } from '@scalecube/utils'; -/* tslint:disable */ - -const mockServer = jest.fn(); -const mockClient = jest.fn(); - -jest.mock('rsocket-events-server', () => { - return class RSocketEventsServer { - constructor(data: any) { - mockServer(data); - } - }; -}); - -jest.mock('rsocket-events-client', () => { - return class RSocketEventsClient { - constructor(...data: any) { - mockClient(data); - } - }; -}); - -beforeEach(() => { - mockClient.mockClear(); - mockServer.mockClear(); -}); - -describe(` - Background: rsocket provider is selected base on protocol - Given serverFactory - And clientFactory - And address with path, host, port - `, () => { - const address = { - path: 'path', - host: 'host', - port: 8080, - protocol: '', - fullAddress: '', - }; - - describe.each([ - { - mock: mockServer, - providerFactory: serverProvider.providerFactory, - }, - { - mock: mockClient, - providerFactory: clientProvider.providerFactory, - }, - ])( - ` - Given RSocket provider: - # RSocketServerProvider - server - # RSocketClientProvider - client - `, - ({ mock, providerFactory, options }) => { - test.each(['pm'])( - ` - Scenario: create RSocketServerProvider | RSocketClientProvider - Given protocol - And partial configured address - And rsocket transport provider - When invoking transport provider - Then the provider will be determine by the protocol - `, - (protocol) => { - expect.assertions(1); - - address.protocol = protocol; - address.fullAddress = `${protocol}://${address.host}:${address.port}/${address.path}`; - - providerFactory({ - factoryOptions: null, - address, - }); - - expect(mock).toHaveBeenCalledTimes(1); - } - ); - - test.each([10, [], {}, true, null, undefined, '', 'dfsdf'])( - ` - Scenario: validation check - protocol - Given protocol - - | type | value | - -------------------------------------- - | number | 10 | - | array | [] | - | object | {} | - | boolean | true | - | null | null | - | undefined | undefined | - | empty string | '' | - | not valid protocol | 'dfsdf' | - - And a transport provider Factory - When invoking transport provider - Then error will be thrown - `, - (protocol) => { - expect.assertions(1); - address.protocol = protocol; - address.fullAddress = `${protocol}://${address.host}:${address.port}/${address.path}`; - - try { - serverProvider.providerFactory({ - factoryOptions: null, - address, - }); - } catch (e) { - expect(e.message).toMatch(constants.NOT_VALID_PROTOCOL); - } - } - ); - } - ); -}); diff --git a/packages/transport-browser/tests/transport.spec.ts b/packages/transport-browser/tests/transport.spec.ts new file mode 100644 index 00000000..7a16bc99 --- /dev/null +++ b/packages/transport-browser/tests/transport.spec.ts @@ -0,0 +1,29 @@ +import { transport } from '../src'; +import { getAddress } from '@scalecube/utils'; +import { from } from 'rxjs'; + +describe('transport', () => { + it('should request respond and request stream', async (done) => { + transport.serverTransport({ + localAddress: getAddress('server'), + serviceCall: { + requestResponse: (message) => Promise.resolve('promise'), + requestStream: (message) => from(['obs']), + }, + logger: (msg) => {}, + }); + const client1 = await transport.clientTransport.start({ + remoteAddress: getAddress('server'), + logger: (msg) => {}, + }); + const client2 = await transport.clientTransport.start({ + remoteAddress: getAddress('server'), + logger: (msg) => {}, + }); + + const res1 = await client1.requestResponse({ data: ['hello'], qualifier: 'hello' }); + client1.requestStream({ data: ['hello'], qualifier: 'hello' }).subscribe(() => { + done(); + }); + }); +}); diff --git a/yarn.lock b/yarn.lock index af70ec05..4ef02a23 100644 --- a/yarn.lock +++ b/yarn.lock @@ -684,6 +684,16 @@ dependencies: "@babel/helper-plugin-utils" "^7.10.4" +"@babel/plugin-transform-runtime@^7.11.0": + version "7.11.0" + resolved "https://registry.yarnpkg.com/@babel/plugin-transform-runtime/-/plugin-transform-runtime-7.11.0.tgz#e27f78eb36f19448636e05c33c90fd9ad9b8bccf" + integrity sha512-LFEsP+t3wkYBlis8w6/kmnd6Kb1dxTd+wGJ8MlxTGzQo//ehtqlVL4S9DNUa53+dtPSQobN2CXx4d81FqC58cw== + dependencies: + "@babel/helper-module-imports" "^7.10.4" + "@babel/helper-plugin-utils" "^7.10.4" + resolve "^1.8.1" + semver "^5.5.1" + "@babel/plugin-transform-shorthand-properties@^7.10.4": version "7.10.4" resolved "https://registry.yarnpkg.com/@babel/plugin-transform-shorthand-properties/-/plugin-transform-shorthand-properties-7.10.4.tgz#9fd25ec5cdd555bb7f473e5e6ee1c971eede4dd6" @@ -10669,7 +10679,7 @@ resolve@1.10.1: dependencies: path-parse "^1.0.6" -resolve@1.x, resolve@^1.1.5, resolve@^1.1.6, resolve@^1.10.0, resolve@^1.11.0, resolve@^1.11.1, resolve@^1.14.1, resolve@^1.17.0, resolve@^1.3.2, resolve@^1.4.0: +resolve@1.x, resolve@^1.1.5, resolve@^1.1.6, resolve@^1.10.0, resolve@^1.11.0, resolve@^1.11.1, resolve@^1.14.1, resolve@^1.17.0, resolve@^1.3.2, resolve@^1.4.0, resolve@^1.8.1: version "1.17.0" resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.17.0.tgz#b25941b54968231cc2d1bb76a79cb7f2c0bf8444" integrity sha512-ic+7JYiV8Vi2yzQGFWOkiZD5Z9z7O2Zhm9XMaTxdJExKasieFCr+yXZ/WmXsckHiKl12ar0y6XiXDx3m4RHn1w== From 9f983e95367f8f9fa3943e6fae1397c7f96cd87c Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 00:41:30 +0300 Subject: [PATCH 02/10] fix build and tests --- packages/addressable/package.json | 6 ++++-- packages/browser/rollup.cjs.config.js | 3 +-- packages/browser/rollup.iife.config.js | 2 +- .../tests/integration/remoteCall-positive-scenarios.spec.ts | 6 +++--- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/packages/addressable/package.json b/packages/addressable/package.json index 66c12e68..ca93760d 100755 --- a/packages/addressable/package.json +++ b/packages/addressable/package.json @@ -27,8 +27,11 @@ "doc": "typedoc ./src --out ./doc --mode file --name 'Scalecube API' --hideGenerator --readme ./README.md" }, "author": "Scalecube (https://github.com/scalecube/scalecube-js)", - "dependencies": {}, + "dependencies": { + "@babel/plugin-transform-runtime": "^7.11.0" + }, "devDependencies": { + "@rollup/plugin-typescript": "^5.0.2", "@scalecube/utils": "^0.2.9", "@types/expect-puppeteer": "^4.4.3", "@types/jest-environment-puppeteer": "^4.3.2", @@ -43,7 +46,6 @@ "rollup-plugin-node-resolve": "^5.2.0", "rollup-plugin-replace": "^2.2.0", "rollup-plugin-terser": "^5.3.0", - "@rollup/plugin-typescript": "^5.0.2", "rollup-plugin-uglify-es": "^0.0.1", "rollup-plugin-visualizer": "^2.6.0", "tslint": "^5.11.0", diff --git a/packages/browser/rollup.cjs.config.js b/packages/browser/rollup.cjs.config.js index 905db6fc..ec66ceb1 100644 --- a/packages/browser/rollup.cjs.config.js +++ b/packages/browser/rollup.cjs.config.js @@ -5,7 +5,6 @@ import filesize from 'rollup-plugin-filesize'; import resolve from 'rollup-plugin-node-resolve'; import commonjs from 'rollup-plugin-commonjs'; import pkg from './package.json'; -import replace from 'rollup-plugin-replace'; import babel from 'rollup-plugin-babel'; export default { @@ -28,7 +27,7 @@ export default { }), resolve(), babel({ - plugins: ['@babel/plugin-transform-arrow-functions'], + plugins: ['@babel/plugin-transform-arrow-functions', '@babel/plugin-transform-runtime'], babelrc: false, runtimeHelpers: true, presets: [ diff --git a/packages/browser/rollup.iife.config.js b/packages/browser/rollup.iife.config.js index 7e7c345f..4bd3cf54 100644 --- a/packages/browser/rollup.iife.config.js +++ b/packages/browser/rollup.iife.config.js @@ -26,7 +26,7 @@ export default { }), resolve(), babel({ - plugins: ['@babel/plugin-transform-arrow-functions'], + plugins: ['@babel/plugin-transform-arrow-functions', '@babel/plugin-transform-runtime'], babelrc: true, runtimeHelpers: true, presets: [ diff --git a/packages/scalecube-microservice/tests/integration/remoteCall-positive-scenarios.spec.ts b/packages/scalecube-microservice/tests/integration/remoteCall-positive-scenarios.spec.ts index b81d2c7e..14bce361 100644 --- a/packages/scalecube-microservice/tests/integration/remoteCall-positive-scenarios.spec.ts +++ b/packages/scalecube-microservice/tests/integration/remoteCall-positive-scenarios.spec.ts @@ -88,7 +88,7 @@ describe(`Test positive-scenarios of usage Then successful RequestStream is emitted `, (done) => { expect.assertions(2); - const address = getAddress('createProxy-requestResponse'); + const address = getAddress('createProxy-requestStream'); const microserviceWithoutServices = createMS({ services: [], address, @@ -98,7 +98,7 @@ describe(`Test positive-scenarios of usage const proxy = microserviceWithoutServices.createProxy({ serviceDefinition, router: defaultRouter }); proxy.greet$([defaultUser]).subscribe( - (res: string) => {}, + (_: string) => {}, (e: Error) => { expect(e.message).toMatch( getNotFoundByRouterError(getFullAddress(address), `${serviceDefinition.serviceName}/greet$`) @@ -171,7 +171,7 @@ describe(`Test positive-scenarios of usage const serviceCall = microserviceWithoutServices.createServiceCall({ router: defaultRouter }); serviceCall.requestStream(message).subscribe( - (res: MicroserviceApi.Message) => {}, + (_: MicroserviceApi.Message) => {}, (e: Error) => { expect(e.message).toMatch( getNotFoundByRouterError(getFullAddress(address), `${serviceDefinition.serviceName}/greet$`) From de878fd767c152e73d5864b93768f53c1d1ce704 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 00:47:55 +0300 Subject: [PATCH 03/10] remove console.log --- packages/transport-browser/src/connection.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/transport-browser/src/connection.ts b/packages/transport-browser/src/connection.ts index 47535fbe..79d2673c 100644 --- a/packages/transport-browser/src/connection.ts +++ b/packages/transport-browser/src/connection.ts @@ -81,7 +81,6 @@ export function createConnection() { } function server(address: string, serviceCall: ServiceCall) { listen(address + '/transport', (msg, port) => { - console.log(address); if (msg.data.header && msg.data.header.cid) { switch (msg.data.header.asyncModel as AsyncModel) { case 'requestResponse': { From 32e513785050d1c549a59002d88a7225313a06bd Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 01:07:16 +0300 Subject: [PATCH 04/10] linting --- packages/transport-browser/src/index.ts | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/transport-browser/src/index.ts b/packages/transport-browser/src/index.ts index 416987fe..e6e90410 100644 --- a/packages/transport-browser/src/index.ts +++ b/packages/transport-browser/src/index.ts @@ -1,11 +1,7 @@ import { TransportApi } from '@scalecube/api'; import { ClientTransportOptions, RequestHandler } from '@scalecube/api/lib/transport'; -import { Message } from '@scalecube/api/lib/microservice'; -import { from, Observable } from 'rxjs'; import { createConnection } from './connection'; -import { getAddress, getFullAddress } from '@scalecube/utils'; - -const obs = new Observable((o) => {}); +import { getFullAddress } from '@scalecube/utils'; function createTransport() { const con = createConnection(); @@ -24,7 +20,7 @@ function createTransport() { con.server(getFullAddress(options.localAddress), options.serviceCall); return () => { - console.log('server stop not impl'); + // console.log('server stop not impl'); }; }, } as TransportApi.Transport; From c5f32c91caabeff1228101b847e4cde4f13a2ce7 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 02:01:21 +0300 Subject: [PATCH 05/10] Change transport API: RequestHandler -> Invoker --- packages/api/src/transport/Transport.ts | 10 +++++----- packages/api/src/transport/index.ts | 2 +- packages/api/src/transport/tests/transport.spec.ts | 12 ++++++++++++ packages/rsocket-adapter/src/Client/startClient.ts | 2 +- packages/rsocket-adapter/src/Server/createServer.ts | 2 +- packages/rsocket-adapter/tests/validation.spec.ts | 2 +- .../src/ServiceCall/RemoteCall.ts | 4 ++-- packages/transport-browser/src/index.ts | 4 ++-- 8 files changed, 25 insertions(+), 13 deletions(-) create mode 100644 packages/api/src/transport/tests/transport.spec.ts diff --git a/packages/api/src/transport/Transport.ts b/packages/api/src/transport/Transport.ts index c143a818..42f2e7c3 100644 --- a/packages/api/src/transport/Transport.ts +++ b/packages/api/src/transport/Transport.ts @@ -22,9 +22,9 @@ export interface Transport { export interface ClientTransport { /** * @property start - * open connection to remote container and resolve with RequestHandler to call the remote container + * open connection to remote container and resolve with Invoker to call the remote container */ - start: (options: ClientTransportOptions) => Promise; + start: (options: ClientTransportOptions) => Promise; /** * @method destroy * remove open connection to a specific microserivce container @@ -60,7 +60,7 @@ export interface ServerTransportOptions { * @property serviceCall * callback for handling the request */ - serviceCall: RequestHandler; + serviceCall: Invoker; /** * @method logger * add logs to scalecube eco-system @@ -69,9 +69,9 @@ export interface ServerTransportOptions { } /** - * @interface RequestHandler + * @interface Invoker */ -export interface RequestHandler { +export interface Invoker { /** * @method requestResponse * @message - data to pass in the request diff --git a/packages/api/src/transport/index.ts b/packages/api/src/transport/index.ts index 20218df6..7929a593 100644 --- a/packages/api/src/transport/index.ts +++ b/packages/api/src/transport/index.ts @@ -1,7 +1,7 @@ export { ServerTransportOptions, ClientTransportOptions, - RequestHandler, + Invoker, Transport, ServerStop, ClientTransport, diff --git a/packages/api/src/transport/tests/transport.spec.ts b/packages/api/src/transport/tests/transport.spec.ts new file mode 100644 index 00000000..094ef72b --- /dev/null +++ b/packages/api/src/transport/tests/transport.spec.ts @@ -0,0 +1,12 @@ +describe('transport suite', () => { + test('Transport.ClientTransport.start should return Promise to invoker', () => { + // when promise resolved invoker should be ready + }); + test('Transport.ClientTransport.destroy should trigger error for all open invocation', () => {}); + test('Transport.ServerTransport should open a transport server and return destroy function', () => {}); + test('Server destroy function should unsubscribe all streams and emit error for all open requests', () => {}); + test('Invoker.RequestResponse Ping pong', () => {}); + test('Invoker.RequestResponse Ping error', () => {}); + test('Invoker.RequestStream ^-A-B-C-$ test', () => {}); + test('Invoker.RequestStream ^-A-B-C-! test', () => {}); +}); diff --git a/packages/rsocket-adapter/src/Client/startClient.ts b/packages/rsocket-adapter/src/Client/startClient.ts index a1360697..77ce62eb 100644 --- a/packages/rsocket-adapter/src/Client/startClient.ts +++ b/packages/rsocket-adapter/src/Client/startClient.ts @@ -38,7 +38,7 @@ export const setupClient = (configuration: any) => { }; return { - start: async (options: TransportApi.ClientTransportOptions): Promise => { + start: async (options: TransportApi.ClientTransportOptions): Promise => { const { remoteAddress, logger } = options; const socket = await getClientConnection({ diff --git a/packages/rsocket-adapter/src/Server/createServer.ts b/packages/rsocket-adapter/src/Server/createServer.ts index 54425643..c402d016 100644 --- a/packages/rsocket-adapter/src/Server/createServer.ts +++ b/packages/rsocket-adapter/src/Server/createServer.ts @@ -12,7 +12,7 @@ export const createServer = ({ }: { address: Address; serverProvider: Provider; - serviceCall: TransportApi.RequestHandler; + serviceCall: TransportApi.Invoker; }) => { const { factoryOptions, providerFactory } = serverProvider; diff --git a/packages/rsocket-adapter/tests/validation.spec.ts b/packages/rsocket-adapter/tests/validation.spec.ts index 419d96e3..434f2297 100644 --- a/packages/rsocket-adapter/tests/validation.spec.ts +++ b/packages/rsocket-adapter/tests/validation.spec.ts @@ -5,7 +5,7 @@ import { TransportApi } from '@scalecube/api'; import { CLIENT_NOT_IMPL, SERVER_NOT_IMPL } from '../src/helpers/constants'; describe('Test RSocket-adapter validation check', () => { - const serviceCall: TransportApi.RequestHandler = { + const serviceCall: TransportApi.Invoker = { requestStream: (message: any) => of({}), requestResponse: (message: any) => Promise.resolve(), }; diff --git a/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts b/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts index 0277a5cc..107973d1 100644 --- a/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts +++ b/packages/scalecube-microservice/src/ServiceCall/RemoteCall.ts @@ -21,7 +21,7 @@ export const remoteCall = (options: RemoteCallOptions) => { .then((endpoint: MicroserviceApi.Endpoint) => { transportClient .start({ remoteAddress: endpoint.address, logger }) - .then(({ requestStream }: TransportApi.RequestHandler) => { + .then(({ requestStream }: TransportApi.Invoker) => { requestStream(message).subscribe( (data: any) => obs.next(data), (err: Error) => obs.error(err), @@ -39,7 +39,7 @@ export const remoteCall = (options: RemoteCallOptions) => { .then((endpoint: MicroserviceApi.Endpoint) => { transportClient .start({ remoteAddress: endpoint.address, logger }) - .then(({ requestResponse }: TransportApi.RequestHandler) => { + .then(({ requestResponse }: TransportApi.Invoker) => { requestResponse(message) .then((response: any) => resolve(response)) .catch((e: Error) => reject(e)); diff --git a/packages/transport-browser/src/index.ts b/packages/transport-browser/src/index.ts index e6e90410..74674f5b 100644 --- a/packages/transport-browser/src/index.ts +++ b/packages/transport-browser/src/index.ts @@ -1,5 +1,5 @@ import { TransportApi } from '@scalecube/api'; -import { ClientTransportOptions, RequestHandler } from '@scalecube/api/lib/transport'; +import { ClientTransportOptions, Invoker } from '@scalecube/api/lib/transport'; import { createConnection } from './connection'; import { getFullAddress } from '@scalecube/utils'; @@ -8,7 +8,7 @@ function createTransport() { return { clientTransport: { - start: (options: ClientTransportOptions): Promise => { + start: (options: ClientTransportOptions): Promise => { return Promise.resolve({ requestResponse: (message) => con.requestResponse(getFullAddress(options.remoteAddress), message), requestStream: (message) => con.requestStream(getFullAddress(options.remoteAddress), message), From 9d4a5b787f342694d8edb1f8f7ad99e82e68e6c6 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 03:11:55 +0300 Subject: [PATCH 06/10] bug fix cluster ...members cause sending on() and send() --- packages/cluster-browser/src/Cluster/joinCluster.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/cluster-browser/src/Cluster/joinCluster.ts b/packages/cluster-browser/src/Cluster/joinCluster.ts index 1e1c01ae..1e13341c 100644 --- a/packages/cluster-browser/src/Cluster/joinCluster.ts +++ b/packages/cluster-browser/src/Cluster/joinCluster.ts @@ -31,7 +31,8 @@ function notifyMembersChanged(ctx: Context, send: any, to: string) { DEBUG(ctx, 'notify', to, member); if (member.from !== to && member.sender !== to) { send({ - ...member, + from: member.from, + items: member.items, type: member.type === 'REMOVED' ? 'REMOVED' : 'ADDED', sender: ctx.address, }); From 2e95fafb91bf1d30bf900eed1867fa3e31f1b334 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 03:28:35 +0300 Subject: [PATCH 07/10] catch rejection of create channel --- packages/addressable/src/boostrap.ts | 42 +++++++++++++++------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/packages/addressable/src/boostrap.ts b/packages/addressable/src/boostrap.ts index c9b73276..2957cead 100755 --- a/packages/addressable/src/boostrap.ts +++ b/packages/addressable/src/boostrap.ts @@ -17,25 +17,29 @@ export function bootstrap(window: any, worker: any) { }; } - // worker - if (typeof worker !== 'undefined') { - const localChannel = new MessageChannel(); - localChannel.port1.start(); - localChannel.port2.start(); - worker.addEventListener('message', server.channelHandler); - localChannel.port2.addEventListener('message', server.channelHandler); - client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)); - client.createChannel(worker.postMessage.bind(worker)); - // iframe - } else if (window && window.top && window.top !== window.self) { - client.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port)); - client.createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port)); - window.addEventListener('message', server.channelHandler); - } - // main - else { - client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)); - window.addEventListener('message', server.channelHandler); + try { + // worker + if (typeof worker !== 'undefined') { + const localChannel = new MessageChannel(); + localChannel.port1.start(); + localChannel.port2.start(); + worker.addEventListener('message', server.channelHandler); + localChannel.port2.addEventListener('message', server.channelHandler); + client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)); + client.createChannel(worker.postMessage.bind(worker)); + // iframe + } else if (window && window.top && window.top !== window.self) { + client.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port)); + client.createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port)); + window.addEventListener('message', server.channelHandler); + } + // main + else { + client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)); + window.addEventListener('message', server.channelHandler); + } + } catch (e) { + // create channel can be failed, it ok, this will catch the rejection } return client; From dc86376a534bfe4132b7e694ef991f4c4c53c3c1 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 03:45:43 +0300 Subject: [PATCH 08/10] catch promise instead of "try" --- packages/addressable/src/boostrap.ts | 44 +++++++++++++--------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/packages/addressable/src/boostrap.ts b/packages/addressable/src/boostrap.ts index 2957cead..e00c365f 100755 --- a/packages/addressable/src/boostrap.ts +++ b/packages/addressable/src/boostrap.ts @@ -17,29 +17,27 @@ export function bootstrap(window: any, worker: any) { }; } - try { - // worker - if (typeof worker !== 'undefined') { - const localChannel = new MessageChannel(); - localChannel.port1.start(); - localChannel.port2.start(); - worker.addEventListener('message', server.channelHandler); - localChannel.port2.addEventListener('message', server.channelHandler); - client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)); - client.createChannel(worker.postMessage.bind(worker)); - // iframe - } else if (window && window.top && window.top !== window.self) { - client.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port)); - client.createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port)); - window.addEventListener('message', server.channelHandler); - } - // main - else { - client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)); - window.addEventListener('message', server.channelHandler); - } - } catch (e) { - // create channel can be failed, it ok, this will catch the rejection + // worker + if (typeof worker !== 'undefined') { + const localChannel = new MessageChannel(); + localChannel.port1.start(); + localChannel.port2.start(); + worker.addEventListener('message', server.channelHandler); + localChannel.port2.addEventListener('message', server.channelHandler); + client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)).catch(); + client.createChannel(worker.postMessage.bind(worker)).catch(); + // iframe + } else if (window && window.top && window.top !== window.self) { + client.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port)).catch(); + client + .createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port)) + .catch(); + window.addEventListener('message', server.channelHandler); + } + // main + else { + client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)).catch(); + window.addEventListener('message', server.channelHandler); } return client; From 0321797c995c60e52b278d17d42b20f476b6c559 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Fri, 28 Aug 2020 11:37:29 +0300 Subject: [PATCH 09/10] catch promise instead of "try" --- packages/addressable/src/boostrap.ts | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/addressable/src/boostrap.ts b/packages/addressable/src/boostrap.ts index e00c365f..34ea0466 100755 --- a/packages/addressable/src/boostrap.ts +++ b/packages/addressable/src/boostrap.ts @@ -24,19 +24,21 @@ export function bootstrap(window: any, worker: any) { localChannel.port2.start(); worker.addEventListener('message', server.channelHandler); localChannel.port2.addEventListener('message', server.channelHandler); - client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)).catch(); - client.createChannel(worker.postMessage.bind(worker)).catch(); + client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)).catch(() => {}); + client.createChannel(worker.postMessage.bind(worker)).catch(() => {}); // iframe } else if (window && window.top && window.top !== window.self) { - client.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port)).catch(); + client + .createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port)) + .catch(() => {}); client .createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port)) - .catch(); + .catch(() => {}); window.addEventListener('message', server.channelHandler); } // main else { - client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)).catch(); + client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)).catch(() => {}); window.addEventListener('message', server.channelHandler); } From 1f0a0333a74f4cf7e54eda151bc77c3bb9d8f0b4 Mon Sep 17 00:00:00 2001 From: IdanILT Date: Sat, 29 Aug 2020 02:39:25 +0300 Subject: [PATCH 10/10] catch promise instead of "try" --- packages/addressable/src/ConnectionClient.ts | 29 ++- packages/addressable/src/ConnectionServer.ts | 12 +- packages/addressable/src/api.ts | 8 +- packages/addressable/src/const.ts | 1 + packages/addressable/tests/connection.spec.ts | 15 +- .../api/src/cluster/tests/cluster.spec.ts | 2 +- .../api/src/transport/tests/transport.spec.ts | 186 ++++++++++++++++-- packages/transport-browser/src/connection.ts | 145 ++++++++++---- packages/transport-browser/src/index.ts | 18 +- .../transport-browser/tests/transport.spec.ts | 17 +- 10 files changed, 357 insertions(+), 76 deletions(-) diff --git a/packages/addressable/src/ConnectionClient.ts b/packages/addressable/src/ConnectionClient.ts index f58562e1..8e903bca 100755 --- a/packages/addressable/src/ConnectionClient.ts +++ b/packages/addressable/src/ConnectionClient.ts @@ -19,10 +19,19 @@ export function createConnectionClient(): { listen: api.listen; connect: api.con e.ports[0] ) { debug('incoming server connection'); - e.ports[0].addEventListener('message', (msg) => { + const l = (msg: any) => { debug('invoke', e.data.remoteAddress); listeners[e.data.remoteAddress](msg, e.ports[0]); - }); + }; + e.ports[0].addEventListener('message', l); + const clean = () => { + e.ports[0].removeEventListener('message', l); + e.ports[0].close(); + }; + if (Array.isArray(listeners[e.data.remoteAddress].cleanFns)) { + // @ts-ignore + listeners[e.data.remoteAddress].cleanFns.push(clean); + } e.ports[0].start(); } } @@ -66,9 +75,23 @@ export function createConnectionClient(): { listen: api.listen; connect: api.con }, listen: (addr: string, fn: api.Listener) => { listeners[addr] = fn; - peer.subscribe(({ port }) => { + listeners[addr].cleanFns = []; + + const sub = peer.subscribe(({ port }) => { port.postMessage({ type: EVENT.registerAddress, peerId: peer.id, address: addr }); }); + + return () => { + sub(); + for (const clean of listeners[addr].cleanFns || []) { + clean(); + } + delete listeners[addr]; + const peers = peer.get(); + for (const p in peers) { + peers[p].postMessage({ type: EVENT.unregisterAddress, peerId: peer.id, address: addr }); + } + }; }, connect: (addr: string, to = 5000): Promise => { return new Promise((resolve, reject) => { diff --git a/packages/addressable/src/ConnectionServer.ts b/packages/addressable/src/ConnectionServer.ts index 6883997d..ba1c5520 100755 --- a/packages/addressable/src/ConnectionServer.ts +++ b/packages/addressable/src/ConnectionServer.ts @@ -11,6 +11,14 @@ export function createConnectionServer() { // tslint:disable-next-line:no-console const debug = DEBUG ? (...args: any[]) => console.log('debug', peer.id, ...args) : () => {}; + function handleUnRegisterAddress(e: any) { + if (e.data.type === EVENT.unregisterAddress && e.data.address && e.data.peerId) { + debug('unregister address', e.data.address); + delete addresses[e.data.address]; + return true; + } + return false; + } function handleRegisterAddress(e: any) { if (e.data.type === EVENT.registerAddress && e.data.address && e.data.peerId) { debug('register address', e.data.address); @@ -19,6 +27,7 @@ export function createConnectionServer() { address: e.data.address, peerId: e.data.peerId, }); + return true; } return false; } @@ -40,12 +49,13 @@ export function createConnectionServer() { ]); } }); + return true; } return false; } function eventHandler(e: any) { if (e && e.data) { - handleConnect(e) || handleRegisterAddress(e); + handleConnect(e) || handleRegisterAddress(e) || handleUnRegisterAddress(e); } } diff --git a/packages/addressable/src/api.ts b/packages/addressable/src/api.ts index e4758195..0c43a760 100644 --- a/packages/addressable/src/api.ts +++ b/packages/addressable/src/api.ts @@ -1,3 +1,7 @@ -export type Listener = (msg: any, port: MessagePort) => void; -export type listen = (addr: string, fn: Listener) => void; +type ListenerFn = (msg: any, port: MessagePort) => void; +export interface Listener extends ListenerFn { + cleanFns?: Array<() => void>; +} +type remove = () => void; +export type listen = (addr: string, fn: Listener) => remove; export type connect = (addr: string, timeout?: number) => Promise; diff --git a/packages/addressable/src/const.ts b/packages/addressable/src/const.ts index 738b7273..c3779798 100644 --- a/packages/addressable/src/const.ts +++ b/packages/addressable/src/const.ts @@ -4,6 +4,7 @@ export const EVENT = { addChannel: 'addChannel', channelInit: 'channelInit', registerAddress: 'registerAddress', + unregisterAddress: 'unregisterAddress', connect: 'connect', incomingServerConnection: 'incomingServerConnection', incomingClientConnection: 'incomingClientConnection', diff --git a/packages/addressable/tests/connection.spec.ts b/packages/addressable/tests/connection.spec.ts index 9273cbd9..f4fecf06 100755 --- a/packages/addressable/tests/connection.spec.ts +++ b/packages/addressable/tests/connection.spec.ts @@ -144,8 +144,17 @@ describe('connection suite', () => { port.postMessage('ping'); }); // TODO clean up after close connection - test('close connections', async () => { - // tslint:disable-next-line:no-console - console.log('FEATURE NOT IMPLEMENTED'); + test('close connections', async (done) => { + const { client1, client2 } = createServerAndClients(); + const close = client1.listen('my address', (msg, p) => { + msg.data === 'ping' && p.postMessage('pong'); + }); + const port = await client2.connect('my address'); + port.addEventListener('message', (_) => { + expect(1).toBe(0); + }); + close(); + port.postMessage('ping'); + setTimeout(done, 100); }); }); diff --git a/packages/api/src/cluster/tests/cluster.spec.ts b/packages/api/src/cluster/tests/cluster.spec.ts index c621400e..0d7a4dee 100644 --- a/packages/api/src/cluster/tests/cluster.spec.ts +++ b/packages/api/src/cluster/tests/cluster.spec.ts @@ -2,7 +2,7 @@ import { JoinCluster } from '../index'; import { getAddress } from '@scalecube/utils'; export function clusterSpec(joinCluster: JoinCluster) { - describe('Cluster suite', () => { + describe('Cluster API suite', () => { function createABC(perfix: string) { const clusterA = joinCluster({ address: getAddress(perfix + 'A'), diff --git a/packages/api/src/transport/tests/transport.spec.ts b/packages/api/src/transport/tests/transport.spec.ts index 094ef72b..8da94551 100644 --- a/packages/api/src/transport/tests/transport.spec.ts +++ b/packages/api/src/transport/tests/transport.spec.ts @@ -1,12 +1,176 @@ -describe('transport suite', () => { - test('Transport.ClientTransport.start should return Promise to invoker', () => { - // when promise resolved invoker should be ready +import { Transport } from '../index'; +import { getAddress, getFullAddress } from '@scalecube/utils'; +import { from, interval, throwError } from 'rxjs'; +import { finalize } from 'rxjs/operators'; + +export function transportSpec(transport: Transport) { + function create() { + const client = transport.clientTransport.start({ + logger: (_) => {}, + remoteAddress: getAddress('server'), + }); + transport.serverTransport({ + logger: (_) => {}, + serviceCall: { + requestStream: (message) => + message.qualifier === 'stream' && message.data[0] === '$' + ? from(['A', 'B', 'C']) + : throwError('error stream'), + requestResponse: (message) => + message.qualifier === 'ping' && message.data[0] === 'ping' + ? Promise.resolve('pong') + : Promise.reject('error pong'), + }, + localAddress: getAddress('server'), + }); + + return client; + } + + describe('transport API suite', () => { + test('Transport.ClientTransport.start should return Promise to invoker', async () => { + // when promise resolved invoker should be ready + const t = await transport.clientTransport.start({ + logger: (_) => {}, + remoteAddress: getAddress('server'), + }); + expect(typeof t.requestResponse).toBe('function'); + expect(typeof t.requestStream).toBe('function'); + }); + test('Transport.ClientTransport.destroy should trigger error for all open invocation', async (done) => { + const res: any = []; + const client = await transport.clientTransport.start({ + logger: (_) => {}, + remoteAddress: getAddress('server'), + }); + transport.serverTransport({ + logger: (_) => {}, + serviceCall: { + requestStream: (_) => + interval(1).pipe( + finalize(() => { + res.push('finalize'); + }) + ), + requestResponse: (_) => Promise.resolve(), + }, + localAddress: getAddress('server'), + }); + client.requestStream({ data: ['a'], qualifier: 'q' }).subscribe( + () => {}, + (e) => { + res.push(e); + } + ); + + setTimeout(() => { + transport.clientTransport.destroy({ + address: getFullAddress(getAddress('server')), + logger: (_) => {}, + }); + }, 20); + setTimeout(() => { + expect(res).toEqual(['Transport client shutdown', 'finalize']); + done(); + }, 40); + }); + test('Transport.ServerTransport should open a transport server and return destroy function', () => { + const destroy = transport.serverTransport({ + logger: (_) => {}, + serviceCall: { + requestStream: (_) => from([]), + requestResponse: (_) => Promise.resolve(), + }, + localAddress: getAddress('server'), + }); + expect(typeof destroy).toBe('function'); + }); + test('Server destroy function should unsubscribe all streams and emit error for all open requests', async (done) => { + const res: any = []; + const client = await transport.clientTransport.start({ + logger: (_) => {}, + remoteAddress: getAddress('server1'), + }); + const destroy = transport.serverTransport({ + logger: (_) => {}, + serviceCall: { + requestStream: (_) => + interval(1).pipe( + finalize(() => { + res.push('finalize'); + }) + ), + requestResponse: (_) => Promise.resolve(), + }, + localAddress: getAddress('server1'), + }); + client.requestStream({ data: ['a'], qualifier: 'q' }).subscribe( + () => {}, + (e) => { + res.push(e); + } + ); + + setTimeout(() => { + destroy(); + }, 20); + setTimeout(() => { + expect(res).toEqual(['Transport server shutdown', 'finalize']); + done(); + }, 40); + }); + test('Invoker.RequestResponse Ping pong', async () => { + const client = await create(); + const res = await client.requestResponse({ data: ['ping'], qualifier: 'ping' }); + expect(res).toBe('pong'); + }); + test('Invoker.RequestResponse Ping error', async () => { + const client = await create(); + try { + await client.requestResponse({ data: ['error'], qualifier: 'ping' }); + } catch (e) { + expect(e).toBe('error pong'); + } + }); + test('Invoker.RequestStream ^-A-B-C-$ test', async (done) => { + const client = await create(); + const res: any = []; + + client.requestStream({ data: ['$'], qualifier: 'stream' }).subscribe( + (i) => res.push(i), + (_) => expect(1).toBe(0), + () => { + expect(res).toEqual(['A', 'B', 'C']); + done(); + } + ); + }); + test('Invoker.RequestStream ^-! test', async (done) => { + const client = await create(); + + client.requestStream({ data: ['!'], qualifier: 'stream' }).subscribe( + (_) => expect(1).toBe(0), + (_) => done(), + () => expect(1).toBe(0) + ); + }); + test('Invoker requestStream.unsubscribe should trigger serviceCall unsubscribe', async (done) => { + const client = await transport.clientTransport.start({ + logger: (_) => {}, + remoteAddress: getAddress('server'), + }); + transport.serverTransport({ + logger: (_) => {}, + serviceCall: { + requestStream: (_) => interval(1).pipe(finalize(() => done())), + requestResponse: (_) => Promise.resolve(), + }, + localAddress: getAddress('server'), + }); + + const sub = client.requestStream({ data: ['a'], qualifier: 'q' }).subscribe(); + + setTimeout(() => sub.unsubscribe(), 20); + }); }); - test('Transport.ClientTransport.destroy should trigger error for all open invocation', () => {}); - test('Transport.ServerTransport should open a transport server and return destroy function', () => {}); - test('Server destroy function should unsubscribe all streams and emit error for all open requests', () => {}); - test('Invoker.RequestResponse Ping pong', () => {}); - test('Invoker.RequestResponse Ping error', () => {}); - test('Invoker.RequestStream ^-A-B-C-$ test', () => {}); - test('Invoker.RequestStream ^-A-B-C-! test', () => {}); -}); +} diff --git a/packages/transport-browser/src/connection.ts b/packages/transport-browser/src/connection.ts index 79d2673c..ac3b7b19 100644 --- a/packages/transport-browser/src/connection.ts +++ b/packages/transport-browser/src/connection.ts @@ -1,9 +1,11 @@ import { connect, listen } from '@scalecube/addressable'; import { AsyncModel, ServiceCall } from '@scalecube/api/lib/microservice'; -import { Observable } from 'rxjs'; +import { Observable, Subject, throwError } from 'rxjs'; +import { catchError, takeUntil } from 'rxjs/operators'; -export function createConnection() { +export function createClient() { const connections: { [address: string]: Promise } = {}; + const shutdown$ = new Subject(); function getConnection(addr: string) { const address = addr + '/transport'; @@ -43,7 +45,11 @@ export function createConnection() { function requestStream(address: string, msg: any) { return new Observable((obs) => { const cid = Date.now() + Math.random(); + let cancel: any; getConnection(address).then((con) => { + if (cancel === true) { + return; + } const to = setTimeout(() => { con.removeEventListener('message', listener); obs.error('timeout'); @@ -76,42 +82,106 @@ export function createConnection() { }, msg, }); + const sub = shutdown$.subscribe((addr) => { + if (addr === address) { + obs.error('Transport client shutdown'); + con.postMessage({ + header: { + cid, + asyncModel: 'requestStream' as AsyncModel, + type: 'UNSUBSCRIBE', + }, + }); + con.removeEventListener('message', listener); + clearTimeout(to); + } + }); + cancel = () => { + sub.unsubscribe(); + con.postMessage({ + header: { + cid, + asyncModel: 'requestStream' as AsyncModel, + type: 'UNSUBSCRIBE', + }, + }); + con.removeEventListener('message', listener); + clearTimeout(to); + }; }); + + return () => { + if (typeof cancel === 'function') { + cancel(); + } else { + cancel = true; + } + }; }); } - function server(address: string, serviceCall: ServiceCall) { - listen(address + '/transport', (msg, port) => { - if (msg.data.header && msg.data.header.cid) { - switch (msg.data.header.asyncModel as AsyncModel) { - case 'requestResponse': { - serviceCall - .requestResponse(msg.data.msg) - .then((res) => - port.postMessage({ - header: { - cid: msg.data.header.cid, - }, - msg: res, - }) - ) - .catch((reason) => - port.postMessage({ - header: { - cid: msg.data.header.cid, - error: reason, - }, - }) - ); + function shutdown(address: string) { + shutdown$.next(address); + } + return { requestResponse, requestStream, shutdown }; +} + +export function createServer(address: string, serviceCall: ServiceCall) { + const openSubs: any = {}; + const shutdownSig$: any = new Subject(); + listen(address + '/transport', (msg, port) => { + if (msg.data.header && msg.data.header.cid) { + switch (msg.data.header.asyncModel) { + case 'requestResponse': { + serviceCall + .requestResponse(msg.data.msg) + .then((res) => + port.postMessage({ + header: { + cid: msg.data.header.cid, + }, + msg: res, + }) + ) + .catch((reason) => + port.postMessage({ + header: { + cid: msg.data.header.cid, + error: reason, + }, + }) + ); + break; + } + case 'requestStream': { + if (msg.data.header.type === 'UNSUBSCRIBE') { + openSubs[msg.data.header.cid] && openSubs[msg.data.header.cid].unsubscribe(); break; } - case 'requestStream': { - port.postMessage({ - header: { - cid: msg.data.header.cid, - type: 'ACK', - }, - }); - serviceCall.requestStream(msg.data.msg).subscribe( + port.postMessage({ + header: { + cid: msg.data.header.cid, + type: 'ACK', + }, + }); + openSubs[msg.data.header.cid] = serviceCall + .requestStream(msg.data.msg) + .pipe( + takeUntil( + shutdownSig$.pipe( + catchError((_) => { + port.postMessage({ + header: { + cid: msg.data.header.cid, + error: 'Transport server shutdown', + type: 'ERROR', + }, + }); + return throwError('server shutdown'); + }) + ) + ) + ) + .subscribe( (res) => port.postMessage({ header: { @@ -136,12 +206,11 @@ export function createConnection() { }, }) ); - break; - } + break; } } - }); - } + } + }); - return { requestResponse, requestStream, server }; + return () => shutdownSig$.error('server shutdown'); } diff --git a/packages/transport-browser/src/index.ts b/packages/transport-browser/src/index.ts index 74674f5b..3158df01 100644 --- a/packages/transport-browser/src/index.ts +++ b/packages/transport-browser/src/index.ts @@ -1,27 +1,25 @@ import { TransportApi } from '@scalecube/api'; import { ClientTransportOptions, Invoker } from '@scalecube/api/lib/transport'; -import { createConnection } from './connection'; +import { createClient, createServer } from './connection'; import { getFullAddress } from '@scalecube/utils'; function createTransport() { - const con = createConnection(); + const client = createClient(); return { clientTransport: { start: (options: ClientTransportOptions): Promise => { return Promise.resolve({ - requestResponse: (message) => con.requestResponse(getFullAddress(options.remoteAddress), message), - requestStream: (message) => con.requestStream(getFullAddress(options.remoteAddress), message), + requestResponse: (message) => client.requestResponse(getFullAddress(options.remoteAddress), message), + requestStream: (message) => client.requestStream(getFullAddress(options.remoteAddress), message), }); }, - destroy: () => {}, + destroy: (options) => { + client.shutdown(options.address); + }, }, serverTransport: (options) => { - con.server(getFullAddress(options.localAddress), options.serviceCall); - - return () => { - // console.log('server stop not impl'); - }; + return createServer(getFullAddress(options.localAddress), options.serviceCall); }, } as TransportApi.Transport; } diff --git a/packages/transport-browser/tests/transport.spec.ts b/packages/transport-browser/tests/transport.spec.ts index 7a16bc99..540174ce 100644 --- a/packages/transport-browser/tests/transport.spec.ts +++ b/packages/transport-browser/tests/transport.spec.ts @@ -1,27 +1,30 @@ import { transport } from '../src'; import { getAddress } from '@scalecube/utils'; import { from } from 'rxjs'; +import { transportSpec } from '../../api/src/transport/tests/transport.spec'; describe('transport', () => { + transportSpec(transport); + it('should request respond and request stream', async (done) => { transport.serverTransport({ localAddress: getAddress('server'), serviceCall: { - requestResponse: (message) => Promise.resolve('promise'), - requestStream: (message) => from(['obs']), + requestResponse: (_) => Promise.resolve('promise'), + requestStream: (_) => from(['obs']), }, - logger: (msg) => {}, + logger: (_) => {}, }); const client1 = await transport.clientTransport.start({ remoteAddress: getAddress('server'), - logger: (msg) => {}, + logger: (_) => {}, }); - const client2 = await transport.clientTransport.start({ + await transport.clientTransport.start({ remoteAddress: getAddress('server'), - logger: (msg) => {}, + logger: (_) => {}, }); - const res1 = await client1.requestResponse({ data: ['hello'], qualifier: 'hello' }); + await client1.requestResponse({ data: ['hello'], qualifier: 'hello' }); client1.requestStream({ data: ['hello'], qualifier: 'hello' }).subscribe(() => { done(); });