Skip to content

Commit

Permalink
feat: updated client caller interfaces
Browse files Browse the repository at this point in the history
- Related #501

[ci skip]
  • Loading branch information
tegefaulkes committed Feb 14, 2023
1 parent 13ee28c commit 50ff0fa
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 207 deletions.
74 changes: 14 additions & 60 deletions src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ import type {
JsonRpcRequestMessage,
StreamPairCreateCallback,
ClientManifest,
MapRawCallers,
} from './types';
import type { JSONValue } from 'types';
import type { ReadableWritablePair, WritableStream } from 'stream/web';
import type {
ReadableWritablePair,
WritableStream,
ReadableStream,
} from 'stream/web';
import type {
JsonRpcRequest,
JsonRpcResponse,
Expand Down Expand Up @@ -78,27 +81,9 @@ class RPCClient<M extends ClientManifest> {
case 'CLIENT':
return () => this.clientStreamCaller(method);
case 'DUPLEX':
return (f) => this.duplexStreamCaller(method, f);
return () => this.duplexStreamCaller(method);
case 'RAW':
default:
return;
}
},
},
);
protected rawMethodsProxy = new Proxy(
{},
{
get: (_, method) => {
if (typeof method === 'symbol') throw never();
switch (this.callerTypes[method]) {
case 'DUPLEX':
return () => this.rawDuplexStreamCaller(method);
case 'RAW':
return (params) => this.rawStreamCaller(method, params);
case 'SERVER':
case 'CLIENT':
case 'UNARY':
return (header) => this.rawStreamCaller(method, header);
default:
return;
}
Expand Down Expand Up @@ -138,17 +123,12 @@ class RPCClient<M extends ClientManifest> {
return this.methodsProxy as MapCallers<M>;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public get rawMethods(): MapRawCallers<M> {
return this.rawMethodsProxy as MapRawCallers<M>;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
): Promise<O> {
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
const callerInterface = await this.duplexStreamCaller<I, O>(method);
const reader = callerInterface.readable.getReader();
const writer = callerInterface.writable.getWriter();
await writer.write(parameters);
Expand All @@ -165,18 +145,13 @@ class RPCClient<M extends ClientManifest> {
public async serverStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
parameters: I,
): Promise<AsyncIterable<O>> {
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
): Promise<ReadableStream<O>> {
const callerInterface = await this.duplexStreamCaller<I, O>(method);
const writer = callerInterface.writable.getWriter();
await writer.write(parameters);
await writer.close();

const outputGen = async function* () {
for await (const value of callerInterface.readable) {
yield value;
}
};
return outputGen();
return callerInterface.readable;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
Expand All @@ -186,7 +161,7 @@ class RPCClient<M extends ClientManifest> {
output: Promise<O>;
writable: WritableStream<I>;
}> {
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
const callerInterface = await this.duplexStreamCaller<I, O>(method);
const reader = callerInterface.readable.getReader();
const output = reader.read().then(({ value, done }) => {
if (done) {
Expand All @@ -203,27 +178,6 @@ class RPCClient<M extends ClientManifest> {
@ready(new rpcErrors.ErrorRpcDestroyed())
public async duplexStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
f: (output: AsyncIterable<O>) => AsyncIterable<I>,
): Promise<void> {
const callerInterface = await this.rawDuplexStreamCaller<I, O>(method);
const outputGenerator = async function* () {
for await (const value of callerInterface.readable) {
yield value;
}
};
const writer = callerInterface.writable.getWriter();
try {
for await (const value of f(outputGenerator())) {
await writer.write(value);
}
} finally {
await writer.close();
}
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async rawDuplexStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
): Promise<ReadableWritablePair<O, I>> {
const outputMessageTransformStream = clientOutputTransformStream<O>();
const inputMessageTransformStream = clientInputTransformStream<I>(method);
Expand All @@ -249,14 +203,14 @@ class RPCClient<M extends ClientManifest> {
@ready(new rpcErrors.ErrorRpcDestroyed())
public async rawStreamCaller(
method: string,
params: JSONValue,
headerParams: JSONValue,
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
const streamPair = await this.streamPairCreateCallback();
const tempWriter = streamPair.writable.getWriter();
const header: JsonRpcRequestMessage = {
jsonrpc: '2.0',
method,
params,
params: headerParams,
id: null,
};
await tempWriter.write(Buffer.from(JSON.stringify(header)));
Expand Down
24 changes: 2 additions & 22 deletions src/RPC/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ type UnaryCallerImplementation<
type ServerCallerImplementation<
I extends JSONValue = JSONValue,
O extends JSONValue = JSONValue,
> = (parameters: I) => Promise<AsyncIterable<O>>;
> = (parameters: I) => Promise<ReadableStream<O>>;

type ClientCallerImplementation<
I extends JSONValue = JSONValue,
Expand All @@ -166,17 +166,10 @@ type ClientCallerImplementation<
type DuplexCallerImplementation<
I extends JSONValue = JSONValue,
O extends JSONValue = JSONValue,
> = (f: (output: AsyncIterable<O>) => AsyncIterable<I>) => Promise<void>;

// Raw callers

type RawDuplexCallerImplementation<
I extends JSONValue = JSONValue,
O extends JSONValue = JSONValue,
> = () => Promise<ReadableWritablePair<O, I>>;

type RawCallerImplementation = (
params: JSONValue,
headerParams: JSONValue,
) => Promise<ReadableWritablePair<Uint8Array, Uint8Array>>;

type ConvertDuplexCaller<T> = T extends DuplexCaller<infer I, infer O>
Expand All @@ -203,14 +196,6 @@ type ConvertCaller<T extends Caller> = T extends DuplexCaller
? ConvertClientCaller<T>
: T extends UnaryCaller
? ConvertUnaryCaller<T>
: never;

type ConvertRawDuplexStreamHandler<T> = T extends DuplexCaller<infer I, infer O>
? RawDuplexCallerImplementation<I, O>
: never;

type ConvertRawCaller<T> = T extends DuplexCaller
? ConvertRawDuplexStreamHandler<T>
: T extends RawCaller
? RawCallerImplementation
: never;
Expand All @@ -224,10 +209,6 @@ type MapCallers<T extends ClientManifest> = {
[K in keyof T]: ConvertCaller<T[K]>;
};

type MapRawCallers<T extends ClientManifest> = {
[K in keyof T]: ConvertRawCaller<T[K]>;
};

export type {
JsonRpcRequestMessage,
JsonRpcRequestNotification,
Expand All @@ -250,5 +231,4 @@ export type {
ClientManifest,
HandlerType,
MapCallers,
MapRawCallers,
};
4 changes: 2 additions & 2 deletions tests/RPC/RPC.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ describe('RPC', () => {
logger,
});

const callerInterface = await rpcClient.rawMethods.testMethod({
const callerInterface = await rpcClient.methods.testMethod({
hello: 'world',
});
const writer = callerInterface.writable.getWriter();
Expand Down Expand Up @@ -116,7 +116,7 @@ describe('RPC', () => {
logger,
});

const callerInterface = await rpcClient.rawMethods.testMethod();
const callerInterface = await rpcClient.methods.testMethod();
const writer = callerInterface.writable.getWriter();
const reader = callerInterface.readable.getReader();
for (const value of values) {
Expand Down
Loading

0 comments on commit 50ff0fa

Please sign in to comment.