diff --git a/src/execution/__tests__/flattenAsyncIterator-test.ts b/src/execution/__tests__/flattenAsyncIterator-test.ts new file mode 100644 index 00000000000..c2d3dcb4b37 --- /dev/null +++ b/src/execution/__tests__/flattenAsyncIterator-test.ts @@ -0,0 +1,145 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { flattenAsyncIterator } from '../flattenAsyncIterator'; + +describe('flattenAsyncIterator', () => { + it('does not modify an already flat async generator', async () => { + async function* source() { + yield await Promise.resolve(1); + yield await Promise.resolve(2); + yield await Promise.resolve(3); + } + + const result = flattenAsyncIterator(source()); + + expect(await result.next()).to.deep.equal({ value: 1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 3, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('does not modify an already flat async iterator', async () => { + const items = [1, 2, 3]; + + const iterator: any = { + [Symbol.asyncIterator]() { + return this; + }, + next() { + return Promise.resolve({ + done: items.length === 0, + value: items.shift(), + }); + }, + }; + + const result = flattenAsyncIterator(iterator); + + expect(await result.next()).to.deep.equal({ value: 1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 3, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('flatten nested async generators', async () => { + async function* source() { + yield await Promise.resolve(1); + yield await Promise.resolve(2); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); + yield await Promise.resolve(2.2); + })(), + ); + yield await Promise.resolve(3); + } + + const doubles = flattenAsyncIterator(source()); + + const result = []; + for await (const x of doubles) { + result.push(x); + } + expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]); + }); + + it('allows returning early from a nested async generator', async () => { + async function* source() { + yield await Promise.resolve(1); + yield await Promise.resolve(2); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); /* c8 ignore start */ + // Not reachable, early return + yield await Promise.resolve(2.2); + })(), + ); + // Not reachable, early return + yield await Promise.resolve(3); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterator(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Early return + expect(await doubles.return()).to.deep.equal({ + value: undefined, + done: true, + }); + + // Subsequent next calls + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('allows throwing errors from a nested async generator', async () => { + async function* source() { + yield await Promise.resolve(1); + yield await Promise.resolve(2); + yield await Promise.resolve( + (async function* nested(): AsyncGenerator { + yield await Promise.resolve(2.1); /* c8 ignore start */ + // Not reachable, early return + yield await Promise.resolve(2.2); + })(), + ); + // Not reachable, early return + yield await Promise.resolve(3); + } + /* c8 ignore stop */ + + const doubles = flattenAsyncIterator(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Throw error + let caughtError; + try { + await doubles.throw('ouch'); /* c8 ignore start */ + // Not reachable, always throws + /* c8 ignore stop */ + } catch (e) { + caughtError = e; + } + expect(caughtError).to.equal('ouch'); + }); +}); diff --git a/src/execution/__tests__/subscribe-test.ts b/src/execution/__tests__/subscribe-test.ts index 5f256ca8680..7e8233cbb46 100644 --- a/src/execution/__tests__/subscribe-test.ts +++ b/src/execution/__tests__/subscribe-test.ts @@ -82,17 +82,22 @@ const emailSchema = new GraphQLSchema({ }), }); -function createSubscription(pubsub: SimplePubSub) { +function createSubscription( + pubsub: SimplePubSub, + variableValues?: { readonly [variable: string]: unknown }, +) { const document = parse(` - subscription ($priority: Int = 0) { + subscription ($priority: Int = 0, $shouldDefer: Boolean = false) { importantEmail(priority: $priority) { email { from subject } - inbox { - unread - total + ... @defer(if: $shouldDefer) { + inbox { + unread + total + } } } } @@ -122,7 +127,12 @@ function createSubscription(pubsub: SimplePubSub) { }), }; - return subscribe({ schema: emailSchema, document, rootValue: data }); + return subscribe({ + schema: emailSchema, + document, + rootValue: data, + variableValues, + }); } // TODO: consider adding this method to testUtils (with tests) @@ -679,6 +689,144 @@ describe('Subscription Publish Phase', () => { }); }); + it('produces additional payloads for subscriptions with @defer', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription(pubsub, { + shouldDefer: true, + }); + assert(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + // The previously waited on payload now has a value. + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + }, + }, + hasNext: true, + }, + }); + + // Wait for the next payload from @defer + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + incremental: [ + { + data: { + inbox: { + unread: 1, + total: 2, + }, + }, + path: ['importantEmail'], + }, + ], + hasNext: false, + }, + }); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'hyo@graphql.org', + subject: 'Tools', + }, + }, + }, + hasNext: true, + }, + }); + + // Another new email arrives, before the incrementally delivered payloads from the last email was received. + expect( + pubsub.emit({ + from: 'adam@graphql.org', + subject: 'Important', + message: 'Read me please', + unread: true, + }), + ).to.equal(true); + + // Deferred payload from previous event is received. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + incremental: [ + { + data: { + inbox: { + unread: 2, + total: 3, + }, + }, + path: ['importantEmail'], + }, + ], + hasNext: false, + }, + }); + + // Next payload from last event + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'adam@graphql.org', + subject: 'Important', + }, + }, + }, + hasNext: true, + }, + }); + + // The client disconnects before the deferred payload is consumed. + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + it('produces a payload when there are multiple events', async () => { const pubsub = new SimplePubSub(); const subscription = createSubscription(pubsub); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 32c3bc3e5b9..cd6ee0bbbff 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -52,6 +52,7 @@ import { collectFields, collectSubfields as _collectSubfields, } from './collectFields'; +import { flattenAsyncIterator } from './flattenAsyncIterator'; import { mapAsyncIterator } from './mapAsyncIterator'; import { getArgumentValues, @@ -1399,19 +1400,11 @@ function mapSourceToResponse( // the GraphQL specification. The `execute` function provides the // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the // "ExecuteQuery" algorithm, for which `execute` is also used. - return mapAsyncIterator(resultOrStream, (payload: unknown) => { - const executionResult = executeImpl( - buildPerEventExecutionContext(exeContext, payload), - ); - /* c8 ignore next 6 */ - // TODO: implement support for defer/stream in subscriptions - if (isAsyncIterable(executionResult)) { - throw new Error( - 'TODO: implement support for defer/stream in subscriptions', - ); - } - return executionResult as PromiseOrValue; - }); + return flattenAsyncIterator( + mapAsyncIterator(resultOrStream, (payload: unknown) => + executeImpl(buildPerEventExecutionContext(exeContext, payload)), + ), + ); } /** diff --git a/src/execution/flattenAsyncIterator.ts b/src/execution/flattenAsyncIterator.ts new file mode 100644 index 00000000000..1533482bb93 --- /dev/null +++ b/src/execution/flattenAsyncIterator.ts @@ -0,0 +1,50 @@ +import { isAsyncIterable } from '../jsutils/isAsyncIterable'; + +type AsyncIterableOrGenerator = + | AsyncGenerator + | AsyncIterable; + +/** + * Given an AsyncIterable that could potentially yield other async iterators, + * flatten all yielded results into a single AsyncIterable + */ +export function flattenAsyncIterator( + iterable: AsyncIterableOrGenerator>, +): AsyncGenerator { + const iteratorMethod = iterable[Symbol.asyncIterator]; + const iterator: any = iteratorMethod.call(iterable); + let iteratorStack: Array> = [iterator]; + + async function next(): Promise> { + const currentIterator = iteratorStack[0]; + if (!currentIterator) { + return { value: undefined, done: true }; + } + const result = await currentIterator.next(); + if (result.done) { + iteratorStack.shift(); + return next(); + } else if (isAsyncIterable(result.value)) { + const childIterator = result.value[ + Symbol.asyncIterator + ]() as AsyncIterator; + iteratorStack.unshift(childIterator); + return next(); + } + return result; + } + return { + next, + return() { + iteratorStack = []; + return iterator.return(); + }, + throw(error?: unknown): Promise> { + iteratorStack = []; + return iterator.throw(error); + }, + [Symbol.asyncIterator]() { + return this; + }, + }; +}