Skip to content

Commit

Permalink
feat: raw handlers
Browse files Browse the repository at this point in the history
- Related #500
- Related #501
  • Loading branch information
tegefaulkes committed Feb 14, 2023
1 parent 05e6c6b commit 5426020
Show file tree
Hide file tree
Showing 13 changed files with 598 additions and 223 deletions.
39 changes: 38 additions & 1 deletion src/RPC/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { StreamPairCreateCallback } from './types';
import type { JsonRpcRequestMessage, StreamPairCreateCallback } from './types';
import type { JSONValue } from 'types';
import type { ReadableWritablePair } from 'stream/web';
import type {
Expand Down Expand Up @@ -49,6 +49,24 @@ class RPCClient {
this.logger.info(`Destroyed ${this.constructor.name}`);
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async rawStreamCaller(
method: string,
params: JSONValue,
): Promise<ReadableWritablePair<Uint8Array, Uint8Array>> {
const streamPair = await this.streamPairCreateCallback();
const tempWriter = streamPair.writable.getWriter();
const header: JsonRpcRequestMessage = {
jsonrpc: '2.0',
method,
params,
id: null,
};
await tempWriter.write(Buffer.from(JSON.stringify(header)));
tempWriter.releaseLock();
return streamPair;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async duplexStreamCaller<I extends JSONValue, O extends JSONValue>(
method: string,
Expand Down Expand Up @@ -136,6 +154,25 @@ class RPCClient {
return output.value;
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async withRawStreamCaller(
method: string,
params: JSONValue,
f: (output: AsyncGenerator<Uint8Array>) => AsyncGenerator<Uint8Array>,
) {
const callerInterface = await this.rawStreamCaller(method, params);
const outputGenerator = async function* () {
for await (const value of callerInterface.readable) {
yield value;
}
};
const writer = callerInterface.writable.getWriter();
for await (const value of f(outputGenerator())) {
await writer.write(value);
}
await writer.close();
}

@ready(new rpcErrors.ErrorRpcDestroyed())
public async withDuplexCaller<I extends JSONValue, O extends JSONValue>(
method: string,
Expand Down
Loading

0 comments on commit 5426020

Please sign in to comment.