Skip to content

Commit

Permalink
add defer/stream support for subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
robrichard committed Aug 3, 2022
1 parent 6c5b85c commit f5ebcbe
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 19 deletions.
145 changes: 145 additions & 0 deletions src/execution/__tests__/flattenAsyncIterator-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import { flattenAsyncIterator } from '../flattenAsyncIterator';

describe('flattenAsyncIterator', () => {
it('does not modify an already flat async generator', async () => {
async function* source() {
yield await Promise.resolve(1);
yield await Promise.resolve(2);
yield await Promise.resolve(3);
}

const result = flattenAsyncIterator(source());

expect(await result.next()).to.deep.equal({ value: 1, done: false });
expect(await result.next()).to.deep.equal({ value: 2, done: false });
expect(await result.next()).to.deep.equal({ value: 3, done: false });
expect(await result.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('does not modify an already flat async iterator', async () => {
const items = [1, 2, 3];

const iterator: any = {
[Symbol.asyncIterator]() {
return this;
},
next() {
return Promise.resolve({
done: items.length === 0,
value: items.shift(),
});
},
};

const result = flattenAsyncIterator(iterator);

expect(await result.next()).to.deep.equal({ value: 1, done: false });
expect(await result.next()).to.deep.equal({ value: 2, done: false });
expect(await result.next()).to.deep.equal({ value: 3, done: false });
expect(await result.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('flatten nested async generators', async () => {
async function* source() {
yield await Promise.resolve(1);
yield await Promise.resolve(2);
yield await Promise.resolve(
(async function* nested(): AsyncGenerator<number, void, void> {
yield await Promise.resolve(2.1);
yield await Promise.resolve(2.2);
})(),
);
yield await Promise.resolve(3);
}

const doubles = flattenAsyncIterator(source());

const result = [];
for await (const x of doubles) {
result.push(x);
}
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]);
});

it('allows returning early from a nested async generator', async () => {
async function* source() {
yield await Promise.resolve(1);
yield await Promise.resolve(2);
yield await Promise.resolve(
(async function* nested(): AsyncGenerator<number, void, void> {
yield await Promise.resolve(2.1); /* c8 ignore start */
// Not reachable, early return
yield await Promise.resolve(2.2);
})(),
);
// Not reachable, early return
yield await Promise.resolve(3);
}
/* c8 ignore stop */

const doubles = flattenAsyncIterator(source());

expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });

// Early return
expect(await doubles.return()).to.deep.equal({
value: undefined,
done: true,
});

// Subsequent next calls
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('allows throwing errors from a nested async generator', async () => {
async function* source() {
yield await Promise.resolve(1);
yield await Promise.resolve(2);
yield await Promise.resolve(
(async function* nested(): AsyncGenerator<number, void, void> {
yield await Promise.resolve(2.1); /* c8 ignore start */
// Not reachable, early return
yield await Promise.resolve(2.2);
})(),
);
// Not reachable, early return
yield await Promise.resolve(3);
}
/* c8 ignore stop */

const doubles = flattenAsyncIterator(source());

expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });

// Throw error
let caughtError;
try {
await doubles.throw('ouch'); /* c8 ignore start */
// Not reachable, always throws
/* c8 ignore stop */
} catch (e) {
caughtError = e;
}
expect(caughtError).to.equal('ouch');
});
});
160 changes: 154 additions & 6 deletions src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,22 @@ const emailSchema = new GraphQLSchema({
}),
});

function createSubscription(pubsub: SimplePubSub<Email>) {
function createSubscription(
pubsub: SimplePubSub<Email>,
variableValues?: { readonly [variable: string]: unknown },
) {
const document = parse(`
subscription ($priority: Int = 0) {
subscription ($priority: Int = 0, $shouldDefer: Boolean = false) {
importantEmail(priority: $priority) {
email {
from
subject
}
inbox {
unread
total
... @defer(if: $shouldDefer) {
inbox {
unread
total
}
}
}
}
Expand Down Expand Up @@ -122,7 +127,12 @@ function createSubscription(pubsub: SimplePubSub<Email>) {
}),
};

return subscribe({ schema: emailSchema, document, rootValue: data });
return subscribe({
schema: emailSchema,
document,
rootValue: data,
variableValues,
});
}

// TODO: consider adding this method to testUtils (with tests)
Expand Down Expand Up @@ -679,6 +689,144 @@ describe('Subscription Publish Phase', () => {
});
});

it('produces additional payloads for subscriptions with @defer', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = await createSubscription(pubsub, {
shouldDefer: true,
});
assert(isAsyncIterable(subscription));
// Wait for the next subscription payload.
const payload = subscription.next();

// A new email arrives!
expect(
pubsub.emit({
from: 'yuzhi@graphql.org',
subject: 'Alright',
message: 'Tests are good',
unread: true,
}),
).to.equal(true);

// The previously waited on payload now has a value.
expect(await payload).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: 'yuzhi@graphql.org',
subject: 'Alright',
},
},
},
hasNext: true,
},
});

// Wait for the next payload from @defer
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
incremental: [
{
data: {
inbox: {
unread: 1,
total: 2,
},
},
path: ['importantEmail'],
},
],
hasNext: false,
},
});

// Another new email arrives, after all incrementally delivered payloads are received.
expect(
pubsub.emit({
from: 'hyo@graphql.org',
subject: 'Tools',
message: 'I <3 making things',
unread: true,
}),
).to.equal(true);

// The next waited on payload will have a value.
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: 'hyo@graphql.org',
subject: 'Tools',
},
},
},
hasNext: true,
},
});

// Another new email arrives, before the incrementally delivered payloads from the last email was received.
expect(
pubsub.emit({
from: 'adam@graphql.org',
subject: 'Important',
message: 'Read me please',
unread: true,
}),
).to.equal(true);

// Deferred payload from previous event is received.
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
incremental: [
{
data: {
inbox: {
unread: 2,
total: 3,
},
},
path: ['importantEmail'],
},
],
hasNext: false,
},
});

// Next payload from last event
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: 'adam@graphql.org',
subject: 'Important',
},
},
},
hasNext: true,
},
});

// The client disconnects before the deferred payload is consumed.
expect(await subscription.return()).to.deep.equal({
done: true,
value: undefined,
});

// Awaiting a subscription after closing it results in completed results.
expect(await subscription.next()).to.deep.equal({
done: true,
value: undefined,
});
});

it('produces a payload when there are multiple events', async () => {
const pubsub = new SimplePubSub<Email>();
const subscription = createSubscription(pubsub);
Expand Down
19 changes: 6 additions & 13 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import {
collectFields,
collectSubfields as _collectSubfields,
} from './collectFields';
import { flattenAsyncIterator } from './flattenAsyncIterator';
import { mapAsyncIterator } from './mapAsyncIterator';
import {
getArgumentValues,
Expand Down Expand Up @@ -1399,19 +1400,11 @@ function mapSourceToResponse(
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
return mapAsyncIterator(resultOrStream, (payload: unknown) => {
const executionResult = executeImpl(
buildPerEventExecutionContext(exeContext, payload),
);
/* c8 ignore next 6 */
// TODO: implement support for defer/stream in subscriptions
if (isAsyncIterable(executionResult)) {
throw new Error(
'TODO: implement support for defer/stream in subscriptions',
);
}
return executionResult as PromiseOrValue<ExecutionResult>;
});
return flattenAsyncIterator<ExecutionResult, AsyncExecutionResult>(
mapAsyncIterator(resultOrStream, (payload: unknown) =>
executeImpl(buildPerEventExecutionContext(exeContext, payload)),
),
);
}

/**
Expand Down
Loading

0 comments on commit f5ebcbe

Please sign in to comment.