Skip to content

Commit

Permalink
chore(context): address more review comments - 02/05
Browse files Browse the repository at this point in the history
  • Loading branch information
raymondfeng committed Feb 5, 2019
1 parent 5aacda0 commit 9425ee6
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 55 deletions.
68 changes: 54 additions & 14 deletions docs/site/Context.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,23 @@ come and go. There are a few caveats associated with that:
2. It's hard for event listeners to perform asynchronous operations.

To make it easy to support asynchronous event processing, we introduce
`ContextObserver` and corresponding APIs on `Context:
`ContextObserver` and corresponding APIs on `Context`:

1. ContextObserver interface
1. `ContextObserverFn` type and `ContextObserver` interface

```ts
/**
* Listen on `bind`, `unbind`, or other events
* @param eventType Context event type
* @param binding The binding as event source
* @param context Context object for the binding event
*/
export type ContextObserverFn = (
eventType: ContextEventType,
binding: Readonly<Binding<unknown>>,
context: Context,
) => ValueOrPromise<void>;

/**
* Observers of context bind/unbind events
*/
Expand All @@ -308,21 +320,24 @@ export interface ContextObserver {
* @param eventType Context event type
* @param binding The binding as event source
*/
observe(
eventType: ContextEventType,
binding: Readonly<Binding<unknown>>,
context: Context,
): ValueOrPromise<void>;
observe: ContextObserverFn;
}

/**
* Context event observer type - An instance of `ContextObserver` or a function
*/
export type ContextEventObserver = ContextObserver | ContextObserverFn;
```

If `filter` is not required, we can simply use `ContextObserverFn`.

2. Context APIs

- `subscribe(observer: ContextObserver)`
- `subscribe(observer: ContextEventObserver)`

Add a context event observer to the context chain, including its ancestors

- `unsubscribe(observer: ContextObserver)`
- `unsubscribe(observer: ContextEventObserver)`

Remove the context event observer from the context chain

Expand All @@ -334,7 +349,8 @@ export interface ContextObserver {
leak if the child context is to be recycled.

To react on context events asynchronously, we need to implement the
`ContextObserver` interface and register it with the context.
`ContextObserver` interface or provide a `ContextObserverFn` and register it
with the context.

For example:

Expand Down Expand Up @@ -372,7 +388,31 @@ app
// bind: foo-app
```

It's recommended that `ContextObserver` implementations should not throw errors
in `observe` method. Uncaught errors will be reported as `error` events of the
context object. If there is no error listener registered, the node process will
crash.
Please note when an observer subscribes to a context, it will be registered with
all contexts on the chain. In the example above, the observer is added to both
`server` and `app` contexts so that it can be notified when bindings are added
or removed from any of the context on the chain.

- Observers are called in the next turn of
[Promise micro-task queue](https://jsblog.insiderattack.net/promises-next-ticks-and-immediates-nodejs-event-loop-part-3-9226cbe7a6aa)

- When there are multiple async observers registered, they are notified in
series for an event.

- When multiple binding events are emitted in the same event loop tick and there
are async observers registered, such events are queued and observers are
notified by the order of events.

### Observer error handling

It's recommended that `ContextEventObserver` implementations should not throw
errors in their code. Errors thrown by context event observers are reported as
follows over the context chain.

1. Check if the current context object has `error` listeners, if yes, emit an
`error` event on the context and we're done. if not, try its parent context
by repeating step 1.

2. If no context object of the chain has `error` listeners, emit an `error`
event on the current context. As a result, the process exits abnormally. See
https://nodejs.org/api/events.html#events_error_events for more details.
40 changes: 31 additions & 9 deletions packages/context/src/__tests__/unit/context-observer.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
filterByTag,
} from '../..';

const pEvent = require('p-event');
const setImmediateAsync = promisify(setImmediate);

/**
Expand Down Expand Up @@ -85,7 +86,7 @@ describe('Context', () => {
function givenNonMatchingObserver() {
nonMatchingObserver = {
filter: binding => false,
observe: (event, binding) => {},
observe: () => {},
};
}
});
Expand Down Expand Up @@ -174,13 +175,10 @@ describe('Context', () => {
ctx.bind('foo').to('foo-value');
process.nextTick(() => {
// Register a new observer after 1st event
const anotherObserver: ContextObserver = {
observe: (event, binding, context) => {
const val = binding.getValue(context);
events.push(`LATE:${binding.key}:${val}:${event}`);
},
};
ctx.subscribe(anotherObserver);
ctx.subscribe((event, binding, context) => {
const val = binding.getValue(context);
events.push(`LATE:${binding.key}:${val}:${event}`);
});
});

await ctx.waitUntilObserversNotified();
Expand Down Expand Up @@ -208,7 +206,7 @@ describe('Context', () => {
// An observer does not match the criteria
const nonMatchingObserver: ContextObserver = {
filter: binding => false,
observe: (event, binding) => {
observe: () => {
nonMatchingObserverCalled = true;
},
};
Expand Down Expand Up @@ -270,6 +268,30 @@ describe('Context', () => {
expect(await getControllers()).to.eql(['3']);
});

it('reports error on current context if an observer fails', async () => {
const err = new Error('something wrong');
server.subscribe((event, binding) => {
if (binding.key === 'bar') {
return Promise.reject(err);
}
});
server.bind('bar').to('bar-value');
const obj = await pEvent(server, 'error');
expect(obj).to.equal(err);
});

it('reports error on parent context if an observer fails', async () => {
const err = new Error('something wrong');
server.subscribe((event, binding) => {
if (binding.key === 'bar') {
return Promise.reject(err);
}
});
server.bind('bar').to('bar-value');
const obj = await pEvent(app, 'error');
expect(obj).to.equal(err);
});

class MyObserverForControllers implements ContextObserver {
controllers: Set<string> = new Set();
filter = filterByTag('controller');
Expand Down
23 changes: 18 additions & 5 deletions packages/context/src/context-observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ import {Context} from './context';
*/
export type ContextEventType = 'bind' | 'unbind' | string;

/**
* Listen on `bind`, `unbind`, or other events
* @param eventType Context event type
* @param binding The binding as event source
* @param context Context object for the binding event
*/
export type ContextObserverFn = (
eventType: ContextEventType,
binding: Readonly<Binding<unknown>>,
context: Context,
) => ValueOrPromise<void>;

/**
* Observers of context bind/unbind events
*/
Expand All @@ -29,13 +41,14 @@ export interface ContextObserver {
* @param eventType Context event type
* @param binding The binding as event source
*/
observe(
eventType: ContextEventType,
binding: Readonly<Binding<unknown>>,
context: Context,
): ValueOrPromise<void>;
observe: ContextObserverFn;
}

/**
* Context event observer type - An instance of `ContextObserver` or a function
*/
export type ContextEventObserver = ContextObserver | ContextObserverFn;

/**
* Subscription of context events. It's modeled after
* https://github.com/tc39/proposal-observable.
Expand Down
70 changes: 43 additions & 27 deletions packages/context/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {Binding, BindingTag} from './binding';
import {BindingFilter, filterByKey, filterByTag} from './binding-filter';
import {BindingAddress, BindingKey} from './binding-key';
import {
ContextEventObserver,
ContextEventType,
ContextObserver,
Subscription,
Expand Down Expand Up @@ -54,7 +55,7 @@ export class Context extends EventEmitter {
/**
* A list of registered context observers
*/
protected readonly observers: Set<ContextObserver> = new Set();
protected readonly observers: Set<ContextEventObserver> = new Set();

/**
* Internal counter for pending events which observers have not processed yet
Expand Down Expand Up @@ -100,14 +101,14 @@ export class Context extends EventEmitter {
* @param args
*/
// tslint:disable-next-line:no-any
private _debug(formatter: any, ...args: any[]) {
private _debug(...args: any[]) {
/* istanbul ignore if */
if (debug.enabled) {
if (typeof formatter === 'string') {
debug(`[%s] ${formatter}`, this.name, ...args);
} else {
debug('[%s] ', this.name, formatter, ...args);
}
if (!debug.enabled) return;
const formatter = args.shift();
if (typeof formatter === 'string') {
debug(`[%s] ${formatter}`, this.name, ...args);
} else {
debug('[%s] ', this.name, formatter, ...args);
}
}

Expand All @@ -116,18 +117,32 @@ export class Context extends EventEmitter {
* upon `bind` and `unbind` events
*/
private setupEventHandlers() {
// tslint:disable-next-line:no-any
const notificationErrorHandler = (...args: any[]) => {
// Catch error to avoid lint violations
this._debug(...args);
// Bubbling up the error event over the context chain
// until we find an error listener
let ctx: Context | undefined = this;
while (ctx) {
if (ctx.listenerCount('error')) {
this._debug('Emitting error to context %s', ctx.name);
ctx.emit('error', ...args);
break;
}
ctx = ctx._parent;
}
if (ctx === undefined) {
this._debug('No error handler is configured for the context chain');
// Let it crash now by emitting an error event
this.emit('error', ...args);
}
};

// The following are two async functions. Returned promises are ignored as
// they are long-running background tasks.
this.startNotificationTask('bind').catch(err => {
// Catch error to avoid lint violations
debug(err);
this.emit('error', err);
});
this.startNotificationTask('unbind').catch(err => {
// Catch error to avoid lint violations
debug(err);
this.emit('error', err);
});
this.startNotificationTask('bind').catch(notificationErrorHandler);
this.startNotificationTask('unbind').catch(notificationErrorHandler);
}

/**
Expand All @@ -153,14 +168,13 @@ export class Context extends EventEmitter {
// Create an async iterator from the given event type
const events: AsyncIterable<{
binding: Readonly<Binding<unknown>>;
observers: Set<ContextObserver>;
observers: Set<ContextEventObserver>;
}> = pEvent.iterator(this, notificationEvent);
for await (const event of events) {
for await (const {binding, observers} of events) {
// The loop will happen asynchronously upon events
try {
// The execution of observers happen in the Promise micro-task queue
const binding = event.binding;
await this.notifyObservers(eventType, binding, event.observers);
await this.notifyObservers(eventType, binding, observers);
this.pendingEvents--;
this._debug(
'Observers notified for %s of binding %s',
Expand Down Expand Up @@ -255,9 +269,9 @@ export class Context extends EventEmitter {

/**
* Add a context event observer to the context chain, including its ancestors
* @param observer Context event observer
* @param observer Context observer instance or function
*/
subscribe(observer: ContextObserver): Subscription {
subscribe(observer: ContextEventObserver): Subscription {
let ctx: Context | undefined = this;
while (ctx != null) {
ctx.observers.add(observer);
Expand All @@ -270,7 +284,7 @@ export class Context extends EventEmitter {
* Remove the context event observer from the context chain
* @param observer Context event observer
*/
unsubscribe(observer: ContextObserver) {
unsubscribe(observer: ContextEventObserver) {
let ctx: Context | undefined = this;
while (ctx != null) {
ctx.observers.delete(observer);
Expand Down Expand Up @@ -321,7 +335,9 @@ export class Context extends EventEmitter {
if (observers.size === 0) return;

for (const observer of observers) {
if (!observer.filter || observer.filter(binding)) {
if (typeof observer === 'function') {
await observer(eventType, binding, this);
} else if (!observer.filter || observer.filter(binding)) {
await observer.observe(eventType, binding, this);
}
}
Expand Down Expand Up @@ -678,7 +694,7 @@ export class Context extends EventEmitter {
class ContextSubscription implements Subscription {
constructor(
protected context: Context,
protected observer: ContextObserver,
protected observer: ContextEventObserver,
) {}

private _closed = false;
Expand Down

0 comments on commit 9425ee6

Please sign in to comment.