Skip to content

Commit

Permalink
feat: handler class using abstract function
Browse files Browse the repository at this point in the history
Related #500

[ci skip]
  • Loading branch information
tegefaulkes committed Feb 10, 2023
1 parent 1b33d12 commit 878b40a
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 124 deletions.
27 changes: 21 additions & 6 deletions src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,42 @@ class RPCServer {
}) {
for (const [key, manifestItem] of Object.entries(manifest)) {
if (manifestItem instanceof RawHandler) {
this.registerRawStreamHandler(key, manifestItem.handle);
this.registerRawStreamHandler(
key,
manifestItem.handle.bind(manifestItem),
);
continue;
}
if (manifestItem instanceof DuplexHandler) {
this.registerDuplexStreamHandler(key, manifestItem.handle);
this.registerDuplexStreamHandler(
key,
manifestItem.handle.bind(manifestItem),
);
continue;
}
if (manifestItem instanceof ServerHandler) {
this.registerServerStreamHandler(key, manifestItem.handle);
this.registerServerStreamHandler(
key,
manifestItem.handle.bind(manifestItem),
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(key, manifestItem.handle);
this.registerClientStreamHandler(
key,
manifestItem.handle.bind(manifestItem),
);
continue;
}
if (manifestItem instanceof ClientHandler) {
this.registerClientStreamHandler(key, manifestItem.handle);
this.registerClientStreamHandler(
key,
manifestItem.handle.bind(manifestItem),
);
continue;
}
if (manifestItem instanceof UnaryHandler) {
this.registerUnaryHandler(key, manifestItem.handle);
this.registerUnaryHandler(key, manifestItem.handle.bind(manifestItem));
continue;
}
never();
Expand Down
43 changes: 30 additions & 13 deletions src/RPC/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import type { JSONValue } from 'types';
import type {
ClientHandlerImplementation,
DuplexHandlerImplementation,
RawHandlerImplementation,
ServerHandlerImplementation,
UnaryHandlerImplementation,
ContainerType,
} from 'RPC/types';
import type { ContainerType } from 'RPC/types';
import type { ReadableStream } from 'stream/web';
import type { JsonRpcRequest } from 'RPC/types';
import type { ConnectionInfo } from '../network/types';
import type { ContextCancellable } from '../contexts/types';

abstract class Handler<
Container extends ContainerType = ContainerType,
Expand All @@ -22,39 +19,59 @@ abstract class Handler<
abstract class RawHandler<
Container extends ContainerType = ContainerType,
> extends Handler<Container> {
abstract handle: RawHandlerImplementation;
abstract handle(
input: [ReadableStream<Uint8Array>, JsonRpcRequest],
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
): ReadableStream<Uint8Array>;
}

abstract class DuplexHandler<
Container extends ContainerType = ContainerType,
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Handler<Container, Input, Output> {
abstract handle: DuplexHandlerImplementation<Input, Output>;
abstract handle(
input: AsyncIterable<Input>,
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
): AsyncIterable<Output>;
}

abstract class ServerHandler<
Container extends ContainerType = ContainerType,
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Handler<Container, Input, Output> {
abstract handle: ServerHandlerImplementation<Input, Output>;
abstract handle(
input: Input,
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
): AsyncIterable<Output>;
}

abstract class ClientHandler<
Container extends ContainerType = ContainerType,
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Handler<Container, Input, Output> {
abstract handle: ClientHandlerImplementation<Input, Output>;
abstract handle(
input: AsyncIterable<Input>,
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
): Promise<Output>;
}

abstract class UnaryHandler<
Container extends ContainerType = ContainerType,
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Handler<Container, Input, Output> {
abstract handle: UnaryHandlerImplementation<Input, Output>;
abstract handle(
input: Input,
connectionInfo: ConnectionInfo,
ctx: ContextCancellable,
): Promise<Output>;
}

export {
Expand Down
8 changes: 2 additions & 6 deletions src/clientRPC/handlers/agentStatus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { UnaryHandlerImplementation } from '../../RPC/types';
import type KeyRing from '../../keys/KeyRing';
import type CertManager from '../../keys/CertManager';
import type Logger from '@matrixai/logger';
Expand Down Expand Up @@ -29,18 +28,15 @@ class AgentStatusHandler extends UnaryHandler<
WithMetadata,
WithMetadata<StatusResult>
> {
public handle: UnaryHandlerImplementation<
WithMetadata,
WithMetadata<StatusResult>
> = async () => {
public async handle(): Promise<WithMetadata<StatusResult>> {
return {
pid: process.pid,
nodeId: nodesUtils.encodeNodeId(this.container.keyRing.getNodeId()),
publicJwk: JSON.stringify(
keysUtils.publicKeyToJWK(this.container.keyRing.keyPair.publicKey),
),
};
};
}
}

export { AgentStatusHandler, agentStatusCaller };
18 changes: 6 additions & 12 deletions src/clientRPC/handlers/agentUnlock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { UnaryHandlerImplementation } from '../../RPC/types';
import type Logger from '@matrixai/logger';
import type { WithMetadata } from '../types';
import { UnaryHandler } from '../../RPC/handlers';
Expand All @@ -11,17 +10,12 @@ class AgentUnlockHandler extends UnaryHandler<
WithMetadata,
WithMetadata
> {
public handle: UnaryHandlerImplementation<WithMetadata, WithMetadata> =
async () => {
// This is a NOP handler,
// authentication and unlocking is handled via middleware.
// Failure to authenticate will be an error from the middleware layer.
return {
metadata: {
Authorization: '',
},
};
};
public async handle(): Promise<WithMetadata> {
// This is a NOP handler,
// authentication and unlocking is handled via middleware.
// Failure to authenticate will be an error from the middleware layer.
return {};
}
}

export { agentUnlockCaller, AgentUnlockHandler };
52 changes: 24 additions & 28 deletions tests/RPC/RPC.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import type {
ClientHandlerImplementation,
ContainerType,
DuplexHandlerImplementation,
JsonRpcRequest,
RawHandlerImplementation,
ServerHandlerImplementation,
UnaryHandlerImplementation,
} from '@/RPC/types';
import type { ContainerType, JsonRpcRequest } from '@/RPC/types';
import type { ConnectionInfo } from '@/network/types';
import type { ReadableStream } from 'stream/web';
import type { JSONValue } from '@/types';
import { fc, testProp } from '@fast-check/jest';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import RPCServer from '@/RPC/RPCServer';
Expand Down Expand Up @@ -44,10 +38,13 @@ describe('RPC', () => {

let header: JsonRpcRequest | undefined;
class TestMethod extends RawHandler {
public handle: RawHandlerImplementation = ([input, header_]) => {
public handle(
input: [ReadableStream<Uint8Array>, JsonRpcRequest],
): ReadableStream<Uint8Array> {
const [stream, header_] = input;
header = header_;
return input;
};
return stream;
}
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Expand Down Expand Up @@ -96,11 +93,11 @@ describe('RPC', () => {
Uint8Array
>();
class TestMethod extends DuplexHandler {
public handle: DuplexHandlerImplementation = async function* (input) {
for await (const val of input) {
yield val;
}
};
public async *handle(
input: AsyncIterable<JSONValue>,
): AsyncIterable<JSONValue> {
yield* input;
}
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Expand Down Expand Up @@ -143,12 +140,11 @@ describe('RPC', () => {
>();

class TestMethod extends ServerHandler<ContainerType, number, number> {
public handle: ServerHandlerImplementation<number, number> =
async function* (input) {
for (let i = 0; i < input; i++) {
yield i;
}
};
public async *handle(input: number): AsyncIterable<number> {
for (let i = 0; i < input; i++) {
yield i;
}
}
}

const rpcServer = await RPCServer.createRPCServer({
Expand Down Expand Up @@ -188,15 +184,13 @@ describe('RPC', () => {
>();

class TestMethod extends ClientHandler<ContainerType, number, number> {
public handle: ClientHandlerImplementation<number, number> = async (
input,
) => {
public async handle(input: AsyncIterable<number>): Promise<number> {
let acc = 0;
for await (const number of input) {
acc += number;
}
return acc;
};
}
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Expand Down Expand Up @@ -239,7 +233,9 @@ describe('RPC', () => {
>();

class TestMethod extends UnaryHandler {
public handle: UnaryHandlerImplementation = async (input) => input;
public async handle(input: JSONValue): Promise<JSONValue> {
return input;
}
}
const rpcServer = await RPCServer.createRPCServer({
manifest: {
Expand Down
Loading

0 comments on commit 878b40a

Please sign in to comment.