Skip to content

Commit

Permalink
chore(context): use a consolidated notification queue
Browse files Browse the repository at this point in the history
  • Loading branch information
raymondfeng committed Feb 10, 2019
1 parent 4828bb6 commit 6ee3837
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ describe('Context', () => {
'SYNC:foo:foo-value:bind',
'ASYNC:foo:foo-value:bind',
'SYNC:foo:foo-value:unbind',
'SYNC:foo:new-foo-value:bind',
'ASYNC:foo:foo-value:unbind',
'SYNC:foo:new-foo-value:bind',
'ASYNC:foo:new-foo-value:bind',
]);
expect(nonMatchingObserverCalled).to.be.false();
Expand Down
22 changes: 22 additions & 0 deletions packages/context/src/context-observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,25 @@ export interface Subscription {
*/
closed: boolean;
}

/**
* Event data for observer notifications
*/
export type Notification = {
/**
* Context event type - bind/unbind
*/
eventType: ContextEventType;
/**
* Binding added/removed
*/
binding: Readonly<Binding<unknown>>;
/**
* Owner context for the binding
*/
context: Context;
/**
* A snapshot of observers when the original event is emitted
*/
observers: Set<ContextEventObserver>;
};
63 changes: 35 additions & 28 deletions packages/context/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
ContextEventObserver,
ContextEventType,
ContextObserver,
Notification,
Subscription,
} from './context-observer';
import {ResolutionOptions, ResolutionSession} from './resolution-session';
Expand Down Expand Up @@ -152,8 +153,7 @@ export class Context extends EventEmitter {

// The following are two async functions. Returned promises are ignored as
// they are long-running background tasks.
this.startNotificationTask('bind').catch(notificationErrorHandler);
this.startNotificationTask('unbind').catch(notificationErrorHandler);
this.startNotificationTask().catch(notificationErrorHandler);
}

/**
Expand Down Expand Up @@ -197,32 +197,14 @@ export class Context extends EventEmitter {

/**
* Start a background task to listen on context events and notify observers
* @param eventType Context event type
*/
private async startNotificationTask(eventType: ContextEventType) {
const notificationEvent = `${eventType}-notification`;
this.on(eventType, (binding, context) => {
// No need to schedule notifications if no observers are present
if (this.observers.size === 0) return;
// Track pending events
this.pendingEvents++;
// Take a snapshot of current observers to ensure notifications of this
// event will only be sent to current ones. Emit a new event to notify
// current context observers.
this.emit(notificationEvent, {
binding,
context,
observers: new Set(this.observers),
});
});
// FIXME(rfeng): p-event should allow multiple event types in an iterator.
// Create an async iterator from the given event type
const events: AsyncIterable<{
binding: Readonly<Binding<unknown>>;
context: Context;
observers: Set<ContextEventObserver>;
}> = pEvent.iterator(this, notificationEvent);
for await (const {binding, context, observers} of events) {
*/
private async startNotificationTask() {
this.setupNotification('bind', 'unbind');
const events: AsyncIterable<Notification> = pEvent.iterator(
this,
'notification',
);
for await (const {eventType, binding, context, observers} of events) {
// The loop will happen asynchronously upon events
try {
// The execution of observers happen in the Promise micro-task queue
Expand All @@ -244,6 +226,31 @@ export class Context extends EventEmitter {
}
}

/**
* Listen on given event types and emit `notification` event. This method
* merge multiple event types into one for notification.
* @param eventTypes Context event types
*/
private setupNotification(...eventTypes: ContextEventType[]) {
for (const eventType of eventTypes) {
this.on(eventType, (binding, context) => {
// No need to schedule notifications if no observers are present
if (this.observers.size === 0) return;
// Track pending events
this.pendingEvents++;
// Take a snapshot of current observers to ensure notifications of this
// event will only be sent to current ones. Emit a new event to notify
// current context observers.
this.emit('notification', {
eventType,
binding,
context,
observers: new Set(this.observers),
});
});
}
}

/**
* Wait until observers are notified for all of currently pending events.
*
Expand Down

0 comments on commit 6ee3837

Please sign in to comment.