From 41bb8a8a9d5e6b9c6a9c65d8003ba56c7d0c7b8d Mon Sep 17 00:00:00 2001 From: Alex Leung Date: Fri, 16 Jun 2017 18:59:12 -0700 Subject: [PATCH 1/7] Added generic AsyncIterator implementation so other PubSubEngine implementations won't have to implement their own AsyncIterator. --- package.json | 2 +- src/pubsub-async-iterator.ts | 110 +++++++++++++++++++++++++++++++++++ src/pubsub-engine.ts | 14 +++-- src/pubsub.ts | 8 +-- src/validation.ts | 43 ++++++++++++++ 5 files changed, 165 insertions(+), 12 deletions(-) create mode 100644 src/pubsub-async-iterator.ts create mode 100644 src/validation.ts diff --git a/package.json b/package.json index bba4e74..3259bd6 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@types/graphql": "^14.0.0", "@types/mocha": "^2.2.39", "@types/node": "^8.0.28", - "@types/sinon": "^5.0.1", + "@types/sinon": "5.0.2", "@types/sinon-chai": "^3.2.0", "chai": "^4.1.2", "chai-as-promised": "^7.1.1", diff --git a/src/pubsub-async-iterator.ts b/src/pubsub-async-iterator.ts new file mode 100644 index 0000000..418bab9 --- /dev/null +++ b/src/pubsub-async-iterator.ts @@ -0,0 +1,110 @@ +import { $$asyncIterator } from 'iterall'; +import {PubSubEngine} from './pubsub-engine'; + +/** + * A class for digesting PubSubEngine events via the new AsyncIterator interface. + * This implementation is a generic version of the one located at + * @class + * + * @constructor + * + * @property pullQueue @type {Function[]} + * A queue of resolve functions waiting for an incoming event which has not yet arrived. + * This queue expands as next() calls are made without PubSubEngine events occurring in between. + * + * @property pushQueue @type {any[]} + * A queue of PubSubEngine events waiting for next() calls to be made. + * This queue expands as PubSubEngine events arrice without next() calls occurring in between. + * + * @property eventsArray @type {string[]} + * An array of PubSubEngine event names which this PubSubAsyncIterator should watch. + * + * @property allSubscribed @type {Promise} + * A promise of a list of all subscription ids to the passed PubSubEngine. + * + * @property listening @type {boolean} + * Whether or not the PubSubAsynIterator is in listening mode (responding to incoming PubSubEngine events and next() calls). + * Listening begins as true and turns to false once the return method is called. + * + * @property pubsub @type {PubSubEngine} + * The PubSubEngine whose events will be observed. + */ +export class PubSubAsyncIterator implements AsyncIterator { + + private pullQueue: ((value?: any) => Promise>)[]; + private pushQueue: any[]; + private eventsArray: string[]; + private allSubscribed: Promise; + private listening: boolean; + private pubsub: PubSubEngine; + + constructor(pubsub: PubSubEngine, eventNames: string | string[]) { + this.pubsub = pubsub; + this.pullQueue = []; + this.pushQueue = []; + this.listening = true; + this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames; + this.allSubscribed = this.subscribeAll(); + } + + public async next(): Promise> { + await this.allSubscribed; + return this.listening ? this.pullValue() : this.return(); + } + + public async return(): Promise> { + this.emptyQueue(await this.allSubscribed); + return { value: undefined, done: true }; + } + + public async throw(error) { + this.emptyQueue(await this.allSubscribed); + return Promise.reject(error); + } + + public [$$asyncIterator]() { + return this; + } + + private async pushValue(event) { + await this.allSubscribed; + if (this.pullQueue.length !== 0) { + this.pullQueue.shift()({ value: event, done: false }); + } else { + this.pushQueue.push(event); + } + } + + private pullValue(): Promise> { + return new Promise((resolve => { + if (this.pushQueue.length !== 0) { + resolve({ value: this.pushQueue.shift(), done: false }); + } else { + this.pullQueue.push(resolve); + } + }).bind(this)); + } + + private emptyQueue(subscriptionIds: number[]) { + if (this.listening) { + this.listening = false; + this.unsubscribeAll(subscriptionIds); + this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); + this.pullQueue.length = 0; + this.pushQueue.length = 0; + } + } + + private subscribeAll() { + return Promise.all(this.eventsArray.map( + eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), {}), + )); + } + + private unsubscribeAll(subscriptionIds: number[]) { + for (const subscriptionId of subscriptionIds) { + this.pubsub.unsubscribe(subscriptionId); + } + } + +} diff --git a/src/pubsub-engine.ts b/src/pubsub-engine.ts index 97f25ac..afe18d7 100644 --- a/src/pubsub-engine.ts +++ b/src/pubsub-engine.ts @@ -1,6 +1,10 @@ -export interface PubSubEngine { - publish(triggerName: string, payload: any): Promise; - subscribe(triggerName: string, onMessage: Function, options: Object): Promise; - unsubscribe(subId: number); - asyncIterator(triggers: string | string[]): AsyncIterator; +import {PubSubAsyncIterator} from './pubsub-async-iterator'; + +export abstract class PubSubEngine { + public abstract publish(triggerName: string, payload: any): Promise; + public abstract subscribe(triggerName: string, onMessage: Function, options: Object): Promise; + public abstract unsubscribe(subId: number); + public asyncIterator(triggers: string | string[]): AsyncIterator { + return new PubSubAsyncIterator(this, triggers); + } } diff --git a/src/pubsub.ts b/src/pubsub.ts index 7ab3a97..7972369 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -1,17 +1,17 @@ import { EventEmitter } from 'events'; import { PubSubEngine } from './pubsub-engine'; -import { eventEmitterAsyncIterator } from './event-emitter-to-async-iterator'; export interface PubSubOptions { eventEmitter?: EventEmitter; } -export class PubSub implements PubSubEngine { +export class PubSub extends PubSubEngine { protected ee: EventEmitter; private subscriptions: { [key: string]: [string, (...args: any[]) => void] }; private subIdCounter: number; constructor(options: PubSubOptions = {}) { + super(); this.ee = options.eventEmitter || new EventEmitter(); this.subscriptions = {}; this.subIdCounter = 0; @@ -35,8 +35,4 @@ export class PubSub implements PubSubEngine { delete this.subscriptions[subId]; this.ee.removeListener(triggerName, onMessage); } - - public asyncIterator(triggers: string | string[]): AsyncIterator { - return eventEmitterAsyncIterator(this.ee, triggers); - } } diff --git a/src/validation.ts b/src/validation.ts new file mode 100644 index 0000000..0f89b9a --- /dev/null +++ b/src/validation.ts @@ -0,0 +1,43 @@ +import { + ValidationContext, + SelectionNode, + GraphQLError, +} from 'graphql'; + +// XXX I don't know how else to do this. Can't seem to import from GraphQL. +const FIELD = 'Field'; + +/** + * @deprecated + */ +export function tooManySubscriptionFieldsError(subscriptionName: string): string { + return `Subscription "${subscriptionName}" must have only one field.`; +} + +// XXX we temporarily use this validation rule to make our life a bit easier. + +/** + * @deprecated + */ +export function subscriptionHasSingleRootField(context: ValidationContext): any { + const schema = context.getSchema(); + schema.getSubscriptionType(); + return { + OperationDefinition(node) { + const operationName = node.name ? node.name.value : ''; + let numFields = 0; + node.selectionSet.selections.forEach( (selection: SelectionNode) => { + if (selection.kind === FIELD) { + numFields++; + } else { + // why the heck use a fragment on the Subscription type? Just ... don't + context.reportError(new GraphQLError('Apollo subscriptions do not support fragments on the root field', [node])); + } + }); + if (numFields > 1) { + context.reportError(new GraphQLError(tooManySubscriptionFieldsError(operationName), [node])); + } + return false; + }, + }; +} From a43e386e4126050c5cb6cb5ee4ea21f55fd002cc Mon Sep 17 00:00:00 2001 From: Alex Leung Date: Fri, 16 Jun 2017 19:31:09 -0700 Subject: [PATCH 2/7] used T correctly --- src/pubsub-async-iterator.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pubsub-async-iterator.ts b/src/pubsub-async-iterator.ts index 418bab9..ce5f78b 100644 --- a/src/pubsub-async-iterator.ts +++ b/src/pubsub-async-iterator.ts @@ -32,7 +32,7 @@ import {PubSubEngine} from './pubsub-engine'; export class PubSubAsyncIterator implements AsyncIterator { private pullQueue: ((value?: any) => Promise>)[]; - private pushQueue: any[]; + private pushQueue: T[]; private eventsArray: string[]; private allSubscribed: Promise; private listening: boolean; @@ -66,7 +66,7 @@ export class PubSubAsyncIterator implements AsyncIterator { return this; } - private async pushValue(event) { + private async pushValue(event: T) { await this.allSubscribed; if (this.pullQueue.length !== 0) { this.pullQueue.shift()({ value: event, done: false }); From fce636764098786d89fe3c0074c6d9d9659c984d Mon Sep 17 00:00:00 2001 From: Alex Leung Date: Mon, 10 Dec 2018 20:56:27 -0800 Subject: [PATCH 3/7] fixed pubsub-async-iterator documentation typos --- src/pubsub-async-iterator.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/pubsub-async-iterator.ts b/src/pubsub-async-iterator.ts index ce5f78b..36781aa 100644 --- a/src/pubsub-async-iterator.ts +++ b/src/pubsub-async-iterator.ts @@ -3,24 +3,28 @@ import {PubSubEngine} from './pubsub-engine'; /** * A class for digesting PubSubEngine events via the new AsyncIterator interface. - * This implementation is a generic version of the one located at + * This implementation is a generic version of the AsyncIterator, so any PubSubEngine may + * be used. * @class * * @constructor * * @property pullQueue @type {Function[]} * A queue of resolve functions waiting for an incoming event which has not yet arrived. - * This queue expands as next() calls are made without PubSubEngine events occurring in between. + * This queue expands as next() calls are made without PubSubEngine events occurring in-between. * - * @property pushQueue @type {any[]} - * A queue of PubSubEngine events waiting for next() calls to be made. - * This queue expands as PubSubEngine events arrice without next() calls occurring in between. + * @property pushQueue @type {T[]} + * A queue of PubSubEngine events waiting for next() calls to be made, which returns the queued events + * for handling. This queue expands as PubSubEngine events arrive without next() calls occurring in-between. * * @property eventsArray @type {string[]} - * An array of PubSubEngine event names which this PubSubAsyncIterator should watch. + * An array of PubSubEngine event names that this PubSubAsyncIterator should watch. * * @property allSubscribed @type {Promise} - * A promise of a list of all subscription ids to the passed PubSubEngine. + * undefined until next() called for the first time, afterwards is a promise of an array of all + * subscription ids, where each subscription id identified a subscription on the PubSubEngine. + * The undefined initialization ensures that subscriptions are not made to the PubSubEngine + * before next() has ever been called. * * @property listening @type {boolean} * Whether or not the PubSubAsynIterator is in listening mode (responding to incoming PubSubEngine events and next() calls). From bd4e469a3d0b73bd87e1fceebf2246d1d986e041 Mon Sep 17 00:00:00 2001 From: Alex Leung Date: Sun, 16 Dec 2018 18:00:23 -0800 Subject: [PATCH 4/7] Updated 'PubSub Implementation` section of README to mention that PubSubEngine is now an abstract class with an opt-in default AsyncIterator implementation. Also removed a couple of trailing white spaces. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 333c363..ce77285 100644 --- a/README.md +++ b/README.md @@ -225,7 +225,7 @@ For more information about `AsyncIterator`: ### PubSub Implementations -It can be easily replaced with some other implementations of [PubSubEngine interface](https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub.ts#L21-L25). There are a couple of them out there: +It can be easily replaced with some other implementations of [PubSubEngine abstract class](https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub-engine.ts). Here are a few of them: - Use Redis with https://github.com/davidyaha/graphql-redis-subscriptions - Use Google PubSub with https://github.com/axelspringer/graphql-google-pubsub - Use MQTT enabled broker with https://github.com/davidyaha/graphql-mqtt-subscriptions @@ -236,7 +236,7 @@ It can be easily replaced with some other implementations of [PubSubEngine inter - Use multiple backends with https://github.com/jcoreio/graphql-multiplex-subscriptions - [Add your implementation...](https://github.com/apollographql/graphql-subscriptions/pull/new/master) -You can also implement a `PubSub` of your own, by using the exported interface `PubSubEngine` from this package. +You can also implement a `PubSub` of your own, by using the exported abstract class `PubSubEngine` from this package. By using `extends PubSubEngine` you use the default `asyncIterator` method implementation; by using `implements PubSubEngine` you must implement your own `AsyncIterator`. #### SubscriptionManager **@deprecated** From 6e6f680b9a7a18c94af939ce48cbf28a38b472e0 Mon Sep 17 00:00:00 2001 From: Alex Leung Date: Mon, 17 Dec 2018 19:44:32 -0800 Subject: [PATCH 5/7] updated CHANGELOG.md --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9634d5..11aa786 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +### 1.1.0 + +- Replaced `eventEmitterAsynIterator` with default generic `AsyncIterator`. `extends PubSubEngine` automatically uses generic implementation. No breaking changes for those who continue to use `implements PubSubEngine`. See PR #78 + ### 1.0.0 - BREAKING CHANGE: Changed return type of `publish`.
From 9dc056d7b844dd4681103bb29a1a558153fd8eb0 Mon Sep 17 00:00:00 2001 From: Alex Leung Date: Mon, 17 Dec 2018 19:54:51 -0800 Subject: [PATCH 6/7] fixed PR link in CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11aa786..7ea4837 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ### 1.1.0 -- Replaced `eventEmitterAsynIterator` with default generic `AsyncIterator`. `extends PubSubEngine` automatically uses generic implementation. No breaking changes for those who continue to use `implements PubSubEngine`. See PR #78 +- Replaced `eventEmitterAsynIterator` with default generic `AsyncIterator` named `PubSubAsyncIterator`. `extends PubSubEngine` automatically uses generic implementation. No breaking changes for those who continue to use `implements PubSubEngine`. See PR [#78](https://github.com/apollographql/graphql-subscriptions/pull/78). ### 1.0.0 From a90f587ec29edc6abe2be11647aed03e6a72a127 Mon Sep 17 00:00:00 2001 From: Andy Edwards Date: Thu, 14 Feb 2019 14:52:13 -0600 Subject: [PATCH 7/7] fix(PubSubAsyncIterator): don't subscribe until first call to next() --- src/event-emitter-to-async-iterator.ts | 75 -------------------------- src/pubsub-async-iterator.ts | 39 ++++++++------ src/test/tests.ts | 18 ++++--- 3 files changed, 33 insertions(+), 99 deletions(-) delete mode 100644 src/event-emitter-to-async-iterator.ts diff --git a/src/event-emitter-to-async-iterator.ts b/src/event-emitter-to-async-iterator.ts deleted file mode 100644 index 0aef3b2..0000000 --- a/src/event-emitter-to-async-iterator.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { $$asyncIterator } from 'iterall'; -import { EventEmitter } from 'events'; - -export function eventEmitterAsyncIterator(eventEmitter: EventEmitter, - eventsNames: string | string[]): AsyncIterator { - const pullQueue = []; - const pushQueue = []; - const eventsArray = typeof eventsNames === 'string' ? [eventsNames] : eventsNames; - let listening = true; - let addedListeners = false; - - const pushValue = event => { - if (pullQueue.length !== 0) { - pullQueue.shift()({ value: event, done: false }); - } else { - pushQueue.push(event); - } - }; - - const pullValue = () => { - return new Promise(resolve => { - if (pushQueue.length !== 0) { - resolve({ value: pushQueue.shift(), done: false }); - } else { - pullQueue.push(resolve); - } - }); - }; - - const emptyQueue = () => { - if (listening) { - listening = false; - if (addedListeners) { removeEventListeners(); } - pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); - pullQueue.length = 0; - pushQueue.length = 0; - } - }; - - const addEventListeners = () => { - for (const eventName of eventsArray) { - eventEmitter.addListener(eventName, pushValue); - } - }; - - const removeEventListeners = () => { - for (const eventName of eventsArray) { - eventEmitter.removeListener(eventName, pushValue); - } - }; - - return { - next() { - if (!listening) { return this.return(); } - if (!addedListeners) { - addEventListeners(); - addedListeners = true; - } - return pullValue(); - }, - return() { - emptyQueue(); - - return Promise.resolve({ value: undefined, done: true }); - }, - throw(error) { - emptyQueue(); - - return Promise.reject(error); - }, - [$$asyncIterator]() { - return this; - }, - }; -} diff --git a/src/pubsub-async-iterator.ts b/src/pubsub-async-iterator.ts index 36781aa..7c256fc 100644 --- a/src/pubsub-async-iterator.ts +++ b/src/pubsub-async-iterator.ts @@ -26,9 +26,9 @@ import {PubSubEngine} from './pubsub-engine'; * The undefined initialization ensures that subscriptions are not made to the PubSubEngine * before next() has ever been called. * - * @property listening @type {boolean} - * Whether or not the PubSubAsynIterator is in listening mode (responding to incoming PubSubEngine events and next() calls). - * Listening begins as true and turns to false once the return method is called. + * @property running @type {boolean} + * Whether or not the PubSubAsynIterator is in running mode (responding to incoming PubSubEngine events and next() calls). + * running begins as true and turns to false once the return method is called. * * @property pubsub @type {PubSubEngine} * The PubSubEngine whose events will be observed. @@ -39,30 +39,30 @@ export class PubSubAsyncIterator implements AsyncIterator { private pushQueue: T[]; private eventsArray: string[]; private allSubscribed: Promise; - private listening: boolean; + private running: boolean; private pubsub: PubSubEngine; constructor(pubsub: PubSubEngine, eventNames: string | string[]) { this.pubsub = pubsub; this.pullQueue = []; this.pushQueue = []; - this.listening = true; + this.running = true; + this.allSubscribed = null; this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames; - this.allSubscribed = this.subscribeAll(); } public async next(): Promise> { - await this.allSubscribed; - return this.listening ? this.pullValue() : this.return(); + if (!this.allSubscribed) await (this.allSubscribed = this.subscribeAll()); + return this.pullValue() } public async return(): Promise> { - this.emptyQueue(await this.allSubscribed); + await this.emptyQueue(); return { value: undefined, done: true }; } public async throw(error) { - this.emptyQueue(await this.allSubscribed); + await this.emptyQueue(); return Promise.reject(error); } @@ -73,7 +73,10 @@ export class PubSubAsyncIterator implements AsyncIterator { private async pushValue(event: T) { await this.allSubscribed; if (this.pullQueue.length !== 0) { - this.pullQueue.shift()({ value: event, done: false }); + this.pullQueue.shift()(this.running + ? { value: event, done: false } + : { value: undefined, done: true } + ); } else { this.pushQueue.push(event); } @@ -82,20 +85,24 @@ export class PubSubAsyncIterator implements AsyncIterator { private pullValue(): Promise> { return new Promise((resolve => { if (this.pushQueue.length !== 0) { - resolve({ value: this.pushQueue.shift(), done: false }); + resolve(this.running + ? { value: this.pushQueue.shift(), done: false } + : { value: undefined, done: true } + ); } else { this.pullQueue.push(resolve); } }).bind(this)); } - private emptyQueue(subscriptionIds: number[]) { - if (this.listening) { - this.listening = false; - this.unsubscribeAll(subscriptionIds); + private async emptyQueue() { + if (this.running) { + this.running = false; this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true })); this.pullQueue.length = 0; this.pushQueue.length = 0; + const subscriptionIds = await this.allSubscribed; + if (subscriptionIds) this.unsubscribeAll(subscriptionIds); } } diff --git a/src/test/tests.ts b/src/test/tests.ts index 3a36dd2..5190b31 100644 --- a/src/test/tests.ts +++ b/src/test/tests.ts @@ -88,19 +88,21 @@ describe('AsyncIterator', () => { const iterator = ps.asyncIterator(eventName); iterator.next().then(result => { - expect(result).to.not.be.undefined; - expect(result.value).to.not.be.undefined; - expect(result.done).to.be.false; - }); + expect(result).to.deep.equal({ + value: undefined, + done: true, + }); + }).catch(done); ps.publish(eventName, { test: true }); iterator.next().then(result => { - expect(result).to.not.be.undefined; - expect(result.value).to.be.undefined; - expect(result.done).to.be.true; + expect(result).to.deep.equal({ + value: undefined, + done: true, + }); done(); - }); + }).catch(done); iterator.return();