Skip to content

Commit

Permalink
fix(subscriptions): The subscription resolvers were firing too often (#…
Browse files Browse the repository at this point in the history
…609)

* fix(subscriptions): The subscription resolvers were firing too often

The underlying subscription in a remote schema was not terminated properly, causing memory leaks. In
addition, multiple subscriptions caused duplicate payloads. Both is fixed by removing the PubSub
dependency and directly transforming the Observer returned by the ApolloLink to an async iterable.
This allows the async iterable to propagate to the underlying observable to cancel the suscription
once the subscription is not needed anymore.

* refactor(test): remove timeout from test

* chore(changelog): add fix to changelog
  • Loading branch information
timsuchanek authored and freiksenet committed Feb 5, 2018
1 parent e9c0eb5 commit 2868fe2
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 42 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

### vNEXT

### v2.19.1
* Fix duplicate subscriptions for schema stitching [PR #609](https://github.com/apollographql/graphql-tools/pull/609)

### v2.19.0

* Also recreate `astNode` property for fields, not only types, when recreating schemas. [PR #580](https://github.com/apollographql/graphql-tools/pull/580)
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@
},
"homepage": "https://github.com/apollostack/graphql-tools#readme",
"dependencies": {
"apollo-utilities": "^1.0.1",
"apollo-link": "^1.0.0",
"apollo-utilities": "^1.0.1",
"deprecated-decorator": "^0.1.6",
"graphql-subscriptions": "^0.5.6",
"uuid": "^3.1.0"
},
"peerDependencies": {
Expand All @@ -68,6 +67,7 @@
"chai": "^4.1.2",
"express": "^4.16.2",
"graphql": "^0.12.3",
"graphql-subscriptions": "^0.5.6",
"graphql-type-json": "^0.1.4",
"istanbul": "^0.4.5",
"iterall": "^1.1.3",
Expand Down
40 changes: 8 additions & 32 deletions src/stitching/makeRemoteExecutableSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { makeExecutableSchema } from '../schemaGenerator';
import resolveParentFromTypename from './resolveFromParentTypename';
import defaultMergedResolver from './defaultMergedResolver';
import { checkResultAndHandleErrors } from './errors';
import { PubSub, PubSubEngine } from 'graphql-subscriptions';
import { observableToAsyncIterable } from './observableToAsyncIterable';

export type ResolverFn = (
rootValue?: any,
Expand All @@ -51,12 +51,10 @@ export default function makeRemoteExecutableSchema({
schema,
link,
fetcher,
createPubSub,
}: {
schema: GraphQLSchema | string;
link?: ApolloLink;
fetcher?: Fetcher;
createPubSub?: () => PubSubEngine;
}): GraphQLSchema {
if (!fetcher && link) {
fetcher = linkToFetcher(link);
Expand Down Expand Up @@ -93,11 +91,10 @@ export default function makeRemoteExecutableSchema({
const subscriptionResolvers: IResolverObject = {};
const subscriptionType = schema.getSubscriptionType();
if (subscriptionType) {
const pubSub = createPubSub ? createPubSub() : new PubSub();
const subscriptions = subscriptionType.getFields();
Object.keys(subscriptions).forEach(key => {
subscriptionResolvers[key] = {
subscribe: createSubscriptionResolver(key, link, pubSub),
subscribe: createSubscriptionResolver(key, link),
};
});
}
Expand All @@ -117,10 +114,7 @@ export default function makeRemoteExecutableSchema({
const typeMap = schema.getTypeMap();
const types = Object.keys(typeMap).map(name => typeMap[name]);
for (const type of types) {
if (
type instanceof GraphQLInterfaceType ||
type instanceof GraphQLUnionType
) {
if (type instanceof GraphQLInterfaceType || type instanceof GraphQLUnionType) {
resolvers[type.name] = {
__resolveType(parent, context, info) {
return resolveParentFromTypename(parent, info.schema);
Expand Down Expand Up @@ -161,9 +155,7 @@ export default function makeRemoteExecutableSchema({

function createResolver(fetcher: Fetcher): GraphQLFieldResolver<any, any> {
return async (root, args, context, info) => {
const fragments = Object.keys(info.fragments).map(
fragment => info.fragments[fragment],
);
const fragments = Object.keys(info.fragments).map(fragment => info.fragments[fragment]);
const document = {
kind: Kind.DOCUMENT,
definitions: [info.operation, ...fragments],
Expand All @@ -177,15 +169,9 @@ function createResolver(fetcher: Fetcher): GraphQLFieldResolver<any, any> {
};
}

function createSubscriptionResolver(
name: string,
link: ApolloLink,
pubSub: PubSubEngine,
): ResolverFn {
function createSubscriptionResolver(name: string, link: ApolloLink): ResolverFn {
return (root, args, context, info) => {
const fragments = Object.keys(info.fragments).map(
fragment => info.fragments[fragment],
);
const fragments = Object.keys(info.fragments).map(fragment => info.fragments[fragment]);
const document = {
kind: Kind.DOCUMENT,
definitions: [info.operation, ...fragments],
Expand All @@ -196,20 +182,10 @@ function createSubscriptionResolver(
variables: info.variableValues,
context: { graphqlContext: context },
};
const observable = execute(link, operation);

const observer = {
next(value: any) {
pubSub.publish(`remote-schema-${name}`, value.data);
},
error(err: Error) {
pubSub.publish(`remote-schema-${name}`, { errors: [err] });
},
};

observable.subscribe(observer);
const observable = execute(link, operation);

return pubSub.asyncIterator(`remote-schema-${name}`);
return observableToAsyncIterable(observable);
};
}

Expand Down
77 changes: 77 additions & 0 deletions src/stitching/observableToAsyncIterable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Observable } from 'apollo-link';
import { $$asyncIterator } from 'iterall';
type Callback = (value?: any) => any;

export function observableToAsyncIterable<T>(observable: Observable<T>): AsyncIterator<T> {
const pullQueue: Callback[] = [];
const pushQueue: any[] = [];

let listening = true;

const pushValue = ({ data }: any) => {
if (pullQueue.length !== 0) {
pullQueue.shift()({ value: data, done: false });
} else {
pushQueue.push({ value: data });
}
};

const pushError = (error: any) => {
if (pullQueue.length !== 0) {
pullQueue.shift()({ value: { errors: [error] }, done: false });
} else {
pushQueue.push({ value: { errors: [error] } });
}
};

const pullValue = () => {
return new Promise(resolve => {
if (pushQueue.length !== 0) {
const element = pushQueue.shift();
// either {value: {errors: [...]}} or {value: ...}
resolve({
...element,
done: false,
});
} else {
pullQueue.push(resolve);
}
});
};

const subscription = observable.subscribe({
next(value: any) {
pushValue(value);
},
error(err: Error) {
pushError(err);
},
});

const emptyQueue = () => {
if (listening) {
listening = false;
subscription.unsubscribe();
pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
pullQueue.length = 0;
pushQueue.length = 0;
}
};

return {
async next() {
return listening ? pullValue() : this.return();
},
return() {
emptyQueue();
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
emptyQueue();
return Promise.reject(error);
},
[$$asyncIterator]() {
return this;
},
};
}
53 changes: 45 additions & 8 deletions src/test/testMakeRemoteExecutableSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,53 @@ describe('remote subscriptions', () => {

let notificationCnt = 0;
subscribe(schema, subscription).then(results =>
forAwaitEach(
results as AsyncIterable<ExecutionResult>,
(result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
!notificationCnt++ ? done() : null;
},
),
forAwaitEach(results as AsyncIterable<ExecutionResult>, (result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
!notificationCnt++ ? done() : null;
}),
);

subscriptionPubSub.publish(subscriptionPubSubTrigger, mockNotification);
});

it('should work without triggering multiple times per notification', done => {
const mockNotification = {
notifications: {
text: 'Hello world',
},
};

const subscription = parse(`
subscription Subscription {
notifications {
text
}
}
`);

let notificationCnt = 0;
subscribe(schema, subscription).then(results =>
forAwaitEach(results as AsyncIterable<ExecutionResult>, (result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
notificationCnt++;
}),
);

subscribe(schema, subscription).then(results =>
forAwaitEach(results as AsyncIterable<ExecutionResult>, (result: ExecutionResult) => {
expect(result).to.have.property('data');
expect(result.data).to.deep.equal(mockNotification);
}),
);

subscriptionPubSub.publish(subscriptionPubSubTrigger, mockNotification);
subscriptionPubSub.publish(subscriptionPubSubTrigger, mockNotification);

setTimeout(() => {
expect(notificationCnt).to.eq(2);
done();
}, 0);
});
});

0 comments on commit 2868fe2

Please sign in to comment.