Skip to content

Commit

Permalink
fix: handler receives signal if output stream fails
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Mar 24, 2023
1 parent a6f69f2 commit 043066d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
13 changes: 11 additions & 2 deletions src/rpc/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,13 @@ class RPCServer extends EventTarget {
connectionInfo,
ctx,
) => {
// Setting up abort controller
const abortController = new AbortController();
if (ctx.signal.aborted) abortController.abort(ctx.signal.reason);
ctx.signal.addEventListener('abort', () => {
abortController.abort(ctx.signal.reason);
});
const signal = abortController.signal;
// Setting up middleware
const middleware = this.middlewareFactory();
// Forward from the client to the server
Expand All @@ -214,14 +221,14 @@ class RPCServer extends EventTarget {
const reverseStream = middleware.reverse.writable;
// Generator derived from handler
const outputGen = async function* (): AsyncGenerator<JSONRPCResponse> {
if (ctx.signal.aborted) throw ctx.signal.reason;
if (signal.aborted) throw signal.reason;
// Input generator derived from the forward stream
const inputGen = async function* (): AsyncIterable<I> {
for await (const data of forwardStream) {
yield data.params as I;
}
};
const handlerG = handler(inputGen(), connectionInfo, ctx);
const handlerG = handler(inputGen(), connectionInfo, { signal });
for await (const response of handlerG) {
const responseMessage: JSONRPCResponseResult = {
jsonrpc: '2.0',
Expand Down Expand Up @@ -271,6 +278,8 @@ class RPCServer extends EventTarget {
),
}),
);
// Abort with the reason
abortController.abort(reason);
// If the output stream path fails then we need to end the generator
// early.
await outputGenerator.return(undefined);
Expand Down
5 changes: 5 additions & 0 deletions src/rpc/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ abstract class DuplexHandler<
Input extends JSONValue = JSONValue,
Output extends JSONValue = JSONValue,
> extends Handler<Container, Input, Output> {
/**
* Note that if the output has an error, the handler will not see this as an
* error. If you need to handle any clean up it should be handled in a
* `finally` block and check the abort signal for potential errors.
*/
abstract handle(
input: AsyncIterable<Input>,
connectionInfo: ConnectionInfo,
Expand Down
10 changes: 8 additions & 2 deletions tests/rpc/RPCServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,15 @@ describe(`${RPCServer.name}`, () => {
},
{ numRuns: 1 },
);
testProp.only(
testProp(
'should emit stream error if output stream fails',
[specificMessageArb],
async (messages) => {
const handlerEndedProm = promise();
let ctx: ContextCancellable | undefined;
class TestMethod extends DuplexHandler {
public async *handle(input): AsyncIterable<JSONValue> {
public async *handle(input, _, _ctx): AsyncIterable<JSONValue> {
ctx = _ctx;
// Echo input
try {
yield* input;
Expand Down Expand Up @@ -564,6 +566,10 @@ describe(`${RPCServer.name}`, () => {
expect(event.detail.cause).toBe(readerReason);
// Check that the handler was cleaned up.
await expect(handlerEndedProm.p).toResolve();
// Check that an abort signal happened
expect(ctx).toBeDefined();
expect(ctx?.signal.aborted).toBeTrue();
expect(ctx?.signal.reason).toBe(readerReason);
await rpcServer.destroy();
},
{ numRuns: 1 },
Expand Down

0 comments on commit 043066d

Please sign in to comment.