diff --git a/CHANGELOG.md b/CHANGELOG.md
index c9634d5..7ea4837 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
# Changelog
+### 1.1.0
+
+- Replaced `eventEmitterAsynIterator` with default generic `AsyncIterator` named `PubSubAsyncIterator`. `extends PubSubEngine` automatically uses generic implementation. No breaking changes for those who continue to use `implements PubSubEngine`. See PR [#78](https://github.com/apollographql/graphql-subscriptions/pull/78).
+
### 1.0.0
- BREAKING CHANGE: Changed return type of `publish`.
diff --git a/README.md b/README.md
index 333c363..ce77285 100644
--- a/README.md
+++ b/README.md
@@ -225,7 +225,7 @@ For more information about `AsyncIterator`:
### PubSub Implementations
-It can be easily replaced with some other implementations of [PubSubEngine interface](https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub.ts#L21-L25). There are a couple of them out there:
+It can be easily replaced with some other implementations of [PubSubEngine abstract class](https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub-engine.ts). Here are a few of them:
- Use Redis with https://github.com/davidyaha/graphql-redis-subscriptions
- Use Google PubSub with https://github.com/axelspringer/graphql-google-pubsub
- Use MQTT enabled broker with https://github.com/davidyaha/graphql-mqtt-subscriptions
@@ -236,7 +236,7 @@ It can be easily replaced with some other implementations of [PubSubEngine inter
- Use multiple backends with https://github.com/jcoreio/graphql-multiplex-subscriptions
- [Add your implementation...](https://github.com/apollographql/graphql-subscriptions/pull/new/master)
-You can also implement a `PubSub` of your own, by using the exported interface `PubSubEngine` from this package.
+You can also implement a `PubSub` of your own, by using the exported abstract class `PubSubEngine` from this package. By using `extends PubSubEngine` you use the default `asyncIterator` method implementation; by using `implements PubSubEngine` you must implement your own `AsyncIterator`.
#### SubscriptionManager **@deprecated**
diff --git a/package.json b/package.json
index bba4e74..3259bd6 100644
--- a/package.json
+++ b/package.json
@@ -31,7 +31,7 @@
"@types/graphql": "^14.0.0",
"@types/mocha": "^2.2.39",
"@types/node": "^8.0.28",
- "@types/sinon": "^5.0.1",
+ "@types/sinon": "5.0.2",
"@types/sinon-chai": "^3.2.0",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
diff --git a/src/event-emitter-to-async-iterator.ts b/src/event-emitter-to-async-iterator.ts
deleted file mode 100644
index 0aef3b2..0000000
--- a/src/event-emitter-to-async-iterator.ts
+++ /dev/null
@@ -1,75 +0,0 @@
-import { $$asyncIterator } from 'iterall';
-import { EventEmitter } from 'events';
-
-export function eventEmitterAsyncIterator(eventEmitter: EventEmitter,
- eventsNames: string | string[]): AsyncIterator {
- const pullQueue = [];
- const pushQueue = [];
- const eventsArray = typeof eventsNames === 'string' ? [eventsNames] : eventsNames;
- let listening = true;
- let addedListeners = false;
-
- const pushValue = event => {
- if (pullQueue.length !== 0) {
- pullQueue.shift()({ value: event, done: false });
- } else {
- pushQueue.push(event);
- }
- };
-
- const pullValue = () => {
- return new Promise(resolve => {
- if (pushQueue.length !== 0) {
- resolve({ value: pushQueue.shift(), done: false });
- } else {
- pullQueue.push(resolve);
- }
- });
- };
-
- const emptyQueue = () => {
- if (listening) {
- listening = false;
- if (addedListeners) { removeEventListeners(); }
- pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
- pullQueue.length = 0;
- pushQueue.length = 0;
- }
- };
-
- const addEventListeners = () => {
- for (const eventName of eventsArray) {
- eventEmitter.addListener(eventName, pushValue);
- }
- };
-
- const removeEventListeners = () => {
- for (const eventName of eventsArray) {
- eventEmitter.removeListener(eventName, pushValue);
- }
- };
-
- return {
- next() {
- if (!listening) { return this.return(); }
- if (!addedListeners) {
- addEventListeners();
- addedListeners = true;
- }
- return pullValue();
- },
- return() {
- emptyQueue();
-
- return Promise.resolve({ value: undefined, done: true });
- },
- throw(error) {
- emptyQueue();
-
- return Promise.reject(error);
- },
- [$$asyncIterator]() {
- return this;
- },
- };
-}
diff --git a/src/pubsub-async-iterator.ts b/src/pubsub-async-iterator.ts
new file mode 100644
index 0000000..7c256fc
--- /dev/null
+++ b/src/pubsub-async-iterator.ts
@@ -0,0 +1,121 @@
+import { $$asyncIterator } from 'iterall';
+import {PubSubEngine} from './pubsub-engine';
+
+/**
+ * A class for digesting PubSubEngine events via the new AsyncIterator interface.
+ * This implementation is a generic version of the AsyncIterator, so any PubSubEngine may
+ * be used.
+ * @class
+ *
+ * @constructor
+ *
+ * @property pullQueue @type {Function[]}
+ * A queue of resolve functions waiting for an incoming event which has not yet arrived.
+ * This queue expands as next() calls are made without PubSubEngine events occurring in-between.
+ *
+ * @property pushQueue @type {T[]}
+ * A queue of PubSubEngine events waiting for next() calls to be made, which returns the queued events
+ * for handling. This queue expands as PubSubEngine events arrive without next() calls occurring in-between.
+ *
+ * @property eventsArray @type {string[]}
+ * An array of PubSubEngine event names that this PubSubAsyncIterator should watch.
+ *
+ * @property allSubscribed @type {Promise}
+ * undefined until next() called for the first time, afterwards is a promise of an array of all
+ * subscription ids, where each subscription id identified a subscription on the PubSubEngine.
+ * The undefined initialization ensures that subscriptions are not made to the PubSubEngine
+ * before next() has ever been called.
+ *
+ * @property running @type {boolean}
+ * Whether or not the PubSubAsynIterator is in running mode (responding to incoming PubSubEngine events and next() calls).
+ * running begins as true and turns to false once the return method is called.
+ *
+ * @property pubsub @type {PubSubEngine}
+ * The PubSubEngine whose events will be observed.
+ */
+export class PubSubAsyncIterator implements AsyncIterator {
+
+ private pullQueue: ((value?: any) => Promise>)[];
+ private pushQueue: T[];
+ private eventsArray: string[];
+ private allSubscribed: Promise;
+ private running: boolean;
+ private pubsub: PubSubEngine;
+
+ constructor(pubsub: PubSubEngine, eventNames: string | string[]) {
+ this.pubsub = pubsub;
+ this.pullQueue = [];
+ this.pushQueue = [];
+ this.running = true;
+ this.allSubscribed = null;
+ this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames;
+ }
+
+ public async next(): Promise> {
+ if (!this.allSubscribed) await (this.allSubscribed = this.subscribeAll());
+ return this.pullValue()
+ }
+
+ public async return(): Promise> {
+ await this.emptyQueue();
+ return { value: undefined, done: true };
+ }
+
+ public async throw(error) {
+ await this.emptyQueue();
+ return Promise.reject(error);
+ }
+
+ public [$$asyncIterator]() {
+ return this;
+ }
+
+ private async pushValue(event: T) {
+ await this.allSubscribed;
+ if (this.pullQueue.length !== 0) {
+ this.pullQueue.shift()(this.running
+ ? { value: event, done: false }
+ : { value: undefined, done: true }
+ );
+ } else {
+ this.pushQueue.push(event);
+ }
+ }
+
+ private pullValue(): Promise> {
+ return new Promise((resolve => {
+ if (this.pushQueue.length !== 0) {
+ resolve(this.running
+ ? { value: this.pushQueue.shift(), done: false }
+ : { value: undefined, done: true }
+ );
+ } else {
+ this.pullQueue.push(resolve);
+ }
+ }).bind(this));
+ }
+
+ private async emptyQueue() {
+ if (this.running) {
+ this.running = false;
+ this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
+ this.pullQueue.length = 0;
+ this.pushQueue.length = 0;
+ const subscriptionIds = await this.allSubscribed;
+ if (subscriptionIds) this.unsubscribeAll(subscriptionIds);
+ }
+ }
+
+ private subscribeAll() {
+ return Promise.all(this.eventsArray.map(
+ eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), {}),
+ ));
+ }
+
+ private unsubscribeAll(subscriptionIds: number[]) {
+ for (const subscriptionId of subscriptionIds) {
+ this.pubsub.unsubscribe(subscriptionId);
+ }
+ }
+
+}
diff --git a/src/pubsub-engine.ts b/src/pubsub-engine.ts
index 97f25ac..afe18d7 100644
--- a/src/pubsub-engine.ts
+++ b/src/pubsub-engine.ts
@@ -1,6 +1,10 @@
-export interface PubSubEngine {
- publish(triggerName: string, payload: any): Promise;
- subscribe(triggerName: string, onMessage: Function, options: Object): Promise;
- unsubscribe(subId: number);
- asyncIterator(triggers: string | string[]): AsyncIterator;
+import {PubSubAsyncIterator} from './pubsub-async-iterator';
+
+export abstract class PubSubEngine {
+ public abstract publish(triggerName: string, payload: any): Promise;
+ public abstract subscribe(triggerName: string, onMessage: Function, options: Object): Promise;
+ public abstract unsubscribe(subId: number);
+ public asyncIterator(triggers: string | string[]): AsyncIterator {
+ return new PubSubAsyncIterator(this, triggers);
+ }
}
diff --git a/src/pubsub.ts b/src/pubsub.ts
index 7ab3a97..7972369 100644
--- a/src/pubsub.ts
+++ b/src/pubsub.ts
@@ -1,17 +1,17 @@
import { EventEmitter } from 'events';
import { PubSubEngine } from './pubsub-engine';
-import { eventEmitterAsyncIterator } from './event-emitter-to-async-iterator';
export interface PubSubOptions {
eventEmitter?: EventEmitter;
}
-export class PubSub implements PubSubEngine {
+export class PubSub extends PubSubEngine {
protected ee: EventEmitter;
private subscriptions: { [key: string]: [string, (...args: any[]) => void] };
private subIdCounter: number;
constructor(options: PubSubOptions = {}) {
+ super();
this.ee = options.eventEmitter || new EventEmitter();
this.subscriptions = {};
this.subIdCounter = 0;
@@ -35,8 +35,4 @@ export class PubSub implements PubSubEngine {
delete this.subscriptions[subId];
this.ee.removeListener(triggerName, onMessage);
}
-
- public asyncIterator(triggers: string | string[]): AsyncIterator {
- return eventEmitterAsyncIterator(this.ee, triggers);
- }
}
diff --git a/src/test/tests.ts b/src/test/tests.ts
index 3a36dd2..5190b31 100644
--- a/src/test/tests.ts
+++ b/src/test/tests.ts
@@ -88,19 +88,21 @@ describe('AsyncIterator', () => {
const iterator = ps.asyncIterator(eventName);
iterator.next().then(result => {
- expect(result).to.not.be.undefined;
- expect(result.value).to.not.be.undefined;
- expect(result.done).to.be.false;
- });
+ expect(result).to.deep.equal({
+ value: undefined,
+ done: true,
+ });
+ }).catch(done);
ps.publish(eventName, { test: true });
iterator.next().then(result => {
- expect(result).to.not.be.undefined;
- expect(result.value).to.be.undefined;
- expect(result.done).to.be.true;
+ expect(result).to.deep.equal({
+ value: undefined,
+ done: true,
+ });
done();
- });
+ }).catch(done);
iterator.return();
diff --git a/src/validation.ts b/src/validation.ts
new file mode 100644
index 0000000..0f89b9a
--- /dev/null
+++ b/src/validation.ts
@@ -0,0 +1,43 @@
+import {
+ ValidationContext,
+ SelectionNode,
+ GraphQLError,
+} from 'graphql';
+
+// XXX I don't know how else to do this. Can't seem to import from GraphQL.
+const FIELD = 'Field';
+
+/**
+ * @deprecated
+ */
+export function tooManySubscriptionFieldsError(subscriptionName: string): string {
+ return `Subscription "${subscriptionName}" must have only one field.`;
+}
+
+// XXX we temporarily use this validation rule to make our life a bit easier.
+
+/**
+ * @deprecated
+ */
+export function subscriptionHasSingleRootField(context: ValidationContext): any {
+ const schema = context.getSchema();
+ schema.getSubscriptionType();
+ return {
+ OperationDefinition(node) {
+ const operationName = node.name ? node.name.value : '';
+ let numFields = 0;
+ node.selectionSet.selections.forEach( (selection: SelectionNode) => {
+ if (selection.kind === FIELD) {
+ numFields++;
+ } else {
+ // why the heck use a fragment on the Subscription type? Just ... don't
+ context.reportError(new GraphQLError('Apollo subscriptions do not support fragments on the root field', [node]));
+ }
+ });
+ if (numFields > 1) {
+ context.reportError(new GraphQLError(tooManySubscriptionFieldsError(operationName), [node]));
+ }
+ return false;
+ },
+ };
+}