diff --git a/src/subscription/__tests__/asyncIteratorReject-test.js b/src/subscription/__tests__/asyncIteratorReject-test.js new file mode 100644 index 0000000000..1d620e8f6a --- /dev/null +++ b/src/subscription/__tests__/asyncIteratorReject-test.js @@ -0,0 +1,41 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import asyncIteratorReject from '../asyncIteratorReject'; + +describe('asyncIteratorReject', () => { + + it('creates a failing async iterator', async () => { + const error = new Error('Oh no, Mr. Bill!'); + const iter = asyncIteratorReject(error); + + let caughtError; + try { + await iter.next(); + } catch (thrownError) { + caughtError = thrownError; + } + expect(caughtError).to.equal(error); + + expect(await iter.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('can be closed before failing', async () => { + const error = new Error('Oh no, Mr. Bill!'); + const iter = asyncIteratorReject(error); + + // Close iterator + expect(await iter.return()).to.deep.equal({ done: true, value: undefined }); + + expect(await iter.next()).to.deep.equal({ done: true, value: undefined }); + }); +}); diff --git a/src/subscription/__tests__/mapAsyncIterator-test.js b/src/subscription/__tests__/mapAsyncIterator-test.js index bd96d5159a..657538669e 100644 --- a/src/subscription/__tests__/mapAsyncIterator-test.js +++ b/src/subscription/__tests__/mapAsyncIterator-test.js @@ -194,6 +194,53 @@ describe('mapAsyncIterator', () => { ).to.deep.equal({ value: undefined, done: true }); }); + it('does not normally map over thrown errors', async () => { + async function* source() { + yield 'Hello'; + throw new Error('Goodbye'); + } + + const doubles = mapAsyncIterator(source(), x => x + x); + + expect( + await doubles.next() + ).to.deep.equal({ value: 'HelloHello', done: false }); + + let caughtError; + try { + await doubles.next(); + } catch (e) { + caughtError = e; + } + expect(caughtError && caughtError.message).to.equal('Goodbye'); + }); + + it('maps over thrown errors if second callback provided', async () => { + async function* source() { + yield 'Hello'; + throw new Error('Goodbye'); + } + + const doubles = mapAsyncIterator( + source(), + x => x + x, + error => error + ); + + expect( + await doubles.next() + ).to.deep.equal({ value: 'HelloHello', done: false }); + + const result = await doubles.next(); + expect(result.value).to.be.instanceof(Error); + expect(result.value && result.value.message).to.equal('Goodbye'); + expect(result.done).to.equal(false); + + expect( + await doubles.next() + ).to.deep.equal({ value: undefined, done: true }); + }); + async function testClosesSourceWithMapper(mapper) { let didVisitFinally = false; @@ -251,4 +298,45 @@ describe('mapAsyncIterator', () => { }); }); + async function testClosesSourceWithRejectMapper(mapper) { + async function* source() { + yield 1; + throw new Error(2); + } + + const throwOver1 = mapAsyncIterator(source(), x => x, mapper); + + expect( + await throwOver1.next() + ).to.deep.equal({ value: 1, done: false }); + + let expectedError; + try { + await throwOver1.next(); + } catch (error) { + expectedError = error; + } + + expect(expectedError).to.be.an('error'); + if (expectedError) { + expect(expectedError.message).to.equal('Cannot count to 2'); + } + + expect( + await throwOver1.next() + ).to.deep.equal({ value: undefined, done: true }); + } + + it('closes source if mapper throws an error', async () => { + await testClosesSourceWithRejectMapper(error => { + throw new Error('Cannot count to ' + error.message); + }); + }); + + it('closes source if mapper rejects', async () => { + await testClosesSourceWithRejectMapper(async error => { + throw new Error('Cannot count to ' + error.message); + }); + }); + }); diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 2b3a4ac67a..ce21b11f0f 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -22,115 +22,135 @@ import { GraphQLString, } from '../../type'; +const EmailType = new GraphQLObjectType({ + name: 'Email', + fields: { + from: { type: GraphQLString }, + subject: { type: GraphQLString }, + message: { type: GraphQLString }, + unread: { type: GraphQLBoolean }, + } +}); -describe('Subscribe', () => { - - const EmailType = new GraphQLObjectType({ - name: 'Email', - fields: { - from: { type: GraphQLString }, - subject: { type: GraphQLString }, - message: { type: GraphQLString }, - unread: { type: GraphQLBoolean }, - } - }); - - const InboxType = new GraphQLObjectType({ - name: 'Inbox', - fields: { - total: { - type: GraphQLInt, - resolve: inbox => inbox.emails.length, - }, - unread: { - type: GraphQLInt, - resolve: inbox => inbox.emails.filter(email => email.unread).length, - }, - emails: { type: new GraphQLList(EmailType) }, - } - }); +const InboxType = new GraphQLObjectType({ + name: 'Inbox', + fields: { + total: { + type: GraphQLInt, + resolve: inbox => inbox.emails.length, + }, + unread: { + type: GraphQLInt, + resolve: inbox => inbox.emails.filter(email => email.unread).length, + }, + emails: { type: new GraphQLList(EmailType) }, + } +}); - const QueryType = new GraphQLObjectType({ - name: 'Query', - fields: { - inbox: { type: InboxType }, - } - }); +const QueryType = new GraphQLObjectType({ + name: 'Query', + fields: { + inbox: { type: InboxType }, + } +}); - const EmailEventType = new GraphQLObjectType({ - name: 'EmailEvent', - fields: { - email: { type: EmailType }, - inbox: { type: InboxType }, - } - }); +const EmailEventType = new GraphQLObjectType({ + name: 'EmailEvent', + fields: { + email: { type: EmailType }, + inbox: { type: InboxType }, + } +}); - const SubscriptionType = new GraphQLObjectType({ - name: 'Subscription', - fields: { - importantEmail: { type: EmailEventType }, - } - }); +const emailSchema = emailSchemaWithResolvers(); - const emailSchema = new GraphQLSchema({ +function emailSchemaWithResolvers(subscribeFn, resolveFn) { + return new GraphQLSchema({ query: QueryType, - subscription: SubscriptionType - }); - - function createSubscription(pubsub, schema = emailSchema) { - const data = { - inbox: { - emails: [ - { - from: 'joe@graphql.org', - subject: 'Hello', - message: 'Hello World', - unread: false, - }, - ], + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: EmailEventType, + resolve: resolveFn, + subscribe: subscribeFn, + // TODO: remove + args: { + priority: {type: GraphQLInt} + } + }, }, - importantEmail() { - return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + }) + }); +} + +async function createSubscription(pubsub, schema = emailSchema, ast, vars) { + const data = { + inbox: { + emails: [ + { + from: 'joe@graphql.org', + subject: 'Hello', + message: 'Hello World', + unread: false, + }, + ], + }, + importantEmail() { + return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + } + }; + + function sendImportantEmail(newEmail) { + data.inbox.emails.push(newEmail); + // Returns true if the event was consumed by a subscriber. + return pubsub.emit('importantEmail', { + importantEmail: { + email: newEmail, + inbox: data.inbox, } - }; + }); + } - function sendImportantEmail(newEmail) { - data.inbox.emails.push(newEmail); - // Returns true if the event was consumed by a subscriber. - return pubsub.emit('importantEmail', { - importantEmail: { - email: newEmail, - inbox: data.inbox, + const defaultAst = parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject } - }); - } - - const ast = parse(` - subscription ($priority: Int = 0) { - importantEmail(priority: $priority) { - email { - from - subject - } - inbox { - unread - total - } + inbox { + unread + total } } - `); - - // GraphQL `subscribe` has the same call signature as `execute`, but returns - // AsyncIterator instead of Promise. - return { - sendImportantEmail, - subscription: subscribe( - schema, - ast, - data - ), - }; + } + `); + + // `subscribe` returns Promise + return { + sendImportantEmail, + subscription: await subscribe( + schema, + ast || defaultAst, + data, + null, + vars, + ), + }; +} + +async function expectPromiseToThrow(promise, message) { + try { + await promise(); + expect.fail('promise should have thrown but did not'); + } catch (error) { + expect(error && error.message).to.equal(message); } +} + +// Check all error cases when initializing the subscription. +describe('Subscription Initialization Phase', () => { it('accepts an object with named properties as arguments', async () => { const document = parse(` @@ -154,36 +174,7 @@ describe('Subscribe', () => { ai.return(); }); - it('throws when missing schema', async () => { - const document = parse(` - subscription { - importantEmail - } - `); - - expect(() => - subscribe( - null, - document - ) - ).to.throw('Must provide schema'); - - expect(() => - subscribe({ document }) - ).to.throw('Must provide schema'); - }); - - it('throws when missing document', async () => { - expect(() => - subscribe(emailSchema, null) - ).to.throw('Must provide document'); - - expect(() => - subscribe({ schema: emailSchema }) - ).to.throw('Must provide document'); - }); - - it('multiple subscription fields defined in schema', async () => { + it('accepts multiple subscription fields defined in schema', async () => { const pubsub = new EventEmitter(); const SubscriptionTypeMultiple = new GraphQLObjectType({ name: 'Subscription', @@ -198,17 +189,88 @@ describe('Subscribe', () => { subscription: SubscriptionTypeMultiple }); - expect(() => { - const { sendImportantEmail } = - createSubscription(pubsub, testSchema); + const { + subscription, + sendImportantEmail, + } = await createSubscription(pubsub, testSchema); - sendImportantEmail({ - from: 'yuzhi@graphql.org', - subject: 'Alright', - message: 'Tests are good', - unread: true, - }); - }).not.to.throw(); + sendImportantEmail({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }); + + await subscription.next(); + }); + + it('accepts type definition with sync subscribe function', async () => { + const pubsub = new EventEmitter(); + const schema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => eventEmitterAsyncIterator(pubsub, 'importantEmail') + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + const subscription = await subscribe( + schema, + ast + ); + + pubsub.emit('importantEmail', { + importantEmail: {} + }); + + await subscription.next(); + }); + + it('accepts type definition with async subscribe function', async () => { + const pubsub = new EventEmitter(); + const schema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: async () => { + await new Promise(setImmediate); + return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + } + }, + } + }) + }); + + const ast = parse(` + subscription { + importantEmail + } + `); + + const subscription = await subscribe( + schema, + ast + ); + + pubsub.emit('importantEmail', { + importantEmail: {} + }); + + await subscription.next(); }); it('should only resolve the first field of invalid multi-field', async () => { @@ -247,7 +309,7 @@ describe('Subscribe', () => { } `); - const subscription = subscribe(testSchema, ast); + const subscription = await subscribe(testSchema, ast); subscription.next(); // Ask for a result, but ignore it. expect(didResolveImportantEmail).to.equal(true); @@ -257,11 +319,195 @@ describe('Subscribe', () => { subscription.return(); }); - it('produces payload for multiple subscribe in same subscription', + it('throws an error if schema is missing', async () => { + const document = parse(` + subscription { + importantEmail + } + `); + + expectPromiseToThrow( + () => subscribe(null, document), + 'Must provide schema', + ); + + expectPromiseToThrow( + () => subscribe({ document }), + 'Must provide schema', + ); + }); + + it('throws an error if document is missing', async () => { + expectPromiseToThrow( + () => subscribe(emailSchema, null), + 'Must provide document' + ); + + expectPromiseToThrow( + () => subscribe({ schema: emailSchema }), + 'Must provide document' + ); + }); + + it('throws an error for unknown root field', async () => { + const ast = parse(` + subscription { + unknownField + } + `); + + const pubsub = new EventEmitter(); + + expectPromiseToThrow( + () => createSubscription(pubsub, emailSchema, ast), + 'This subscription is not defined by the schema.' + ); + }); + + it('throws an error if subscribe does not return an iterator', async () => { + const invalidEmailSchema = new GraphQLSchema({ + query: QueryType, + subscription: new GraphQLObjectType({ + name: 'Subscription', + fields: { + importantEmail: { + type: GraphQLString, + subscribe: () => 'test', + }, + } + }) + }); + + const pubsub = new EventEmitter(); + + expectPromiseToThrow( + () => createSubscription(pubsub, invalidEmailSchema), + 'Subscription must return Async Iterable. Received: test' + ); + }); + + it('returns an error if subscribe function returns error', async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + () => { return new Error('test error'); } + ); + + const result = await subscribe( + erroringEmailSchema, + parse(` + subscription { + importantEmail + } + `) + ); + + expect(result).to.deep.equal({ + errors: [ + { + message: 'test error', + locations: [ { line: 3, column: 11 } ], + path: [ 'importantEmail' ] + } + ] + }); + }); + + it('returns an ExecutionResult for resolver errors', async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + () => { throw new Error('test error'); } + ); + + const result = await subscribe( + erroringEmailSchema, + parse(` + subscription { + importantEmail + } + `) + ); + + expect(result).to.deep.equal({ + errors: [ + { + message: 'test error', + locations: [ { line: 3, column: 11 } ], + path: [ 'importantEmail' ] + } + ] + }); + }); + + it('resolves to an error if variables were wrong type', async () => { + // If we receive variables that cannot be coerced correctly, subscribe() + // will resolve to an ExecutionResult that contains an informative error + // description. + const ast = parse(` + subscription ($priority: Int) { + importantEmail(priority: $priority) { + email { + from + subject + } + inbox { + unread + total + } + } + } + `); + + const pubsub = new EventEmitter(); + const data = { + inbox: { + emails: [ + { + from: 'joe@graphql.org', + subject: 'Hello', + message: 'Hello World', + unread: false, + }, + ], + }, + importantEmail() { + return eventEmitterAsyncIterator(pubsub, 'importantEmail'); + } + }; + + const result = await subscribe( + emailSchema, + ast, + data, + null, + { + priority: 'meow', + } + ); + + expect(result).to.deep.equal({ + errors: [ + { + message: 'Variable "$priority" got invalid value "meow".\nExpected ' + + 'type "Int", found "meow": Int cannot represent non 32-bit signed ' + + 'integer value: meow', + locations: [ { line: 2, column: 21 } ], + path: undefined + } + ] + }); + }); +}); + +// Once a subscription returns a valid AsyncIterator, it can still yield +// errors. +describe('Subscription Publish Phase', () => { + + it('produces a payload for multiple subscribe in same subscription', async () => { const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = createSubscription(pubsub); - const second = createSubscription(pubsub); + const { + sendImportantEmail, + subscription, + } = await createSubscription(pubsub); + const second = await createSubscription(pubsub); const payload1 = subscription.next(); const payload2 = second.subscription.next(); @@ -297,7 +543,10 @@ describe('Subscribe', () => { it('produces a payload per subscription event', async () => { const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = createSubscription(pubsub); + const { + sendImportantEmail, + subscription, + } = await createSubscription(pubsub); // Wait for the next subscription payload. const payload = subscription.next(); @@ -379,7 +628,10 @@ describe('Subscribe', () => { it('produces a payload when there are multiple events', async () => { const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = createSubscription(pubsub); + const { + sendImportantEmail, + subscription, + } = await createSubscription(pubsub); let payload = subscription.next(); // A new email arrives! @@ -439,7 +691,10 @@ describe('Subscribe', () => { it('should not trigger when subscription is already done', async () => { const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = createSubscription(pubsub); + const { + sendImportantEmail, + subscription, + } = await createSubscription(pubsub); let payload = subscription.next(); // A new email arrives! @@ -485,9 +740,12 @@ describe('Subscribe', () => { }); }); - it('events order is correct when multiple triggered together', async () => { + it('event order is correct for multiple publishes', async () => { const pubsub = new EventEmitter(); - const { sendImportantEmail, subscription } = createSubscription(pubsub); + const { + sendImportantEmail, + subscription, + } = await createSubscription(pubsub); let payload = subscription.next(); // A new email arrives! @@ -545,105 +803,134 @@ describe('Subscribe', () => { }); }); - it('invalid query should result in error', async () => { - const invalidAST = parse(` - subscription { - invalidField + it('should handle error during execuction of source event', async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + async function* () { + yield { email: { subject: 'Hello' } }; + yield { email: { subject: 'Goodbye' } }; + yield { email: { subject: 'Bonjour' } }; + }, + event => { + if (event.email.subject === 'Goodbye') { + throw new Error('Never leave.'); + } + return event; } - `); - - expect(() => { - subscribe( - emailSchema, - invalidAST, - ); - }).to.throw('This subscription is not defined by the schema.'); - }); + ); + + const subscription = await subscribe( + erroringEmailSchema, + parse(` + subscription { + importantEmail { + email { + subject + } + } + } + `) + ); - it('throws when subscription definition doesnt return iterator', () => { - const invalidEmailSchema = new GraphQLSchema({ - query: QueryType, - subscription: new GraphQLObjectType({ - name: 'Subscription', - fields: { + const payload1 = await subscription.next(); + expect(payload1).to.deep.equal({ + done: false, + value: { + data: { importantEmail: { - type: GraphQLString, - subscribe: () => 'test', - }, + email: { + subject: 'Hello' + } + } } - }) + } }); - const ast = parse(` - subscription { - importantEmail + // An error in execution is presented as such. + const payload2 = await subscription.next(); + expect(payload2).to.deep.equal({ + done: false, + value: { + errors: [ + { + message: 'Never leave.', + locations: [ { line: 3, column: 11 } ], + path: [ 'importantEmail' ], + } + ], + data: { + importantEmail: null, + } } - `); - - expect(() => { - subscribe( - invalidEmailSchema, - ast - ); - }).to.throw('Subscription must return Async Iterable. Received: test'); - }); + }); - it('expects to have subscribe on type definition with iterator', () => { - const pubsub = new EventEmitter(); - const invalidEmailSchema = new GraphQLSchema({ - query: QueryType, - subscription: new GraphQLObjectType({ - name: 'Subscription', - fields: { + // However that does not close the response event stream. Subsequent + // events are still executed. + const payload3 = await subscription.next(); + expect(payload3).to.deep.equal({ + done: false, + value: { + data: { importantEmail: { - type: GraphQLString, - subscribe: () => eventEmitterAsyncIterator(pubsub, 'importantEmail') - }, + email: { + subject: 'Bonjour' + } + } } - }) - }); - - const ast = parse(` - subscription { - importantEmail } - `); + }); - expect(() => { - subscribe( - invalidEmailSchema, - ast - ); - }).not.to.throw(); }); - it('should handle error thrown by subscribe method', () => { - const invalidEmailSchema = new GraphQLSchema({ - query: QueryType, - subscription: new GraphQLObjectType({ - name: 'Subscription', - fields: { + it('should pass through error thrown in source event stream', async () => { + const erroringEmailSchema = emailSchemaWithResolvers( + async function* () { + yield { email: { subject: 'Hello' } }; + throw new Error('test error'); + }, + email => email + ); + + const subscription = await subscribe( + erroringEmailSchema, + parse(` + subscription { + importantEmail { + email { + subject + } + } + } + `) + ); + + const payload1 = await subscription.next(); + expect(payload1).to.deep.equal({ + done: false, + value: { + data: { importantEmail: { - type: GraphQLString, - subscribe: () => { - throw new Error('test error'); - }, - }, - }, - }) + email: { + subject: 'Hello' + } + } + } + } }); - const ast = parse(` - subscription { - importantEmail - } - `); + let expectedError; + try { + await subscription.next(); + } catch (error) { + expectedError = error; + } + + expect(expectedError).to.be.instanceof(Error); + expect(expectedError.message).to.equal('test error'); - expect(() => { - subscribe( - invalidEmailSchema, - ast - ); - }).to.throw('test error'); + const payload2 = await subscription.next(); + expect(payload2).to.deep.equal({ + done: true, + value: undefined + }); }); }); diff --git a/src/subscription/asyncIteratorReject.js b/src/subscription/asyncIteratorReject.js new file mode 100644 index 0000000000..85f0881167 --- /dev/null +++ b/src/subscription/asyncIteratorReject.js @@ -0,0 +1,41 @@ +/** + * Copyright (c) 2017, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + * + * @flow + */ + +import { $$asyncIterator } from 'iterall'; + +/** + * Given an error, returns an AsyncIterable which will fail with that error. + * + * Similar to Promise.reject(error) + */ +export default function asyncIteratorReject(error: Error): AsyncIterator { + let isComplete = false; + return { + next() { + const result = isComplete ? + Promise.resolve({ value: undefined, done: true }) : + Promise.reject(error); + isComplete = true; + return result; + }, + return() { + isComplete = true; + return Promise.resolve({ value: undefined, done: true }); + }, + throw() { + isComplete = true; + return Promise.reject(error); + }, + [$$asyncIterator]() { + return this; + }, + }; +} diff --git a/src/subscription/mapAsyncIterator.js b/src/subscription/mapAsyncIterator.js index f3c9c07f08..830d4c07a2 100644 --- a/src/subscription/mapAsyncIterator.js +++ b/src/subscription/mapAsyncIterator.js @@ -17,7 +17,8 @@ import { $$asyncIterator, getAsyncIterator } from 'iterall'; */ export default function mapAsyncIterator( iterable: AsyncIterable, - callback: (value: T) => Promise | U + callback: T => Promise | U, + rejectCallback?: any => Promise | U ): AsyncGenerator { const iterator = getAsyncIterator(iterable); let $return; @@ -36,18 +37,24 @@ export default function mapAsyncIterator( asyncMapValue(result.value, callback).then(iteratorResult, abruptClose); } + let mapReject; + if (rejectCallback) { + mapReject = error => + asyncMapValue(error, rejectCallback).then(iteratorResult, abruptClose); + } + return { next() { - return iterator.next().then(mapResult); + return iterator.next().then(mapResult, mapReject); }, return() { return $return ? - $return.call(iterator).then(mapResult) : + $return.call(iterator).then(mapResult, mapReject) : Promise.resolve({ value: undefined, done: true }); }, throw(error) { if (typeof iterator.throw === 'function') { - return iterator.throw(error).then(mapResult); + return iterator.throw(error).then(mapResult, mapReject); } return Promise.reject(error).catch(abruptClose); }, @@ -59,7 +66,7 @@ export default function mapAsyncIterator( function asyncMapValue( value: T, - callback: (T) => Promise | U + callback: T => Promise | U ): Promise { return new Promise(resolve => resolve(callback(value))); } diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index 2f9139d323..65bb369a37 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -10,16 +10,19 @@ */ import { isAsyncIterable } from 'iterall'; +import { GraphQLError } from '../error/GraphQLError'; +import { locatedError } from '../error/locatedError'; import { addPath, assertValidExecutionArguments, buildExecutionContext, + buildResolveInfo, collectFields, execute, getFieldDef, getOperationRootType, - buildResolveInfo, resolveFieldValueOrError, + responsePathAsArray, } from '../execution/execute'; import { GraphQLSchema } from '../type/schema'; import invariant from '../jsutils/invariant'; @@ -32,10 +35,20 @@ import type { GraphQLFieldResolver } from '../type/definition'; /** * Implements the "Subscribe" algorithm described in the GraphQL specification. * - * Returns an AsyncIterator + * Returns a Promise which resolves to either an AsyncIterator (if successful) + * or an ExecutionResult (client error). The promise will be rejected if a + * server error occurs. + * + * If the client-provided arguments to this function do not result in a + * compliant subscription, a GraphQL Response (ExecutionResult) with + * descriptive errors and no data will be returned. + * + * If the the source stream could not be created due to faulty subscription + * resolver logic or underlying systems, the promise will resolve to a single + * ExecutionResult containing `errors` and no `data`. * - * If the arguments to this function do not result in a legal execution context, - * a GraphQLError will be thrown immediately explaining the invalid input. + * If the operation succeeded, the promise resolves to an AsyncIterator, which + * yields a stream of ExecutionResults representing the response stream. * * Accepts either an object with named arguments, or individual arguments. */ @@ -48,7 +61,7 @@ declare function subscribe({| operationName?: ?string, fieldResolver?: ?GraphQLFieldResolver, subscribeFieldResolver?: ?GraphQLFieldResolver -|}, ..._: []): AsyncIterator; +|}, ..._: []): Promise | ExecutionResult>; /* eslint-disable no-redeclare */ declare function subscribe( schema: GraphQLSchema, @@ -59,7 +72,7 @@ declare function subscribe( operationName?: ?string, fieldResolver?: ?GraphQLFieldResolver, subscribeFieldResolver?: ?GraphQLFieldResolver -): AsyncIterator; +): Promise | ExecutionResult>; export function subscribe( argsOrSchema, document, @@ -73,6 +86,7 @@ export function subscribe( // Extract arguments from object args if provided. const args = arguments.length === 1 ? argsOrSchema : undefined; const schema = args ? args.schema : argsOrSchema; + return args ? subscribeImpl( schema, @@ -96,6 +110,18 @@ export function subscribe( ); } +/** + * This function checks if the error is a GraphQLError. If it is, report it as + * an ExecutionResult, containing only errors and no data. Otherwise treat the + * error as a system-class error and re-throw it. + */ +function reportGraphQLError(error) { + if (error instanceof GraphQLError) { + return { errors: [ error ] }; + } + throw error; +} + function subscribeImpl( schema, document, @@ -106,7 +132,7 @@ function subscribeImpl( fieldResolver, subscribeFieldResolver ) { - const subscription = createSourceEventStream( + const sourcePromise = createSourceEventStream( schema, document, rootValue, @@ -122,17 +148,25 @@ function subscribeImpl( // the GraphQL specification. The `execute` function provides the // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the // "ExecuteQuery" algorithm, for which `execute` is also used. - return mapAsyncIterator( - subscription, - payload => execute( - schema, - document, - payload, - contextValue, - variableValues, - operationName, - fieldResolver - ) + const mapSourceToResponse = payload => execute( + schema, + document, + payload, + contextValue, + variableValues, + operationName, + fieldResolver + ); + + // Resolve the Source Stream, then map every source value to a + // ExecutionResult value as described above. + return sourcePromise.then( + sourceStream => mapAsyncIterator( + sourceStream, + mapSourceToResponse, + reportGraphQLError + ), + reportGraphQLError ); } @@ -140,17 +174,21 @@ function subscribeImpl( * Implements the "CreateSourceEventStream" algorithm described in the * GraphQL specification, resolving the subscription source event stream. * - * Returns an AsyncIterable, may through a GraphQLError. + * Returns a Promise. * - * A Source Stream represents the sequence of events, each of which is - * expected to be used to trigger a GraphQL execution for that event. + * If the client-provided invalid arguments, the source stream could not be + * created, or the resolver did not return an AsyncIterable, this function will + * will throw an error, which should be caught and handled by the caller. + * + * A Source Event Stream represents a sequence of events, each of which triggers + * a GraphQL execution for that event. * * This may be useful when hosting the stateful subscription service in a * different process or machine than the stateless GraphQL execution engine, * or otherwise separating these two steps. For more on this, see the * "Supporting Subscriptions at Scale" information in the GraphQL specification. */ -export function createSourceEventStream( +export async function createSourceEventStream( schema: GraphQLSchema, document: DocumentNode, rootValue?: mixed, @@ -158,8 +196,9 @@ export function createSourceEventStream( variableValues?: ?{[key: string]: mixed}, operationName?: ?string, fieldResolver?: ?GraphQLFieldResolver -): AsyncIterable { - // If arguments are missing or incorrect, throw an error. +): Promise> { + // If arguments are missing or incorrectly typed, this is an internal + // developer mistake which should throw an early error. assertValidExecutionArguments( schema, document, @@ -200,18 +239,20 @@ export function createSourceEventStream( // AsyncIterable yielding raw payloads. const resolveFn = fieldDef.subscribe || exeContext.fieldResolver; + const path = addPath(undefined, responseName); + const info = buildResolveInfo( exeContext, fieldDef, fieldNodes, type, - addPath(undefined, responseName) + path ); // resolveFieldValueOrError implements the "ResolveFieldEventStream" // algorithm from GraphQL specification. It differs from // "ResolveFieldValue" due to providing a different `resolveFn`. - const subscription = resolveFieldValueOrError( + const subscription = await resolveFieldValueOrError( exeContext, fieldDef, fieldNodes, @@ -220,8 +261,13 @@ export function createSourceEventStream( info ); + // Throw located GraphQLError if subscription source fails to resolve. if (subscription instanceof Error) { - throw subscription; + throw locatedError( + subscription, + fieldNodes, + responsePathAsArray(path), + ); } if (!isAsyncIterable(subscription)) {