This repository has been archived by the owner on May 22, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 33
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* passing tests * made changes
- Loading branch information
Showing
6 changed files
with
267 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import { Identity } from '../../identity'; | ||
import Transport, { Message } from '../../transport/transport'; | ||
|
||
const idOrResult = (func: (...args: any[]) => any) => (...args: any[] ) => { | ||
const res = func(...args); | ||
return res === undefined ? args[1] : res; | ||
}; | ||
|
||
//tslint:disable-next-line | ||
export interface ServiceIdentity extends Identity {} | ||
|
||
export type Action = (() => any) | ||
| ((payload: any) => any) | ||
| ((payload: any, id: ServiceIdentity) => any); | ||
export type Middleware = (() => any) | ||
| ((action: string) => any) | ||
| ((action: string, payload: any) => any) | ||
| ((action: string, payload: any, id: ServiceIdentity) => any); | ||
|
||
export interface ServiceMessagePayload extends Identity { | ||
action: string; | ||
payload: any; | ||
} | ||
|
||
export class ServiceChannel { | ||
protected subscriptions: any; | ||
public defaultAction: (action?: string, payload?: any, senderIdentity?: ServiceIdentity) => any; | ||
private preAction: (...args: any[]) => any; | ||
private postAction: (...args: any[]) => any; | ||
private errorMiddleware: (...args: any[]) => any; | ||
private defaultSet: boolean; | ||
protected send: (to: Identity, action: string, payload: any) => Promise<Message<void>>; | ||
|
||
constructor (send: Transport['sendAction']) { | ||
this.defaultSet = false; | ||
this.subscriptions = new Map<string, () => any>(); | ||
this.defaultAction = () => { | ||
throw new Error('No action registered'); | ||
}; | ||
this.send = async (to: Identity, action: string, payload: any) => { | ||
const raw = await send('send-service-message', { ...to, action, payload }).catch(reason => { | ||
throw new Error(reason.message); | ||
}); | ||
return raw.payload.data.result; | ||
}; | ||
} | ||
|
||
public async processAction(action: string, payload: any, senderIdentity: ServiceIdentity) { | ||
try { | ||
const mainAction = this.subscriptions.has(action) | ||
? this.subscriptions.get(action) | ||
: (payload: any, id: ServiceIdentity) => this.defaultAction(action, payload, id); | ||
const preActionProcessed = this.preAction ? await this.preAction(action, payload, senderIdentity) : payload; | ||
const actionProcessed = await mainAction(preActionProcessed, senderIdentity); | ||
return this.postAction | ||
? await this.postAction(action, actionProcessed, senderIdentity) | ||
: actionProcessed; | ||
} catch (e) { | ||
if (this.errorMiddleware) { | ||
return this.errorMiddleware(action, e, senderIdentity); | ||
} throw e; | ||
} | ||
} | ||
|
||
public beforeAction(func: Action) { | ||
if (this.preAction) { | ||
throw new Error('Already registered beforeAction middleware'); | ||
} | ||
this.preAction = idOrResult(func); | ||
} | ||
|
||
public onError(func: (e: any, action: string, id: Identity) => any) { | ||
if (this.errorMiddleware) { | ||
throw new Error('Already registered error middleware'); | ||
} | ||
this.errorMiddleware = func; | ||
} | ||
|
||
public afterAction(func: Action) { | ||
if (this.postAction) { | ||
throw new Error('Already registered afterAction middleware'); | ||
} | ||
this.postAction = idOrResult(func); | ||
} | ||
|
||
public remove(action: string): void { | ||
this.subscriptions.delete(action); | ||
} | ||
|
||
public setDefaultAction(func: (action?: string, payload?: any, senderIdentity?: ServiceIdentity) => any): void { | ||
if (this.defaultSet) { | ||
throw new Error('default action can only be set once'); | ||
} else { | ||
this.defaultAction = func; | ||
this.defaultSet = true; | ||
} | ||
} | ||
|
||
public register(topic: string, listener: Action) { | ||
if (this.subscriptions.has(topic)) { | ||
throw new Error(`Subscription already registered for action: ${topic}. Unsubscribe before adding new subscription`); | ||
} else { | ||
this.subscriptions.set(topic, listener); | ||
return true; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
import { ServiceChannel, ServiceIdentity } from './channel'; | ||
import Transport from '../../transport/transport'; | ||
|
||
export class Client extends ServiceChannel { | ||
public onServiceDisconnect: (f: () => void) => void; | ||
constructor(private identity: ServiceIdentity, send: Transport['sendAction']) { | ||
super(send); | ||
} | ||
|
||
public async dispatch(action: string, payload: any): Promise<any> { | ||
return this.send(this.identity, action, payload); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
import { Client } from './client'; | ||
import { Identity } from '../../identity'; | ||
import { Provider } from './provider'; | ||
import { Base } from '../base'; | ||
import Transport, { Message, Payload } from '../../transport/transport'; | ||
|
||
export interface Options { | ||
wait?: boolean; | ||
uuid: string; | ||
payload?: any; | ||
} | ||
|
||
export interface ServicePayload { | ||
payload: Payload; | ||
} | ||
export interface ServiceMessage extends Message<any> { | ||
senderIdentity: Identity; | ||
ackToSender: any; | ||
serviceIdentity: Identity; | ||
connectAction: boolean; | ||
} | ||
|
||
export class Service extends Base { | ||
private serviceMap: Map<string, Provider | Client>; | ||
constructor(wire: Transport) { | ||
super(wire); | ||
this.serviceMap = new Map(); | ||
wire.registerMessageHandler(this.onmessage.bind(this)); | ||
} | ||
|
||
public async onServiceConnect(identity: Identity, listener: EventListener): Promise<void> { | ||
this.registerEventListener({ | ||
topic: 'service', | ||
type: 'connected', | ||
...identity | ||
}); | ||
this.on('connected', listener); | ||
} | ||
|
||
public async connect(options: Options): Promise<Client> { | ||
try { | ||
const { payload: { data: serviceIdentity } } = await this.wire.sendAction('send-service-message', Object.assign({ | ||
connectAction: true, | ||
wait: true | ||
}, options)); | ||
const channel = new Client(serviceIdentity, this.wire.sendAction.bind(this.wire)); | ||
channel.onServiceDisconnect = (listener: () => void) => { | ||
this.registerEventListener({ | ||
topic: 'service', | ||
type: 'disconnected', | ||
...serviceIdentity | ||
}); | ||
this.on('disconnected', listener); | ||
}; | ||
this.serviceMap.set(serviceIdentity.uuid, channel); | ||
return channel; | ||
} catch (e) { | ||
throw new Error(e.message); | ||
} | ||
} | ||
|
||
public async register(): Promise<Provider> { | ||
const { payload: { data: serviceIdentity } } = await this.wire.sendAction('register-service', {}); | ||
const channel = new Provider(this.wire.sendAction.bind(this.wire)); | ||
this.serviceMap.set(serviceIdentity.uuid, channel); | ||
return channel; | ||
} | ||
public onmessage = (msg: ServiceMessage) => { | ||
if (msg.action === 'process-service-action') { | ||
this.processServiceMessage(msg); | ||
return true; | ||
} | ||
return false; | ||
} | ||
private async processServiceMessage (msg: ServiceMessage) { | ||
const { senderIdentity, serviceIdentity, action, ackToSender, payload, connectAction} = msg.payload; | ||
const bus = this.serviceMap.get(serviceIdentity.uuid); | ||
try { | ||
let res; | ||
if (!bus) { | ||
return; | ||
} | ||
if (connectAction) { | ||
if (!(bus instanceof Provider)) { | ||
throw Error('Cannot connect to a plugin'); | ||
} | ||
res = await bus.processConnection(senderIdentity, payload); | ||
} else { | ||
res = await bus.processAction(action, payload, senderIdentity); | ||
} | ||
ackToSender.payload.payload = ackToSender.payload.payload || {}; | ||
ackToSender.payload.payload.result = res; | ||
this.wire.sendRaw(ackToSender); | ||
} catch (e) { | ||
ackToSender.success = false; | ||
ackToSender.reason = e.message; | ||
this.wire.sendRaw(ackToSender); | ||
} | ||
} | ||
|
||
} | ||
|
||
interface PluginSubscribeSuccess { | ||
uuid: string; | ||
name: string; | ||
serviceName: string; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
import { ServiceChannel, ServiceIdentity } from './channel'; | ||
import Transport from '../../transport/transport'; | ||
|
||
export type ConnectionListener = (adapterIdentity: ServiceIdentity, connectionMessage?: any) => any; | ||
|
||
export class Provider extends ServiceChannel { | ||
private connectListener: ConnectionListener; | ||
private connections: ServiceIdentity[]; | ||
|
||
constructor(send: Transport['sendAction']) { | ||
super(send); | ||
this.connectListener = () => undefined; | ||
this.connections = []; | ||
} | ||
|
||
public dispatch(to: ServiceIdentity, action: string, payload: any): Promise<any> { | ||
return this.send(to, action, payload); | ||
} | ||
|
||
public async processConnection(senderId: ServiceIdentity, payload: any) { | ||
this.connections.push(senderId); | ||
return this.connectListener(senderId, payload); | ||
} | ||
|
||
public publish(action: string, payload: any): Promise<any>[] { | ||
return this.connections.map(to => this.send(to, action, payload)); | ||
} | ||
|
||
public onConnection(listener: ConnectionListener): void { | ||
this.connectListener = listener; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters