Skip to content

Commit

Permalink
chore(context): defer initialization of properties for context objects
Browse files Browse the repository at this point in the history
  • Loading branch information
raymondfeng committed Feb 11, 2019
1 parent 6ee3837 commit ffda885
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 57 deletions.
10 changes: 7 additions & 3 deletions packages/context/src/__tests__/unit/context-observer.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class TestContext extends Context {
* Wait until the context event queue is empty or an error is thrown
*/
waitUntilObserversNotified(): Promise<void> {
return this.waitForObserversNotifiedForPendingEvents(100);
return this.waitUntilPendingNotificationsDone(100);
}
}

Expand Down Expand Up @@ -63,9 +63,10 @@ describe('Context', () => {

it('registers observers on context with parent', () => {
const childCtx = new TestContext(ctx, 'child');
expect(childCtx.parentEventListeners).to.be.undefined();
childCtx.subscribe(nonMatchingObserver);
expect(childCtx.parentEventListeners!.has('bind')).to.be.true();
expect(childCtx.parentEventListeners!.has('unbind')).to.be.true();
childCtx.subscribe(nonMatchingObserver);
expect(childCtx.isSubscribed(nonMatchingObserver)).to.true();
expect(ctx.isSubscribed(nonMatchingObserver)).to.false();
});
Expand All @@ -83,10 +84,13 @@ describe('Context', () => {
it('un-registers observers on context chain during close', () => {
const childCtx = new TestContext(ctx, 'child');
childCtx.subscribe(nonMatchingObserver);
const parentEventListeners = new Map(childCtx.parentEventListeners!);
childCtx.close();
for (const [event, listener] of parentEventListeners) {
expect(ctx.listeners(event)).to.not.containEql(listener);
}
expect(childCtx.parentEventListeners).to.be.undefined();
expect(childCtx.isSubscribed(nonMatchingObserver)).to.false();
expect(ctx.isSubscribed(nonMatchingObserver)).to.false();
});

function givenNonMatchingObserver() {
Expand Down
157 changes: 103 additions & 54 deletions packages/context/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,34 @@ export class Context extends EventEmitter {
protected _parent?: Context;

/**
* Event listeners for parent context mapped by event names
* Event listeners for parent context keyed by event names. It keeps track
* of listeners from this context against its parent so that we can remove
* these listeners when this context is closed.
*/
// tslint:disable-next-line:no-any
protected _parentEventListeners:
| Map<string, (...args: any[]) => void>
| Map<
string,
// tslint:disable-next-line:no-any
(...args: any[]) => void
>
| undefined;

/**
* A list of registered context observers
* A list of registered context observers. The Set will be created when the
* first observer is added.
*/
protected readonly observers: Set<ContextEventObserver> = new Set();
protected observers: Set<ContextEventObserver> | undefined;

/**
* Internal counter for pending events which observers have not processed yet
* Internal counter for pending notification events which are yet to be
* processed by observers.
*/
private pendingEvents = 0;
private pendingNotifications = 0;

/**
* Queue for background notifications for observers
*/
private notificationQueue: AsyncIterableIterator<Notification> | undefined;

/**
* Create a new context. For example,
Expand Down Expand Up @@ -99,15 +111,12 @@ export class Context extends EventEmitter {
}
this._parent = _parent;
this.name = name || uuidv1();

this.setupEventHandlers();
}

/**
* Wrap the debug statement so that it always print out the context name
* as the prefix
* @param formatter
* @param args
* @param args Arguments for the debug
*/
// tslint:disable-next-line:no-any
private _debug(...args: any[]) {
Expand All @@ -123,37 +132,26 @@ export class Context extends EventEmitter {

/**
* Set up an internal listener to notify registered observers asynchronously
* upon `bind` and `unbind` events
*/
private setupEventHandlers() {
// tslint:disable-next-line:no-any
const notificationErrorHandler = (...args: any[]) => {
this._debug(...args);
// Bubbling up the error event over the context chain
// until we find an error listener
let ctx: Context | undefined = this;
while (ctx) {
if (ctx.listenerCount('error') === 0) {
// No error listener found, try its parent
ctx = ctx._parent;
continue;
}
this._debug('Emitting error to context %s', ctx.name);
ctx.emit('error', ...args);
return;
}
// No context with error listeners found
this._debug('No error handler is configured for the context chain');
// Let it crash now by emitting an error event
this.emit('error', ...args);
};
* upon `bind` and `unbind` events. This method will be called lazily when
* the first observer is added.
*/
private setupEventHandlersIfNeeded() {
if (this.notificationQueue != null) return;

this.addParentEventListener('bind');
this.addParentEventListener('unbind');

// The following are two async functions. Returned promises are ignored as
// they are long-running background tasks.
this.startNotificationTask().catch(notificationErrorHandler);
this.startNotificationTask().catch(err => {
this.handleNotificationError(err);
});

let ctx = this._parent;
while (ctx) {
ctx.setupEventHandlersIfNeeded();
ctx = ctx._parent;
}
}

/**
Expand All @@ -163,8 +161,13 @@ export class Context extends EventEmitter {
*/
private addParentEventListener(event: string) {
if (this._parent == null) return;

// Keep track of parent event listeners so that we can remove them
this._parentEventListeners = this._parentEventListeners || new Map();
if (this._parentEventListeners.has(event)) return;

const parentEventListener = (
binding: Binding<unknown>,
binding: Readonly<Binding<unknown>>,
context: Context,
) => {
// Propagate the event to this context only if the binding key does not
Expand All @@ -189,35 +192,69 @@ export class Context extends EventEmitter {
);
this.emit(event, binding, context);
};
// Keep track of parent event listeners so that we can remove them
this._parentEventListeners = this._parentEventListeners || new Map();
this._parentEventListeners.set(event, parentEventListener);
// Listen on the parent context events
this._parent.on(event, parentEventListener);
}

/**
* Handle errors caught during the notification of observers
* @param err Error
*/
private handleNotificationError(err: unknown) {
this._debug(err);
// Bubbling up the error event over the context chain
// until we find an error listener
let ctx: Context | undefined = this;
while (ctx) {
if (ctx.listenerCount('error') === 0) {
// No error listener found, try its parent
ctx = ctx._parent;
continue;
}
this._debug('Emitting error to context %s', ctx.name);
ctx.emit('error', err);
return;
}
// No context with error listeners found
this._debug('No error handler is configured for the context chain');
// Let it crash now by emitting an error event
this.emit('error', err);
}

/**
* Start a background task to listen on context events and notify observers
*/
private async startNotificationTask() {
private startNotificationTask() {
// Set up listeners on `bind` and `unbind` for notifications
this.setupNotification('bind', 'unbind');
const events: AsyncIterable<Notification> = pEvent.iterator(
this,
'notification',
);

// Create an async iterator for the `notification` event as a queue
this.notificationQueue = pEvent.iterator(this, 'notification');

return this.processNotifications();
}

/**
* Process notification events as they arrive on the queue
*/
private async processNotifications() {
const events = this.notificationQueue;
if (events == null) return;
for await (const {eventType, binding, context, observers} of events) {
// The loop will happen asynchronously upon events
try {
// The execution of observers happen in the Promise micro-task queue
await this.notifyObservers(eventType, binding, context, observers);
this.pendingEvents--;
this.pendingNotifications--;
this._debug(
'Observers notified for %s of binding %s',
eventType,
binding.key,
);
this.emit('observersNotified', {eventType, binding});
} catch (err) {
this.pendingEvents--;
this.pendingNotifications--;
this._debug('Error caught from observers', err);
// Errors caught from observers. Emit it to the current context.
// If no error listeners are registered, crash the process.
Expand All @@ -235,9 +272,9 @@ export class Context extends EventEmitter {
for (const eventType of eventTypes) {
this.on(eventType, (binding, context) => {
// No need to schedule notifications if no observers are present
if (this.observers.size === 0) return;
if (!this.observers || this.observers.size === 0) return;
// Track pending events
this.pendingEvents++;
this.pendingNotifications++;
// Take a snapshot of current observers to ensure notifications of this
// event will only be sent to current ones. Emit a new event to notify
// current context observers.
Expand All @@ -252,13 +289,14 @@ export class Context extends EventEmitter {
}

/**
* Wait until observers are notified for all of currently pending events.
* Wait until observers are notified for all of currently pending notification
* events.
*
* This method is for test only to perform assertions after observers are
* notified for relevant events.
*/
protected async waitForObserversNotifiedForPendingEvents(timeout?: number) {
const count = this.pendingEvents;
protected async waitUntilPendingNotificationsDone(timeout?: number) {
const count = this.pendingNotifications;
if (count === 0) return;
await pEvent.multiple(this, 'observersNotified', {count, timeout});
}
Expand Down Expand Up @@ -331,6 +369,8 @@ export class Context extends EventEmitter {
* @param observer Context observer instance or function
*/
subscribe(observer: ContextEventObserver): Subscription {
this.observers = this.observers || new Set();
this.setupEventHandlersIfNeeded();
this.observers.add(observer);
return new ContextSubscription(this, observer);
}
Expand All @@ -340,6 +380,7 @@ export class Context extends EventEmitter {
* @param observer Context event observer
*/
unsubscribe(observer: ContextEventObserver): boolean {
if (!this.observers) return false;
return this.observers.delete(observer);
}

Expand All @@ -353,14 +394,21 @@ export class Context extends EventEmitter {
*/
close() {
this._debug('Closing context...');
this.observers.clear();
this.registry.clear();
this.observers = undefined;
if (this.notificationQueue != null) {
// Cancel the notification iterator
// FIXME: p-event bug which does not return a promise for `return`
// tslint:disable-next-line:no-floating-promises
this.notificationQueue.return!(undefined);
this.notificationQueue = undefined;
}
if (this._parent && this._parentEventListeners) {
for (const [event, listener] of this._parentEventListeners) {
this._parent.removeListener(event, listener);
}
this._parentEventListeners = undefined;
}
this.registry.clear();
this._parent = undefined;
}

Expand All @@ -369,6 +417,7 @@ export class Context extends EventEmitter {
* @param observer Context observer
*/
isSubscribed(observer: ContextObserver) {
if (!this.observers) return false;
return this.observers.has(observer);
}

Expand All @@ -389,7 +438,7 @@ export class Context extends EventEmitter {
context: Context,
observers = this.observers,
) {
if (observers.size === 0) return;
if (!observers || observers.size === 0) return;

for (const observer of observers) {
if (typeof observer === 'function') {
Expand Down

0 comments on commit ffda885

Please sign in to comment.