Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#78 fixed up and rebased #198

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

### 1.1.0

- Fix [#132](https://github.com/apollographql/graphql-subscriptions/issues/132) - withFilter was previously always passing undefined as its first argument to the filterFn
- Partially attempt to fix [#143](https://github.com/apollographql/graphql-subscriptions/issues/143) - try to reduce occurrence of certain memory leaks with the built-in PubSubEngine implementation
- Replaced `eventEmitterAsyncIterator` with default generic `AsyncIterator` named `PubSubAsyncIterator`. `extends PubSubEngine` automatically uses generic implementation. No breaking changes for those who continue to use `implements PubSubEngine`. See PR [#78](https://github.com/apollographql/graphql-subscriptions/pull/78).

### 1.0.0

- BREAKING CHANGE: Changed return type of `publish`. <br/>
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ For more information about `AsyncIterator`:

### PubSub Implementations

It can be easily replaced with some other implementations of [PubSubEngine interface](https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub.ts#L21-L25). There are a couple of them out there:
It can be easily replaced with some other implementations of [PubSubEngine abstract class](https://github.com/apollographql/graphql-subscriptions/blob/master/src/pubsub-engine.ts). Here are a few of them:
- Use Redis with https://github.com/davidyaha/graphql-redis-subscriptions
- Use Google PubSub with https://github.com/axelspringer/graphql-google-pubsub
- Use MQTT enabled broker with https://github.com/davidyaha/graphql-mqtt-subscriptions
Expand All @@ -237,7 +237,7 @@ It can be easily replaced with some other implementations of [PubSubEngine inter
- Use multiple backends with https://github.com/jcoreio/graphql-multiplex-subscriptions
- [Add your implementation...](https://github.com/apollographql/graphql-subscriptions/pull/new/master)

You can also implement a `PubSub` of your own, by using the exported interface `PubSubEngine` from this package.
You can also implement a `PubSub` of your own, by using the exported abstract class `PubSubEngine` from this package. By using `extends PubSubEngine` you use the default `asyncIterator` method implementation; by using `implements PubSubEngine` you must implement your own `AsyncIterator`.

#### SubscriptionManager **@deprecated**

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"@types/graphql": "^14.0.0",
"@types/mocha": "^2.2.39",
"@types/node": "^8.0.28",
"@types/sinon": "5.0.1",
"@types/sinon": "5.0.2",
"@types/sinon-chai": "^3.2.0",
"chai": "^4.1.2",
"chai-as-promised": "^7.1.1",
Expand Down
75 changes: 0 additions & 75 deletions src/event-emitter-to-async-iterator.ts

This file was deleted.

123 changes: 123 additions & 0 deletions src/pubsub-async-iterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { $$asyncIterator } from 'iterall';
import { PubSubEngine } from './pubsub-engine';

/**
* A class for digesting PubSubEngine events via the new AsyncIterator interface.
* This implementation is a generic version of the AsyncIterator, so any PubSubEngine may
* be used.
* @class
*
* @constructor
*
* @property pullQueue @type {Function[]}
* A queue of resolve functions waiting for an incoming event which has not yet arrived.
* This queue expands as next() calls are made without PubSubEngine events occurring in-between.
*
* @property pushQueue @type {T[]}
* A queue of PubSubEngine events waiting for next() calls to be made, which returns the queued events
* for handling. This queue expands as PubSubEngine events arrive without next() calls occurring in-between.
*
* @property eventsArray @type {string[]}
* An array of PubSubEngine event names that this PubSubAsyncIterator should watch.
*
* @property allSubscribed @type {Promise<number[]>}
* undefined until next() called for the first time, afterwards is a promise of an array of all
* subscription ids, where each subscription id identified a subscription on the PubSubEngine.
* The undefined initialization ensures that subscriptions are not made to the PubSubEngine
* before next() has ever been called.
*
* @property running @type {boolean}
* Whether or not the PubSubAsyncIterator is in running mode (responding to incoming PubSubEngine events and next() calls).
* running begins as true and turns to false once the return method is called.
*
* @property pubsub @type {PubSubEngine}
* The PubSubEngine whose events will be observed.
*/
export class PubSubAsyncIterator<T> implements AsyncIterator<T> {

private pullQueue: ((value: IteratorResult<T>) => void)[];
private pushQueue: T[];
private eventsArray: string[];
private allSubscribed: Promise<number[]>;
private running: boolean;
private pubsub: PubSubEngine;

constructor(pubsub: PubSubEngine, eventNames: string | string[]) {
this.pubsub = pubsub;
this.pullQueue = [];
this.pushQueue = [];
this.running = true;
this.allSubscribed = null;
this.eventsArray = typeof eventNames === 'string' ? [eventNames] : eventNames;
}

public async next(): Promise<IteratorResult<T>> {
if (!this.allSubscribed) { await (this.allSubscribed = this.subscribeAll()); }
return this.pullValue();
}

public async return(): Promise<IteratorResult<T>> {
await this.emptyQueue();
return { value: undefined, done: true };
}

public async throw(error) {
await this.emptyQueue();
return Promise.reject(error);
}

public [$$asyncIterator]() {
return this;
}

private async pushValue(event: T) {
await this.allSubscribed;
if (this.pullQueue.length !== 0) {
this.pullQueue.shift()(this.running
? { value: event, done: false }
: { value: undefined, done: true },
);
} else {
this.pushQueue.push(event);
}
}

private pullValue(): Promise<IteratorResult<T>> {
return new Promise(
resolve => {
if (this.pushQueue.length !== 0) {
resolve(this.running
? { value: this.pushQueue.shift(), done: false }
: { value: undefined, done: true },
);
} else {
this.pullQueue.push(resolve);
}
},
);
}

private async emptyQueue() {
if (this.running) {
this.running = false;
this.pullQueue.forEach(resolve => resolve({ value: undefined, done: true }));
this.pullQueue.length = 0;
this.pushQueue.length = 0;
const subscriptionIds = await this.allSubscribed;
if (subscriptionIds) { this.unsubscribeAll(subscriptionIds); }
}
}

private subscribeAll() {
return Promise.all(this.eventsArray.map(
eventName => this.pubsub.subscribe(eventName, this.pushValue.bind(this), {}),
));
}

private unsubscribeAll(subscriptionIds: number[]) {
for (const subscriptionId of subscriptionIds) {
this.pubsub.unsubscribe(subscriptionId);
}
}

}
14 changes: 9 additions & 5 deletions src/pubsub-engine.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
export interface PubSubEngine {
publish(triggerName: string, payload: any): Promise<void>;
subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
unsubscribe(subId: number);
asyncIterator<T>(triggers: string | string[]): AsyncIterator<T>;
import {PubSubAsyncIterator} from './pubsub-async-iterator';

export abstract class PubSubEngine {
public abstract publish(triggerName: string, payload: any): Promise<void>;
public abstract subscribe(triggerName: string, onMessage: Function, options: Object): Promise<number>;
public abstract unsubscribe(subId: number);
public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers);
}
}
8 changes: 2 additions & 6 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { EventEmitter } from 'events';
import { PubSubEngine } from './pubsub-engine';
import { eventEmitterAsyncIterator } from './event-emitter-to-async-iterator';

export interface PubSubOptions {
eventEmitter?: EventEmitter;
}

export class PubSub implements PubSubEngine {
export class PubSub extends PubSubEngine {
protected ee: EventEmitter;
private subscriptions: { [key: string]: [string, (...args: any[]) => void] };
private subIdCounter: number;

constructor(options: PubSubOptions = {}) {
super();
this.ee = options.eventEmitter || new EventEmitter();
this.subscriptions = {};
this.subIdCounter = 0;
Expand All @@ -35,8 +35,4 @@ export class PubSub implements PubSubEngine {
delete this.subscriptions[subId];
this.ee.removeListener(triggerName, onMessage);
}

public asyncIterator<T>(triggers: string | string[]): AsyncIterator<T> {
return eventEmitterAsyncIterator<T>(this.ee, triggers);
}
}
18 changes: 10 additions & 8 deletions src/test/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,21 @@ describe('AsyncIterator', () => {
const iterator = ps.asyncIterator(eventName);

iterator.next().then(result => {
expect(result).to.not.be.undefined;
expect(result.value).to.not.be.undefined;
expect(result.done).to.be.false;
});
expect(result).to.deep.equal({
value: undefined,
done: true,
});
}).catch(done);

ps.publish(eventName, { test: true });

iterator.next().then(result => {
expect(result).to.not.be.undefined;
expect(result.value).to.be.undefined;
expect(result.done).to.be.true;
expect(result).to.deep.equal({
value: undefined,
done: true,
});
done();
});
}).catch(done);

iterator.return();

Expand Down