Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #894, add perEventContextResolver for subscribe #2485

65 changes: 63 additions & 2 deletions src/subscription/__tests__/subscribe-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { GraphQLError } from '../../error/GraphQLError';

import { GraphQLSchema } from '../../type/schema';
import { GraphQLList, GraphQLObjectType } from '../../type/definition';
import type { GraphQLFieldResolver } from '../../type/definition';
import { GraphQLInt, GraphQLString, GraphQLBoolean } from '../../type/scalars';

import { createSourceEventStream, subscribe } from '../subscribe';
Expand Down Expand Up @@ -68,9 +69,9 @@ const EmailEventType = new GraphQLObjectType({

const emailSchema = emailSchemaWithResolvers();

function emailSchemaWithResolvers<T: mixed>(
function emailSchemaWithResolvers<T: mixed, R: mixed, C: mixed>(
subscribeFn?: (T) => mixed,
resolveFn?: (T) => mixed,
resolveFn?: GraphQLFieldResolver<R, C>,
) {
return new GraphQLSchema({
query: QueryType,
Expand Down Expand Up @@ -1103,4 +1104,64 @@ describe('Subscription Publish Phase', () => {
value: undefined,
});
});

it('should produce a unique context per event via perEventContextResolver', async () => {
const contextExecutionSchema = emailSchemaWithResolvers(
async function* () {
yield { email: { subject: 'Hello' } };
yield { email: { subject: 'Hello' } };
},
(data, _args, ctx) => ({
email: { subject: `${data.email.subject} ${ctx.contextIndex}` },
}),
);

let contextIndex = 0;
const subscription = await subscribe({
schema: contextExecutionSchema,
document: parse(`
subscription {
importantEmail {
email {
subject
}
}
}
`),
contextValue: { test: true },
perEventContextResolver: (ctx) => {
expect(ctx.test).to.equal(true);
return { ...ctx, contextIndex: contextIndex++ };
},
});
invariant(isAsyncIterable(subscription));

const payload1 = await subscription.next();
expect(payload1).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
subject: 'Hello 0',
},
},
},
},
});

const payload2 = await subscription.next();
expect(payload2).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
subject: 'Hello 1',
},
},
},
},
});
});
});
5 changes: 5 additions & 0 deletions src/subscription/subscribe.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface SubscriptionArgs {
operationName?: Maybe<string>;
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
perEventContextResolver?: Maybe<(contextValue: any) => any>;
}

/**
Expand All @@ -34,6 +35,9 @@ export interface SubscriptionArgs {
* If the operation succeeded, the promise resolves to an AsyncIterator, which
* yields a stream of ExecutionResults representing the response stream.
*
* If a `perEventContextResolver` argument is provided, it will be invoked for
* each event and return a new context value specific to that event's execution.
IvanGoncharov marked this conversation as resolved.
Show resolved Hide resolved
*
* Accepts either an object with named arguments, or individual arguments.
*/
export function subscribe(
Expand All @@ -49,6 +53,7 @@ export function subscribe(
operationName?: Maybe<string>,
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>,
perEventContextResolver?: Maybe<(contextValue: any) => any>,
): Promise<AsyncIterableIterator<ExecutionResult> | ExecutionResult>;

/**
Expand Down
14 changes: 13 additions & 1 deletion src/subscription/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export type SubscriptionArgs = {|
operationName?: ?string,
fieldResolver?: ?GraphQLFieldResolver<any, any>,
subscribeFieldResolver?: ?GraphQLFieldResolver<any, any>,
perEventContextResolver?: ?(contextValue: any) => mixed,
|};

/**
Expand All @@ -55,6 +56,9 @@ export type SubscriptionArgs = {|
* If the operation succeeded, the promise resolves to an AsyncIterator, which
* yields a stream of ExecutionResults representing the response stream.
*
* If a `perEventContextResolver` argument is provided, it will be invoked for
* each event and return a new context value specific to the event's execution.
*
* Accepts either an object with named arguments, or individual arguments.
*/
declare function subscribe(
Expand All @@ -71,6 +75,7 @@ declare function subscribe(
operationName?: ?string,
fieldResolver?: ?GraphQLFieldResolver<any, any>,
subscribeFieldResolver?: ?GraphQLFieldResolver<any, any>,
perEventContextResolver?: ?(contextValue: any) => mixed,
): Promise<AsyncIterator<ExecutionResult> | ExecutionResult>;
export function subscribe(
argsOrSchema,
Expand All @@ -81,6 +86,7 @@ export function subscribe(
operationName,
fieldResolver,
subscribeFieldResolver,
perEventContextResolver,
) {
/* eslint-enable no-redeclare */
// Extract arguments from object args if provided.
Expand All @@ -95,6 +101,7 @@ export function subscribe(
operationName,
fieldResolver,
subscribeFieldResolver,
perEventContextResolver,
});
}

Expand Down Expand Up @@ -122,6 +129,7 @@ function subscribeImpl(
operationName,
fieldResolver,
subscribeFieldResolver,
perEventContextResolver,
} = args;

const sourcePromise = createSourceEventStream(
Expand All @@ -140,12 +148,16 @@ function subscribeImpl(
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
// If `perEventContextResolver` is provided, it is invoked with the original
// `contextValue` to return a new context unique to this `execute`.
const perEventContextResolverFn = perEventContextResolver ?? ((ctx) => ctx);

const mapSourceToResponse = (payload) =>
execute({
schema,
document,
rootValue: payload,
contextValue,
contextValue: perEventContextResolverFn(contextValue),
variableValues,
operationName,
fieldResolver,
Expand Down