Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/swift-geese-behave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@graphql-tools/executor': patch
'@graphql-tools/utils': patch
---

In executor, do not use leaking `registerAbortSignalListener`, and handle listeners inside the
execution context
31 changes: 17 additions & 14 deletions packages/executor/src/execution/__tests__/abort-signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe('Abort Signal', () => {
subscribe() {
return new Repeater(async (push, stop) => {
let i = 0;
stop.then(() => {
stop.finally(() => {
stopped = true;
});

Expand Down Expand Up @@ -150,7 +150,7 @@ describe('Abort Signal', () => {
didInvokeFirstFn = true;
return true;
},
async second() {
second() {
didInvokeSecondFn = true;
controller.abort();
return true;
Expand All @@ -162,18 +162,21 @@ describe('Abort Signal', () => {
},
},
});
const result$ = normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
}
`),
signal: controller.signal,
});
expect(result$).rejects.toBeInstanceOf(DOMException);
await expect(
Promise.resolve().then(() =>
normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
mutation {
first
second
third
}
`),
signal: controller.signal,
}),
),
).rejects.toBeInstanceOf(DOMException);
expect(didInvokeFirstFn).toBe(true);
expect(didInvokeSecondFn).toBe(true);
expect(didInvokeThirdFn).toBe(false);
Expand Down
4 changes: 1 addition & 3 deletions packages/executor/src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,7 @@ describe('Execute: stream directive', () => {
],
hasNext: true,
},
{
hasNext: false,
},
{ hasNext: false },
]);
});

Expand Down
72 changes: 54 additions & 18 deletions packages/executor/src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import {
collectFields,
createGraphQLError,
fakePromise,
getAbortPromise,
getArgumentValues,
getDefinedRootType,
GraphQLResolveInfo,
Expand All @@ -52,11 +51,10 @@ import {
Path,
pathToArray,
promiseReduce,
registerAbortSignalListener,
} from '@graphql-tools/utils';
import { TypedDocumentNode } from '@graphql-typed-document-node/core';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
import { createDeferredPromise, handleMaybePromise } from '@whatwg-node/promise-helpers';
import { coerceError } from './coerceError.js';
import { flattenAsyncIterable } from './flattenAsyncIterable.js';
import { invariant } from './invariant.js';
Expand Down Expand Up @@ -127,6 +125,8 @@ export interface ExecutionContext<TVariables = any, TContext = any> {
errors: Array<GraphQLError>;
subsequentPayloads: Set<AsyncPayloadRecord>;
signal?: AbortSignal;
onSignalAbort?(handler: () => void): void;
signalPromise?: Promise<never>;
}

export interface FormattedExecutionResult<
Expand Down Expand Up @@ -421,6 +421,8 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
signal,
} = args;

signal?.throwIfAborted();

// If the schema used for execution is invalid, throw an error.
assertValidSchema(schema);

Expand Down Expand Up @@ -489,6 +491,31 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
return coercedVariableValues.errors;
}

signal?.throwIfAborted();

let onSignalAbort: ExecutionContext['onSignalAbort'];
let signalPromise: ExecutionContext['signalPromise'];

if (signal) {
const listeners = new Set<() => void>();
const signalDeferred = createDeferredPromise<never>();
signalPromise = signalDeferred.promise;
const sharedListener = () => {
signalDeferred.reject(signal.reason);
signal.removeEventListener('abort', sharedListener);
};
signal.addEventListener('abort', sharedListener, { once: true });
signalPromise.catch(() => {
for (const listener of listeners) {
listener();
}
listeners.clear();
});
onSignalAbort = handler => {
listeners.add(handler);
};
}

return {
schema,
fragments,
Expand All @@ -502,6 +529,8 @@ export function buildExecutionContext<TData = any, TVariables = any, TContext =
subsequentPayloads: new Set(),
errors: [],
signal,
onSignalAbort,
signalPromise,
};
}

Expand Down Expand Up @@ -626,7 +655,7 @@ function executeFields(
}
}
} catch (error) {
if (containsPromise) {
if (error !== exeContext.signal?.reason && containsPromise) {
// Ensure that any promises returned by other fields are handled, as they may also reject.
return handleMaybePromise(
() => promiseForObject(results, exeContext.signal),
Expand All @@ -649,7 +678,7 @@ function executeFields(
// Otherwise, results is a map from field name to the result of resolving that
// field, which is possibly a promise. Return a promise that will return this
// same map, but with any promises replaced with the values they resolved to.
return promiseForObject(results, exeContext.signal);
return promiseForObject(results, exeContext.signal, exeContext.signalPromise);
}

/**
Expand Down Expand Up @@ -679,6 +708,7 @@ function executeField(

// Get the resolve function, regardless of if its result is normal or abrupt (error).
try {
exeContext.signal?.throwIfAborted();
// Build a JS object of arguments from the field.arguments AST, using the
// variables scope to fulfill any variable references.
// TODO: find a way to memoize, in case this field is within a List type.
Expand Down Expand Up @@ -973,8 +1003,9 @@ async function completeAsyncIteratorValue(
iterator: AsyncIterator<unknown>,
asyncPayloadRecord?: AsyncPayloadRecord,
): Promise<ReadonlyArray<unknown>> {
if (exeContext.signal && iterator.return) {
registerAbortSignalListener(exeContext.signal, () => {
exeContext.signal?.throwIfAborted();
if (iterator.return) {
exeContext.onSignalAbort?.(() => {
iterator.return?.();
});
}
Expand Down Expand Up @@ -1755,18 +1786,25 @@ function executeSubscription(exeContext: ExecutionContext): MaybePromise<AsyncIt
const result = resolveFn(rootValue, args, contextValue, info);

if (isPromise(result)) {
return result.then(assertEventStream).then(undefined, error => {
throw locatedError(error, fieldNodes, pathToArray(path));
});
return result
.then(result => assertEventStream(result, exeContext.signal, exeContext.onSignalAbort))
.then(undefined, error => {
throw locatedError(error, fieldNodes, pathToArray(path));
});
}

return assertEventStream(result, exeContext.signal);
return assertEventStream(result, exeContext.signal, exeContext.onSignalAbort);
} catch (error) {
throw locatedError(error, fieldNodes, pathToArray(path));
}
}

function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable<unknown> {
function assertEventStream(
result: unknown,
signal?: AbortSignal,
onSignalAbort?: (handler: () => void) => void,
): AsyncIterable<unknown> {
signal?.throwIfAborted();
if (result instanceof Error) {
throw result;
}
Expand All @@ -1777,13 +1815,13 @@ function assertEventStream(result: unknown, signal?: AbortSignal): AsyncIterable
'Subscription field must return Async Iterable. ' + `Received: ${inspect(result)}.`,
);
}
if (signal) {
if (onSignalAbort) {
return {
[Symbol.asyncIterator]() {
const asyncIterator = result[Symbol.asyncIterator]();

if (asyncIterator.return) {
registerAbortSignalListener(signal, () => {
onSignalAbort?.(() => {
asyncIterator.return?.();
});
}
Expand Down Expand Up @@ -2110,8 +2148,6 @@ function yieldSubsequentPayloads(
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
let isDone = false;

const abortPromise = exeContext.signal ? getAbortPromise(exeContext.signal) : undefined;

async function next(): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
if (isDone) {
return { value: undefined, done: true };
Expand All @@ -2121,8 +2157,8 @@ function yieldSubsequentPayloads(
record => record.promise,
);

if (abortPromise) {
await Promise.race([abortPromise, ...subSequentPayloadPromises]);
if (exeContext.signalPromise) {
await Promise.race([exeContext.signalPromise, ...subSequentPayloadPromises]);
} else {
await Promise.race(subSequentPayloadPromises);
}
Expand Down
10 changes: 5 additions & 5 deletions packages/executor/src/execution/promiseForObject.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { getAbortPromise, isPromise, MaybePromise } from '@graphql-tools/utils';
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
import { handleMaybePromise, isPromise, MaybePromise } from '@whatwg-node/promise-helpers';

type ResolvedObject<TData> = {
[TKey in keyof TData]: TData[TKey] extends Promise<infer TValue> ? TValue : TData[TKey];
Expand All @@ -15,7 +14,9 @@ type ResolvedObject<TData> = {
export function promiseForObject<TData>(
object: TData,
signal?: AbortSignal,
signalPromise?: Promise<never>,
): MaybePromise<ResolvedObject<TData>> {
signal?.throwIfAborted();
const resolvedObject = Object.create(null);
const promises: Promise<void>[] = [];
for (const key in object) {
Expand All @@ -33,9 +34,8 @@ export function promiseForObject<TData>(
return resolvedObject;
}
const promiseAll = promises.length === 1 ? promises[0] : Promise.all(promises);
if (signal) {
const abortPromise = getAbortPromise(signal);
return Promise.race([abortPromise, promiseAll]).then(() => resolvedObject);
if (signalPromise) {
return Promise.race([signalPromise, promiseAll]).then(() => resolvedObject);
}
return promiseAll.then(() => resolvedObject);
}
6 changes: 5 additions & 1 deletion packages/utils/src/registerAbortSignalListener.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { fakeRejectPromise } from '@whatwg-node/promise-helpers';
import { memoize1 } from './memoize.js';

// AbortSignal handler cache to avoid the "possible EventEmitter memory leak detected"
Expand Down Expand Up @@ -32,8 +33,11 @@ export function registerAbortSignalListener(signal: AbortSignal, listener: () =>
}

export const getAbortPromise = memoize1(function getAbortPromise(signal: AbortSignal) {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
return fakeRejectPromise(signal.reason);
}
return new Promise<void>((_resolve, reject) => {
// If the signal is already aborted, return a rejected promise
if (signal.aborted) {
reject(signal.reason);
return;
Expand Down
Loading