Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(executor): handle errors from the field subscriber #5187

Merged
merged 4 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/polite-pianos-press.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-tools/executor': patch
---

Handle errors thrown from the field subscriber
102 changes: 85 additions & 17 deletions packages/executor/src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import {
GraphQLSchema,
} from 'graphql';

import { ExecutionArgs, createSourceEventStream, subscribe } from '../execute.js';
import { ExecutionArgs, subscribe } from '../execute.js';

import { SimplePubSub } from './simplePubSub.js';
import { ExecutionResult, isAsyncIterable, isPromise, MaybePromise } from '@graphql-tools/utils';
// eslint-disable-next-line import/no-extraneous-dependencies
import { makeExecutableSchema } from '@graphql-tools/schema';
import { normalizedExecutor } from '../normalizedExecutor.js';
import { assertAsyncIterable } from '../../../../loaders/url/tests/test-utils.js';

interface Email {
from: string;
Expand Down Expand Up @@ -158,21 +162,6 @@ function expectPromise(maybePromise: unknown) {
};
}

// TODO: consider adding this method to testUtils (with tests)
function expectEqualPromisesOrValues<T>(value1: MaybePromise<T>, value2: MaybePromise<T>): MaybePromise<T> {
if (isPromise(value1)) {
expect(isPromise(value2)).toBeTruthy();
return Promise.all([value1, value2]).then(resolved => {
expectJSON(resolved[1]).toDeepEqual(resolved[0]);
return resolved[0];
});
}

expect(!isPromise(value2)).toBeTruthy();
expectJSON(value2).toDeepEqual(value1);
return value1;
}

const DummyQueryType = new GraphQLObjectType({
name: 'Query',
fields: {
Expand All @@ -196,7 +185,7 @@ function subscribeWithBadFn(subscribeFn: () => unknown): MaybePromise<ExecutionR
}

function subscribeWithBadArgs(args: ExecutionArgs): MaybePromise<ExecutionResult | AsyncIterable<unknown>> {
return expectEqualPromisesOrValues(subscribe(args), createSourceEventStream(args));
return subscribe(args);
}

// Check all error cases when initializing the subscription.
Expand Down Expand Up @@ -1230,4 +1219,83 @@ describe('Subscription Publish Phase', () => {
value: undefined,
});
});

it('should handle errors thrown in the field subscriber', async () => {
const schema = makeExecutableSchema({
typeDefs: /* GraphQL */ `
type Query {
_: String
}
type Subscription {
oneTwoThree: Int
}
`,
resolvers: {
Subscription: {
oneTwoThree: {
async *subscribe() {
yield 1;
yield 2;
throw new Error('test error');
},
resolve: (value: number) => value,
},
},
},
});
const result = await normalizedExecutor({
schema,
document: parse(/* GraphQL */ `
subscription {
oneTwoThree
}
`),
});

assertAsyncIterable(result);

const iterator = result[Symbol.asyncIterator]();

const resultOne = await iterator.next();

expect(resultOne).toEqual({
done: false,
value: {
data: {
oneTwoThree: 1,
},
},
});

const resultTwo = await iterator.next();

expect(resultTwo).toEqual({
done: false,
value: {
data: {
oneTwoThree: 2,
},
},
});

const resultThree = await iterator.next();

expect(JSON.parse(JSON.stringify(resultThree))).toEqual({
done: false,
value: {
errors: [
{
message: 'test error',
locations: [{ line: 2, column: 9 }],
},
],
},
});

const endResult = await iterator.next();

expect(endResult).toEqual({
done: true,
});
});
ardatan marked this conversation as resolved.
Show resolved Hide resolved
});
71 changes: 13 additions & 58 deletions packages/executor/src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1368,68 +1368,23 @@ function mapSourceToResponse(
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
return flattenAsyncIterable(
mapAsyncIterator(resultOrStream[Symbol.asyncIterator](), async (payload: unknown) =>
ensureAsyncIterable(await executeImpl(buildPerEventExecutionContext(exeContext, payload)))
mapAsyncIterator(
resultOrStream[Symbol.asyncIterator](),
async (payload: unknown) =>
ensureAsyncIterable(await executeImpl(buildPerEventExecutionContext(exeContext, payload))),
async function* (error: Error) {
const wrappedError = createGraphQLError(error.message, {
originalError: error,
nodes: [exeContext.operation],
});
yield {
errors: [wrappedError],
};
}
)
);
}

/**
* Implements the "CreateSourceEventStream" algorithm described in the
* GraphQL specification, resolving the subscription source event stream.
*
* Returns a Promise which resolves to either an AsyncIterable (if successful)
* or an ExecutionResult (error). The promise will be rejected if the schema or
* other arguments to this function are invalid, or if the resolved event stream
* is not an async iterable.
*
* If the client-provided arguments to this function do not result in a
* compliant subscription, a GraphQL Response (ExecutionResult) with
* descriptive errors and no data will be returned.
*
* If the the source stream could not be created due to faulty subscription
* resolver logic or underlying systems, the promise will resolve to a single
* ExecutionResult containing `errors` and no `data`.
*
* If the operation succeeded, the promise resolves to the AsyncIterable for the
* event stream returned by the resolver.
*
* A Source Event Stream represents a sequence of events, each of which triggers
* a GraphQL execution for that event.
*
* This may be useful when hosting the stateful subscription service in a
* different process or machine than the stateless GraphQL execution engine,
* or otherwise separating these two steps. For more on this, see the
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
*/
export function createSourceEventStream(
args: ExecutionArgs
): MaybePromise<AsyncIterable<unknown> | SingularExecutionResult> {
// If a valid execution context cannot be created due to incorrect arguments,
// a "Response" with only errors is returned.
const exeContext = buildExecutionContext(args);

// Return early errors if execution context failed.
if (!('schema' in exeContext)) {
return {
errors: exeContext.map(e => {
Object.defineProperty(e, 'extensions', {
value: {
...e.extensions,
http: {
...e.extensions?.['http'],
status: 400,
},
},
});
return e;
}),
};
}

return createSourceEventStreamImpl(exeContext);
}

function createSourceEventStreamImpl(
exeContext: ExecutionContext
): MaybePromise<AsyncIterable<unknown> | SingularExecutionResult> {
Expand Down