Skip to content

Commit

Permalink
[ui/utils] share sync subscribe logic (#23341)
Browse files Browse the repository at this point in the history
In `ui/public/config/config.js` we have subscription logic that delivers values from an observable synchronously and also ensuring that values are received within a digest cycle. I need to do the same in #23217 but want to keep as few checks for `$rootScope.$$phase` as possible, so I broke the subscription logic into a shared util.

In order to make the utility a little more helpful it will also trigger fatal errors if an observable errors without having an error handler, or if `observer.next()`, `observer.error()`, or `observer.complete()` throw, which would normally be swallowed by RxJS.
  • Loading branch information
Spencer authored Oct 17, 2018
1 parent aa85072 commit c0e13fd
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 12 deletions.
16 changes: 4 additions & 12 deletions src/ui/public/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import angular from 'angular';
import chrome from '../chrome';
import { isPlainObject } from 'lodash';
import { uiModules } from '../modules';
import { subscribeWithScope } from '../utils/subscribe_with_scope';

const module = uiModules.get('kibana/config');

Expand Down Expand Up @@ -59,20 +60,11 @@ module.service(`config`, function ($rootScope, Promise) {
//* angular specific methods *
//////////////////////////////

const subscription = uiSettings.getUpdate$().subscribe(({ key, newValue, oldValue }) => {
const emit = () => {
const subscription = subscribeWithScope($rootScope, uiSettings.getUpdate$(), {
next: ({ key, newValue, oldValue }) => {
$rootScope.$broadcast('change:config', newValue, oldValue, key, this);
$rootScope.$broadcast(`change:config.${key}`, newValue, oldValue, key, this);
};

// this is terrible, but necessary to emulate the same API
// that the `config` service had before where changes were
// emitted to scopes synchronously. All methods that don't
// require knowing if we are currently in a digest cycle are
// async and would deliver events too late for several usecases
//
// If you copy this code elsewhere you better have a good reason :)
$rootScope.$$phase ? emit() : $rootScope.$apply(emit);
}
});
$rootScope.$on('$destroy', () => subscription.unsubscribe());

Expand Down
154 changes: 154 additions & 0 deletions src/ui/public/utils/subscribe_with_scope.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const mockFatalError = jest.fn();
jest.mock('ui/notify/fatal_error', () => ({
fatalError: mockFatalError,
}));

import * as Rx from 'rxjs';
import { subscribeWithScope } from './subscribe_with_scope';

let $rootScope: Scope;

class Scope {
public $$phase?: string;
public $root = $rootScope;
public $apply = jest.fn((fn: () => void) => fn());
}

$rootScope = new Scope();

afterEach(() => {
jest.clearAllMocks();
});

it('subscribes to the passed observable, returns subscription', () => {
const $scope = new Scope();

const unsubSpy = jest.fn();
const subSpy = jest.fn(() => unsubSpy);
const observable = new Rx.Observable(subSpy);

const subscription = subscribeWithScope($scope as any, observable);
expect(subSpy).toHaveBeenCalledTimes(1);
expect(unsubSpy).not.toHaveBeenCalled();

subscription.unsubscribe();

expect(subSpy).toHaveBeenCalledTimes(1);
expect(unsubSpy).toHaveBeenCalledTimes(1);
});

it('calls observer.next() if already in a digest cycle, wraps in $scope.$apply if not', () => {
const subject = new Rx.Subject();
const nextSpy = jest.fn();
const $scope = new Scope();

subscribeWithScope($scope as any, subject, { next: nextSpy });

subject.next();
expect($scope.$apply).toHaveBeenCalledTimes(1);
expect(nextSpy).toHaveBeenCalledTimes(1);

jest.clearAllMocks();

$rootScope.$$phase = '$digest';
subject.next();
expect($scope.$apply).not.toHaveBeenCalled();
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('reports fatalError if observer.next() throws', () => {
const $scope = new Scope();
subscribeWithScope($scope as any, Rx.of(undefined), {
next() {
throw new Error('foo bar');
},
});

expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
[Error: foo bar],
],
]
`);
});

it('reports fatal error if observer.error is not defined and observable errors', () => {
const $scope = new Scope();
const error = new Error('foo');
error.stack = `${error.message}\n---stack trace ---`;
subscribeWithScope($scope as any, Rx.throwError(error));

expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
[Error: Uncaught error in subscribeWithScope(): foo
---stack trace ---],
],
]
`);
});

it('reports fatal error if observer.error throws', () => {
const $scope = new Scope();
subscribeWithScope($scope as any, Rx.throwError(new Error('foo')), {
error: () => {
throw new Error('foo');
},
});

expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
[Error: foo],
],
]
`);
});

it('does not report fatal error if observer.error handles the error', () => {
const $scope = new Scope();
subscribeWithScope($scope as any, Rx.throwError(new Error('foo')), {
error: () => {
// noop, swallow error
},
});

expect(mockFatalError.mock.calls).toEqual([]);
});

it('reports fatal error if observer.complete throws', () => {
const $scope = new Scope();
subscribeWithScope($scope as any, Rx.EMPTY, {
complete: () => {
throw new Error('foo');
},
});

expect(mockFatalError.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
[Error: foo],
],
]
`);
});
76 changes: 76 additions & 0 deletions src/ui/public/utils/subscribe_with_scope.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { IScope } from 'angular';
import * as Rx from 'rxjs';
import { fatalError } from 'ui/notify/fatal_error';

function callInDigest<T extends any[]>($scope: IScope, fn: (...args: T) => void, ...args: T) {
try {
// this is terrible, but necessary to synchronously deliver subscription values
// to angular scopes. This is required by some APIs, like the `config` service,
// and beneficial for root level directives where additional digest cycles make
// kibana sluggish to load.
//
// If you copy this code elsewhere you better have a good reason :)
if ($scope.$root.$$phase) {
fn(...args);
} else {
$scope.$apply(() => fn(...args));
}
} catch (error) {
fatalError(error);
}
}

/**
* Subscribe to an observable at a $scope, ensuring that the digest cycle
* is run for subscriber hooks and routing errors to fatalError if not handled.
*/
export function subscribeWithScope<T>(
$scope: IScope,
observable: Rx.Observable<T>,
observer?: Rx.PartialObserver<T>
) {
return observable.subscribe({
next(value) {
if (observer && observer.next) {
callInDigest($scope, observer.next, value);
}
},
error(error) {
callInDigest($scope, () => {
if (observer && observer.error) {
observer.error(error);
} else {
throw new Error(
`Uncaught error in subscribeWithScope(): ${
error ? error.stack || error.message : error
}`
);
}
});
},
complete() {
if (observer && observer.complete) {
callInDigest($scope, observer.complete);
}
},
});
}

0 comments on commit c0e13fd

Please sign in to comment.