Skip to content

transport browser rewrite #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/addressable/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
"doc": "typedoc ./src --out ./doc --mode file --name 'Scalecube API' --hideGenerator --readme ./README.md"
},
"author": "Scalecube (https://github.com/scalecube/scalecube-js)",
"dependencies": {},
"dependencies": {
"@babel/plugin-transform-runtime": "^7.11.0"
},
"devDependencies": {
"@rollup/plugin-typescript": "^5.0.2",
"@scalecube/utils": "^0.2.9",
"@types/expect-puppeteer": "^4.4.3",
"@types/jest-environment-puppeteer": "^4.3.2",
Expand All @@ -43,7 +46,6 @@
"rollup-plugin-node-resolve": "^5.2.0",
"rollup-plugin-replace": "^2.2.0",
"rollup-plugin-terser": "^5.3.0",
"@rollup/plugin-typescript": "^5.0.2",
"rollup-plugin-uglify-es": "^0.0.1",
"rollup-plugin-visualizer": "^2.6.0",
"tslint": "^5.11.0",
Expand Down
29 changes: 26 additions & 3 deletions packages/addressable/src/ConnectionClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@ export function createConnectionClient(): { listen: api.listen; connect: api.con
e.ports[0]
) {
debug('incoming server connection');
e.ports[0].addEventListener('message', (msg) => {
const l = (msg: any) => {
debug('invoke', e.data.remoteAddress);
listeners[e.data.remoteAddress](msg, e.ports[0]);
});
};
e.ports[0].addEventListener('message', l);
const clean = () => {
e.ports[0].removeEventListener('message', l);
e.ports[0].close();
};
if (Array.isArray(listeners[e.data.remoteAddress].cleanFns)) {
// @ts-ignore
listeners[e.data.remoteAddress].cleanFns.push(clean);
}
e.ports[0].start();
}
}
Expand Down Expand Up @@ -66,9 +75,23 @@ export function createConnectionClient(): { listen: api.listen; connect: api.con
},
listen: (addr: string, fn: api.Listener) => {
listeners[addr] = fn;
peer.subscribe(({ port }) => {
listeners[addr].cleanFns = [];

const sub = peer.subscribe(({ port }) => {
port.postMessage({ type: EVENT.registerAddress, peerId: peer.id, address: addr });
});

return () => {
sub();
for (const clean of listeners[addr].cleanFns || []) {
clean();
}
delete listeners[addr];
const peers = peer.get();
for (const p in peers) {
peers[p].postMessage({ type: EVENT.unregisterAddress, peerId: peer.id, address: addr });
}
};
},
connect: (addr: string, to = 5000): Promise<MessagePort> => {
return new Promise((resolve, reject) => {
Expand Down
12 changes: 11 additions & 1 deletion packages/addressable/src/ConnectionServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ export function createConnectionServer() {
// tslint:disable-next-line:no-console
const debug = DEBUG ? (...args: any[]) => console.log('debug', peer.id, ...args) : () => {};

function handleUnRegisterAddress(e: any) {
if (e.data.type === EVENT.unregisterAddress && e.data.address && e.data.peerId) {
debug('unregister address', e.data.address);
delete addresses[e.data.address];
return true;
}
return false;
}
function handleRegisterAddress(e: any) {
if (e.data.type === EVENT.registerAddress && e.data.address && e.data.peerId) {
debug('register address', e.data.address);
Expand All @@ -19,6 +27,7 @@ export function createConnectionServer() {
address: e.data.address,
peerId: e.data.peerId,
});
return true;
}
return false;
}
Expand All @@ -40,12 +49,13 @@ export function createConnectionServer() {
]);
}
});
return true;
}
return false;
}
function eventHandler(e: any) {
if (e && e.data) {
handleConnect(e) || handleRegisterAddress(e);
handleConnect(e) || handleRegisterAddress(e) || handleUnRegisterAddress(e);
}
}

Expand Down
8 changes: 6 additions & 2 deletions packages/addressable/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export type Listener = (msg: any, port: MessagePort) => void;
export type listen = (addr: string, fn: Listener) => void;
type ListenerFn = (msg: any, port: MessagePort) => void;
export interface Listener extends ListenerFn {
cleanFns?: Array<() => void>;
}
type remove = () => void;
export type listen = (addr: string, fn: Listener) => remove;
export type connect = (addr: string, timeout?: number) => Promise<MessagePort>;
14 changes: 9 additions & 5 deletions packages/addressable/src/boostrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,21 @@ export function bootstrap(window: any, worker: any) {
localChannel.port2.start();
worker.addEventListener('message', server.channelHandler);
localChannel.port2.addEventListener('message', server.channelHandler);
client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1));
client.createChannel(worker.postMessage.bind(worker));
client.createChannel(localChannel.port1.postMessage.bind(localChannel.port1)).catch(() => {});
client.createChannel(worker.postMessage.bind(worker)).catch(() => {});
// iframe
} else if (window && window.top && window.top !== window.self) {
client.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port));
client.createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port));
client
.createChannel((msg: any, port: MessagePort) => window.postMessage.bind(window)(msg, '*', port))
.catch(() => {});
client
.createChannel((msg: any, port: MessagePort) => window.top.postMessage.bind(window.top)(msg, '*', port))
.catch(() => {});
window.addEventListener('message', server.channelHandler);
}
// main
else {
client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port));
client.createChannel((msg: any, port: MessagePort) => window.postMessage(msg, '*', port)).catch(() => {});
window.addEventListener('message', server.channelHandler);
}

Expand Down
1 change: 1 addition & 0 deletions packages/addressable/src/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export const EVENT = {
addChannel: 'addChannel',
channelInit: 'channelInit',
registerAddress: 'registerAddress',
unregisterAddress: 'unregisterAddress',
connect: 'connect',
incomingServerConnection: 'incomingServerConnection',
incomingClientConnection: 'incomingClientConnection',
Expand Down
15 changes: 12 additions & 3 deletions packages/addressable/tests/connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,17 @@ describe('connection suite', () => {
port.postMessage('ping');
});
// TODO clean up after close connection
test('close connections', async () => {
// tslint:disable-next-line:no-console
console.log('FEATURE NOT IMPLEMENTED');
test('close connections', async (done) => {
const { client1, client2 } = createServerAndClients();
const close = client1.listen('my address', (msg, p) => {
msg.data === 'ping' && p.postMessage('pong');
});
const port = await client2.connect('my address');
port.addEventListener('message', (_) => {
expect(1).toBe(0);
});
close();
port.postMessage('ping');
setTimeout(done, 100);
});
});
2 changes: 1 addition & 1 deletion packages/api/src/cluster/tests/cluster.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { JoinCluster } from '../index';
import { getAddress } from '@scalecube/utils';

export function clusterSpec(joinCluster: JoinCluster) {
describe('Cluster suite', () => {
describe('Cluster API suite', () => {
function createABC(perfix: string) {
const clusterA = joinCluster({
address: getAddress(perfix + 'A'),
Expand Down
10 changes: 5 additions & 5 deletions packages/api/src/transport/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ export interface Transport {
export interface ClientTransport {
/**
* @property start
* open connection to remote container and resolve with RequestHandler to call the remote container
* open connection to remote container and resolve with Invoker to call the remote container
*/
start: (options: ClientTransportOptions) => Promise<RequestHandler>;
start: (options: ClientTransportOptions) => Promise<Invoker>;
/**
* @method destroy
* remove open connection to a specific microserivce container
Expand Down Expand Up @@ -60,7 +60,7 @@ export interface ServerTransportOptions {
* @property serviceCall
* callback for handling the request
*/
serviceCall: RequestHandler;
serviceCall: Invoker;
/**
* @method logger
* add logs to scalecube eco-system
Expand All @@ -69,9 +69,9 @@ export interface ServerTransportOptions {
}

/**
* @interface RequestHandler
* @interface Invoker
*/
export interface RequestHandler {
export interface Invoker {
/**
* @method requestResponse
* @message - data to pass in the request
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export {
ServerTransportOptions,
ClientTransportOptions,
RequestHandler,
Invoker,
Transport,
ServerStop,
ClientTransport,
Expand Down
176 changes: 176 additions & 0 deletions packages/api/src/transport/tests/transport.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import { Transport } from '../index';
import { getAddress, getFullAddress } from '@scalecube/utils';
import { from, interval, throwError } from 'rxjs';
import { finalize } from 'rxjs/operators';

export function transportSpec(transport: Transport) {
function create() {
const client = transport.clientTransport.start({
logger: (_) => {},
remoteAddress: getAddress('server'),
});
transport.serverTransport({
logger: (_) => {},
serviceCall: {
requestStream: (message) =>
message.qualifier === 'stream' && message.data[0] === '$'
? from(['A', 'B', 'C'])
: throwError('error stream'),
requestResponse: (message) =>
message.qualifier === 'ping' && message.data[0] === 'ping'
? Promise.resolve('pong')
: Promise.reject('error pong'),
},
localAddress: getAddress('server'),
});

return client;
}

describe('transport API suite', () => {
test('Transport.ClientTransport.start should return Promise to invoker', async () => {
// when promise resolved invoker should be ready
const t = await transport.clientTransport.start({
logger: (_) => {},
remoteAddress: getAddress('server'),
});
expect(typeof t.requestResponse).toBe('function');
expect(typeof t.requestStream).toBe('function');
});
test('Transport.ClientTransport.destroy should trigger error for all open invocation', async (done) => {
const res: any = [];
const client = await transport.clientTransport.start({
logger: (_) => {},
remoteAddress: getAddress('server'),
});
transport.serverTransport({
logger: (_) => {},
serviceCall: {
requestStream: (_) =>
interval(1).pipe(
finalize(() => {
res.push('finalize');
})
),
requestResponse: (_) => Promise.resolve(),
},
localAddress: getAddress('server'),
});
client.requestStream({ data: ['a'], qualifier: 'q' }).subscribe(
() => {},
(e) => {
res.push(e);
}
);

setTimeout(() => {
transport.clientTransport.destroy({
address: getFullAddress(getAddress('server')),
logger: (_) => {},
});
}, 20);
setTimeout(() => {
expect(res).toEqual(['Transport client shutdown', 'finalize']);
done();
}, 40);
});
test('Transport.ServerTransport should open a transport server and return destroy function', () => {
const destroy = transport.serverTransport({
logger: (_) => {},
serviceCall: {
requestStream: (_) => from([]),
requestResponse: (_) => Promise.resolve(),
},
localAddress: getAddress('server'),
});
expect(typeof destroy).toBe('function');
});
test('Server destroy function should unsubscribe all streams and emit error for all open requests', async (done) => {
const res: any = [];
const client = await transport.clientTransport.start({
logger: (_) => {},
remoteAddress: getAddress('server1'),
});
const destroy = transport.serverTransport({
logger: (_) => {},
serviceCall: {
requestStream: (_) =>
interval(1).pipe(
finalize(() => {
res.push('finalize');
})
),
requestResponse: (_) => Promise.resolve(),
},
localAddress: getAddress('server1'),
});
client.requestStream({ data: ['a'], qualifier: 'q' }).subscribe(
() => {},
(e) => {
res.push(e);
}
);

setTimeout(() => {
destroy();
}, 20);
setTimeout(() => {
expect(res).toEqual(['Transport server shutdown', 'finalize']);
done();
}, 40);
});
test('Invoker.RequestResponse Ping pong', async () => {
const client = await create();
const res = await client.requestResponse({ data: ['ping'], qualifier: 'ping' });
expect(res).toBe('pong');
});
test('Invoker.RequestResponse Ping error', async () => {
const client = await create();
try {
await client.requestResponse({ data: ['error'], qualifier: 'ping' });
} catch (e) {
expect(e).toBe('error pong');
}
});
test('Invoker.RequestStream ^-A-B-C-$ test', async (done) => {
const client = await create();
const res: any = [];

client.requestStream({ data: ['$'], qualifier: 'stream' }).subscribe(
(i) => res.push(i),
(_) => expect(1).toBe(0),
() => {
expect(res).toEqual(['A', 'B', 'C']);
done();
}
);
});
test('Invoker.RequestStream ^-! test', async (done) => {
const client = await create();

client.requestStream({ data: ['!'], qualifier: 'stream' }).subscribe(
(_) => expect(1).toBe(0),
(_) => done(),
() => expect(1).toBe(0)
);
});
test('Invoker requestStream.unsubscribe should trigger serviceCall unsubscribe', async (done) => {
const client = await transport.clientTransport.start({
logger: (_) => {},
remoteAddress: getAddress('server'),
});
transport.serverTransport({
logger: (_) => {},
serviceCall: {
requestStream: (_) => interval(1).pipe(finalize(() => done())),
requestResponse: (_) => Promise.resolve(),
},
localAddress: getAddress('server'),
});

const sub = client.requestStream({ data: ['a'], qualifier: 'q' }).subscribe();

setTimeout(() => sub.unsubscribe(), 20);
});
});
}
Loading