Skip to content

Commit

Permalink
fix: event handler leak in wrapRpc function
Browse files Browse the repository at this point in the history
Closes: #723
  • Loading branch information
piotr-oles committed Apr 13, 2022
1 parent cb327ee commit 414fa2e
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 51 deletions.
113 changes: 62 additions & 51 deletions src/rpc/wrap-rpc.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ChildProcess } from 'child_process';
import * as process from 'process';

import { createControlledPromise } from '../utils/async/controlled-promise';

import { RpcExitError } from './rpc-error';
import type { RpcRemoteMethod, RpcMessage } from './types';
Expand All @@ -17,61 +18,71 @@ export function wrapRpc<T extends (...args: any[]) => any>(

const id = uuid();

const resultPromise = new Promise((resolve, reject) => {
const handleMessage = (message: RpcMessage) => {
if (message.id === id) {
if (message.type === 'resolve') {
resolve(message.value);
unsubscribe();
} else if (message.type === 'reject') {
reject(message.error);
unsubscribe();
}
}
};
const handleClose = (code: string | number | null, signal: string | null) => {
reject(
new RpcExitError(
code
? `Process ${process.pid} exited with code "${code}" [${signal}]`
: `Process ${process.pid} exited [${signal}].`,
code,
signal
)
);
unsubscribe();
};
// create promises
const {
promise: resultPromise,
resolve: resolveResult,
reject: rejectResult,
} = createControlledPromise<T>();
const {
promise: sendPromise,
resolve: resolveSend,
reject: rejectSend,
} = createControlledPromise<void>();

const subscribe = () => {
childProcess.on('message', handleMessage);
childProcess.on('close', handleClose);
};
const unsubscribe = () => {
childProcess.off('message', handleMessage);
childProcess.off('exit', handleClose);
};
const handleMessage = (message: RpcMessage) => {
if (message?.id === id) {
if (message.type === 'resolve') {
// assume the contract is respected
resolveResult(message.value as T);
removeHandlers();
} else if (message.type === 'reject') {
rejectResult(message.error);
removeHandlers();
}
}
};
const handleClose = (code: string | number | null, signal: string | null) => {
rejectResult(
new RpcExitError(
code
? `Process ${childProcess.pid} exited with code ${code}` +
(signal ? ` [${signal}]` : '')
: `Process ${childProcess.pid} exited` + (signal ? ` [${signal}]` : ''),
code,
signal
)
);
removeHandlers();
};

subscribe();
});
// to prevent event handler leaks
const removeHandlers = () => {
childProcess.off('message', handleMessage);
childProcess.off('close', handleClose);
};

await new Promise<void>((resolve, reject) => {
childProcess.send(
{
type: 'call',
id,
args,
},
(error) => {
if (error) {
reject(error);
} else {
resolve(undefined);
}
// add event listeners
childProcess.on('message', handleMessage);
childProcess.on('close', handleClose);
// send call message
childProcess.send(
{
type: 'call',
id,
args,
},
(error) => {
if (error) {
rejectSend(error);
removeHandlers();
} else {
resolveSend(undefined);
}
);
});
}
);

return resultPromise;
return sendPromise.then(() => resultPromise);
}) as RpcRemoteMethod<T>;
}

Expand Down
16 changes: 16 additions & 0 deletions src/utils/async/controlled-promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
function createControlledPromise<T = unknown>() {
let resolve: (value: T) => void = () => undefined;
let reject: (error: unknown) => void = () => undefined;
const promise = new Promise<T>((aResolve, aReject) => {
resolve = aResolve;
reject = aReject;
});

return {
promise,
resolve,
reject,
};
}

export { createControlledPromise };
167 changes: 167 additions & 0 deletions test/unit/rpc/wrap-rpc.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import type { ChildProcess } from 'child_process';

import { RpcExitError, wrapRpc } from '../../../src/rpc';

describe('wrapRpc', () => {
let childProcessMock: ChildProcess;
let eventHandlers: Record<string, Array<(...args: unknown[]) => void>>;
let messageIds: string[];

beforeEach(() => {
eventHandlers = {};
messageIds = [];
childProcessMock = {
connected: true,
pid: 1234,
send: jest.fn((message, callback) => {
messageIds.push(message?.id);
callback();
}),
on: jest.fn((name, handlerToAdd) => {
if (!eventHandlers[name]) {
eventHandlers[name] = [];
}
eventHandlers[name].push(handlerToAdd);
}),
off: jest.fn((name, handlerToRemove) => {
if (!eventHandlers[name]) {
return;
}
eventHandlers[name] = eventHandlers[name].filter((handler) => handler !== handlerToRemove);
}),
// we don't have to implement all methods - it would take a lot of code to do so
} as unknown as ChildProcess;
});

it('returns new functions without adding event handlers', () => {
const wrapped = wrapRpc(childProcessMock);
expect(wrapped).toBeInstanceOf(Function);
expect(eventHandlers).toEqual({});
});

it("throws an error if child process doesn't have IPC channels", async () => {
childProcessMock.send = undefined;
const wrapped = wrapRpc(childProcessMock);
await expect(wrapped()).rejects.toEqual(new Error("Process 1234 doesn't have IPC channels"));
expect(eventHandlers).toEqual({});
});

it("throws an error if child process doesn't have open IPC channels", async () => {
// @ts-expect-error We're using mock here :)
childProcessMock.connected = false;
const wrapped = wrapRpc(childProcessMock);
await expect(wrapped()).rejects.toEqual(
new Error("Process 1234 doesn't have open IPC channels")
);
expect(eventHandlers).toEqual({});
});

it('sends a call message', async () => {
const wrapped = wrapRpc(childProcessMock);
wrapped('foo', 1234);
expect(childProcessMock.send).toHaveBeenCalledWith(
{
type: 'call',
id: expect.any(String),
args: ['foo', 1234],
},
expect.any(Function)
);
expect(eventHandlers).toEqual({
message: [expect.any(Function)],
close: [expect.any(Function)],
});
});

it('ignores invalid message', async () => {
const wrapped = wrapRpc<() => void>(childProcessMock);
wrapped();
expect(messageIds).toEqual([expect.any(String)]);
expect(eventHandlers['message']).toEqual([expect.any(Function)]);
const triggerMessage = eventHandlers['message'][0];

triggerMessage(undefined);
triggerMessage('test');
triggerMessage({});
triggerMessage({ id: 'test' });

expect(eventHandlers).toEqual({
message: [expect.any(Function)],
close: [expect.any(Function)],
});
});

it('resolves on valid resolve message', async () => {
const wrapped = wrapRpc<() => void>(childProcessMock);
const promise = wrapped();
expect(messageIds).toEqual([expect.any(String)]);
expect(eventHandlers['message']).toEqual([expect.any(Function)]);
const triggerMessage = eventHandlers['message'][0];
const id = messageIds[0];

triggerMessage({
id,
type: 'resolve',
value: 41,
});

expect(promise).resolves.toEqual(41);
expect(eventHandlers).toEqual({
message: [],
close: [],
});
});

it('rejects on valid reject message', async () => {
const wrapped = wrapRpc<() => void>(childProcessMock);
const promise = wrapped();
expect(messageIds).toEqual([expect.any(String)]);
expect(eventHandlers['message']).toEqual([expect.any(Function)]);
const triggerMessage = eventHandlers['message'][0];
const id = messageIds[0];

triggerMessage({
id,
type: 'reject',
error: 'sad error',
});

expect(promise).rejects.toEqual('sad error');
expect(eventHandlers).toEqual({
message: [],
close: [],
});
});

it('rejects on send error', async () => {
(childProcessMock.send as jest.Mock).mockImplementation((message, callback) =>
callback(new Error('cannot send'))
);
const wrapped = wrapRpc<() => void>(childProcessMock);

expect(wrapped()).rejects.toEqual(new Error('cannot send'));
expect(eventHandlers).toEqual({
message: [],
close: [],
});
});

it.each([
{ code: 100, signal: 'SIGINT', message: 'Process 1234 exited with code 100 [SIGINT]' },
{ code: -1, signal: undefined, message: 'Process 1234 exited with code -1' },
{ code: undefined, signal: undefined, message: 'Process 1234 exited' },
])('rejects on process close with %p', async ({ code, signal, message }) => {
const wrapped = wrapRpc<() => void>(childProcessMock);
const promise = wrapped();
expect(eventHandlers['close']).toEqual([expect.any(Function)]);
const triggerClose = eventHandlers['close'][0];

triggerClose(code, signal);

expect(promise).rejects.toEqual(new RpcExitError(message, code, signal));
expect(eventHandlers).toEqual({
message: [],
close: [],
});
});
});

0 comments on commit 414fa2e

Please sign in to comment.