Skip to content

Commit

Permalink
wip: client sand server integration tests
Browse files Browse the repository at this point in the history
Related #500
Related #501

[ci skip]
  • Loading branch information
tegefaulkes committed Jan 19, 2023
1 parent 1e89eba commit 4bf3ae0
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/RPC/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,9 @@ class RPCServer {
throw Error('TMP Stream closed early');
}
const method = leadingMetadataMessage.value.method;
const _metadata = leadingMetadataMessage.value.params;
const initialParams = leadingMetadataMessage.value.params;
const dataGen = async function* () {
yield initialParams as JSONValue;
for await (const data of input) {
yield data.params as JSONValue;
}
Expand Down
175 changes: 175 additions & 0 deletions tests/RPC/RPC.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import type {
ClientStreamHandler,
DuplexStreamHandler,
ServerStreamHandler,
UnaryHandler,
} from '@/RPC/types';
import type { ConnectionInfo } from '@/network/types';
import type { JSONValue } from '@/types';
import { fc, testProp } from '@fast-check/jest';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import RPCServer from '@/RPC/RPCServer';
import RPCClient from '@/RPC/RPCClient';
import * as rpcTestUtils from './utils';

describe('RPC', () => {
const logger = new Logger(`RPC Test`, LogLevel.WARN, [new StreamHandler()]);

const methodName = 'testMethod';

testProp(
'RPC communication with duplex stream',
[fc.array(rpcTestUtils.safeJsonValueArb, { minLength: 1 })],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
>();

const container = {};
const rpc = await RPCServer.createRPCServer({ container, logger });

const duplexHandler: DuplexStreamHandler<JSONValue, JSONValue> =
async function* (input, _container, _connectionInfo, _ctx) {
for await (const val of input) {
yield val;
}
};

rpc.registerDuplexStreamHandler(methodName, duplexHandler);
rpc.handleStream(serverPair, {} as ConnectionInfo);

const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => clientPair,
logger,
});

const callerInterface = await rpcClient.duplexStreamCaller(
methodName,
{},
);
for (const value of values) {
await callerInterface.write(value);
expect((await callerInterface.read()).value).toStrictEqual(value);
}
await callerInterface.end();
expect((await callerInterface.read()).value).toBeUndefined();
expect((await callerInterface.read()).done).toBeTrue();
},
);

testProp(
'RPC communication with client stream',
[fc.integer({ min: 1, max: 100 })],
async (value) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
>();

const container = {};
const rpc = await RPCServer.createRPCServer({ container, logger });

const serverStreamHandler: ServerStreamHandler<number, number> =
async function* (input, _container, _connectionInfo, _ctx) {
for (let i = 0; i < input; i++) {
yield i;
}
};

rpc.registerServerStreamHandler(methodName, serverStreamHandler);
rpc.handleStream(serverPair, {} as ConnectionInfo);

const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => clientPair,
logger,
});

const callerInterface = await rpcClient.serverStreamCaller<
number,
number
>(methodName, value, {});

const outputs: Array<number> = [];
for await (const num of callerInterface.outputGenerator) {
outputs.push(num);
}
expect(outputs.length).toEqual(value);
},
{ numRuns: 1 },
);

testProp(
'RPC communication with server stream',
[fc.array(fc.integer(), { minLength: 1 })],
async (values) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
>();

const container = {};
const rpc = await RPCServer.createRPCServer({ container, logger });

const clientStreamhandler: ClientStreamHandler<number, number> = async (
input,
) => {
let acc = 0;
for await (const number of input) {
acc += number;
}
return acc;
};
rpc.registerClientStreamHandler(methodName, clientStreamhandler);
rpc.handleStream(serverPair, {} as ConnectionInfo);

const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => clientPair,
logger,
});

const callerInterface = await rpcClient.clientStreamCaller<
number,
number
>(methodName, {});
for (const value of values) {
await callerInterface.write(value);
}
await callerInterface.end();

const expectedResult = values.reduce((p, c) => p + c);
await expect(callerInterface.result).resolves.toEqual(expectedResult);
},
);

testProp(
'RPC communication with unary call',
[rpcTestUtils.safeJsonValueArb],
async (value) => {
const { clientPair, serverPair } = rpcTestUtils.createTapPairs<
Buffer,
Buffer
>();

const container = {};
const rpc = await RPCServer.createRPCServer({ container, logger });

const unaryCaller: UnaryHandler<JSONValue, JSONValue> = async (input) =>
input;
rpc.registerUnaryHandler(methodName, unaryCaller);
rpc.handleStream(serverPair, {} as ConnectionInfo);

const rpcClient = await RPCClient.createRPCClient({
streamPairCreateCallback: async () => clientPair,
logger,
});

const result = await rpcClient.unaryCaller<JSONValue, JSONValue>(
methodName,
value,
{},
);
expect(result).toStrictEqual(value);
},
);
});
50 changes: 50 additions & 0 deletions tests/RPC/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type {
Transformer,
TransformerFlushCallback,
TransformerTransformCallback,
ReadableWritablePair,
} from 'stream/web';
import type { POJO } from '@/types';
import type {
Expand Down Expand Up @@ -229,6 +230,53 @@ function streamToArray<T>(): [Promise<Array<T>>, WritableStream<T>] {
return [result.p, outputStream];
}

class TapTransformer<I> implements Transformer<I, I> {
protected iteration = 0;

constructor(
protected tapCallback: (chunk: I, iteration: number) => Promise<void>,
) {}

transform: TransformerTransformCallback<I, I> = async (chunk, controller) => {
await this.tapCallback(chunk, this.iteration);
controller.enqueue(chunk);
this.iteration += 1;
};
}

type TapCallback<T> = (chunk: T, iteration: number) => Promise<void>;

/**
* This is used to convert regular chunks into randomly sized chunks based on
* a provided pattern. This is to replicate randomness introduced by packets
* splitting up the data.
*/
class TapTransformerStream<I> extends TransformStream {
constructor(tapCallback: TapCallback<I> = async () => {}) {
super(new TapTransformer<I>(tapCallback));
}
}

function createTapPairs<A, B>(
forwardTapCallback: TapCallback<A> = async () => {},
reverseTapCallback: TapCallback<B> = async () => {},
) {
const forwardTap = new TapTransformerStream<A>(forwardTapCallback);
const reverseTap = new TapTransformerStream<B>(reverseTapCallback);
const clientPair: ReadableWritablePair = {
readable: reverseTap.readable,
writable: forwardTap.writable,
};
const serverPair: ReadableWritablePair = {
readable: forwardTap.readable,
writable: reverseTap.writable,
};
return {
clientPair,
serverPair,
};
}

export {
BufferStreamToSnippedStream,
BufferStreamToNoisyStream,
Expand All @@ -245,4 +293,6 @@ export {
snippingPatternArb,
jsonMessagesArb,
streamToArray,
TapTransformerStream,
createTapPairs,
};

0 comments on commit 4bf3ae0

Please sign in to comment.