Skip to content

Commit

Permalink
RUN-4677 - Channel lifecycle (HadoukenIO#203)
Browse files Browse the repository at this point in the history
* disconnect and destroy

* undo gruntfile

* lifecycle calls

* add error messages for timing issue on destroy and disconnect
  • Loading branch information
tgoc99 authored and rdepena committed Nov 27, 2018
1 parent 4a2ea86 commit fdabe25
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
5 changes: 4 additions & 1 deletion js-adapter/src/api/interappbus/channel/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface ChannelMessagePayload extends Identity {
}

export class ChannelBase {
protected removeChannel: (mapKey: string) => void;
protected subscriptions: any;
public defaultAction: (action?: string, payload?: any, senderIdentity?: ProviderIdentity) => any;
private preAction: (...args: any[]) => any;
Expand All @@ -35,6 +36,7 @@ export class ChannelBase {
private defaultSet: boolean;
protected send: (to: Identity, action: string, payload: any) => Promise<Message<void>>;
protected providerIdentity: ProviderIdentity;
protected sendRaw: Transport['sendAction'];

constructor (providerIdentity: ProviderIdentity, send: Transport['sendAction']) {
this.defaultSet = false;
Expand All @@ -43,6 +45,7 @@ export class ChannelBase {
this.defaultAction = () => {
throw new Error('No action registered');
};
this.sendRaw = send;
this.send = async (to: Identity, action: string, payload: any) => {
const raw = await send('send-channel-message', { ...to, providerIdentity: this.providerIdentity, action, payload })
.catch(reason => {
Expand Down Expand Up @@ -77,7 +80,7 @@ export class ChannelBase {
this.preAction = idOrResult(func);
}

public onError(func: (e: any, action: string, id: Identity) => any) {
public onError(func: (action: string, error: any, id: Identity) => any) {
if (this.errorMiddleware) {
throw new Error('Already registered error middleware');
}
Expand Down
16 changes: 16 additions & 0 deletions js-adapter/src/api/interappbus/channel/client.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
import { ChannelBase, ProviderIdentity } from './channel';
import Transport from '../../../transport/transport';

type DisconnectionListener = (providerIdentity: ProviderIdentity) => any;

export class ChannelClient extends ChannelBase {
private disconnectListener: DisconnectionListener;

constructor(providerIdentity: ProviderIdentity, send: Transport['sendAction']) {
super(providerIdentity, send);
this.disconnectListener = () => undefined;
}

public async dispatch(action: string, payload?: any): Promise<any> {
return this.send(this.providerIdentity, action, payload);
}

public onDisconnection(listener: DisconnectionListener): void {
this.disconnectListener = listener;
}

public async disconnect(): Promise<void> {
const { channelName } = this.providerIdentity;
await this.sendRaw('disconnect-from-channel', { channelName });
const { channelId } = this.providerIdentity;
this.removeChannel(channelId);
}
}
24 changes: 22 additions & 2 deletions js-adapter/src/api/interappbus/channel/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ export class Channel extends EmitterBase<ChannelEvents> {
const channel = new ChannelClient(providerIdentity, this.wire.sendAction.bind(this.wire));
const key = providerIdentity.channelId;
this.channelMap.set(key, channel);
//@ts-ignore use of protected property
channel.removeChannel = this.removeChannelFromMap.bind(this);
this.on('disconnected', (eventPayload: ProviderIdentity) => {
if (eventPayload.channelName === channelName) {
this.removeChannelFromMap(key);
//@ts-ignore use of private property
channel.disconnectListener(eventPayload);
}
});
return channel;
} catch (e) {
const shouldWait: boolean = Object.assign({ wait: true }, opts).wait;
Expand All @@ -95,6 +104,8 @@ export class Channel extends EmitterBase<ChannelEvents> {
const channel = new ChannelProvider(providerIdentity, this.wire.sendAction.bind(this.wire));
const key = providerIdentity.channelId;
this.channelMap.set(key, channel);
//@ts-ignore use of protected property
channel.removeChannel = this.removeChannelFromMap.bind(this);
this.on('client-disconnected', (eventPayload: ProviderIdentity) => {
if (eventPayload.channelName === channelName) {
channel.connections = channel.connections.filter(identity => {
Expand All @@ -106,6 +117,11 @@ export class Channel extends EmitterBase<ChannelEvents> {
});
return channel;
}

protected removeChannelFromMap(mapKey: string) {
this.channelMap.delete(mapKey);
}

public onmessage = (msg: ChannelMessage) => {
if (msg.action === 'process-channel-message') {
this.processChannelMessage(msg);
Expand All @@ -121,7 +137,9 @@ export class Channel extends EmitterBase<ChannelEvents> {
const key = providerIdentity.channelId;
const bus = this.channelMap.get(key);
if (!bus) {
return;
ackToSender.payload.success = false;
ackToSender.payload.reason = `Client connection with identity ${JSON.stringify(this.wire.me)} no longer connected.`;
return this.wire.sendRaw(ackToSender);
}
try {
const res = await bus.processAction(action, payload, senderIdentity);
Expand All @@ -139,7 +157,9 @@ export class Channel extends EmitterBase<ChannelEvents> {
const key = providerIdentity.channelId;
const bus = this.channelMap.get(key);
if (!bus) {
return;
ackToSender.payload.success = false;
ackToSender.payload.reason = `Channel "${providerIdentity.channelName}" has been destroyed.`;
return this.wire.sendRaw(ackToSender);
}
try {
if (!(bus instanceof ChannelProvider)) {
Expand Down
10 changes: 9 additions & 1 deletion js-adapter/src/api/interappbus/channel/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Transport from '../../../transport/transport';
import { Identity } from '../../../main';

export type ConnectionListener = (identity: Identity, connectionMessage?: any) => any;
export type DisconnectionListener = (identity: Identity) => any;

export class ChannelProvider extends ChannelBase {
private connectListener: ConnectionListener;
Expand Down Expand Up @@ -33,8 +34,15 @@ export class ChannelProvider extends ChannelBase {
this.connectListener = listener;
}

public onDisconnection(listener: ConnectionListener): void {
public onDisconnection(listener: DisconnectionListener): void {
this.disconnectListener = listener;
}

public async destroy(): Promise<void> {
const { channelName } = this.providerIdentity;
await this.sendRaw('destroy-channel', { channelName });
const { channelId } = this.providerIdentity;
this.removeChannel(channelId);
}

}

0 comments on commit fdabe25

Please sign in to comment.