Skip to content
This repository has been archived by the owner on Feb 22, 2018. It is now read-only.

Commit

Permalink
feat(core, testability): PendingAsync service
Browse files Browse the repository at this point in the history
A new PendingAsync service that is used to track pending async
operations in Angular.  VmTurnZone ties in to this service to track
timers as async tasks and correctly handles canceled timers.  Http
registers async tasks for network requests.  The Testability API uses
the new API to provide a much more useful whenStable implementation.
  • Loading branch information
chirayuk committed Sep 30, 2014
1 parent 00960bb commit 1d29b79
Show file tree
Hide file tree
Showing 12 changed files with 264 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/core/module.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export "package:angular/core_dom/module_internal.dart" show
NgElement,
NoOpAnimation,
NullTreeSanitizer,
PendingAsync,
Animate,
RequestErrorInterceptor,
RequestInterceptor,
Expand Down
3 changes: 3 additions & 0 deletions lib/core/module_internal.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export 'package:angular/core/formatter.dart';
import 'package:angular/core/parser/utils.dart';
import 'package:angular/core/registry.dart';
import 'package:angular/core/static_keys.dart';
import 'package:angular/core/pending_async.dart';
export 'package:angular/core/pending_async.dart';

part "exception_handler.dart";
part "interpolate.dart";
Expand All @@ -41,6 +43,7 @@ class CoreModule extends Module {
bind(FormatterMap);
bind(Interpolate);
bind(RootScope);
bind(PendingAsync);
bind(Scope, toInstanceOf: RootScope);
bind(ClosureMap, toFactory: () => throw "Must provide dynamic/static ClosureMap.");
bind(ScopeStats);
Expand Down
68 changes: 68 additions & 0 deletions lib/core/pending_async.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
library angular.core.pending_async;

import 'dart:async';
import 'package:di/annotations.dart';

typedef void WhenStableCallback();

/**
* Tracks pending operations and notifies when they are all complete.
*/
@Injectable()
class PendingAsync {
/// a count of the number of pending async operations.
int _numPending = 0;
List<WhenStableCallback> _callbacks;

/**
* A count of the number of tracked pending async operations.
*/
int get numPending => _numPending;

/**
* Register a callback to be called synchronously when the number of tracked
* pending async operations reaches a count of zero from a non-zero count.
*/
void whenStable(WhenStableCallback cb) {
if (_numPending == 0) {
cb();
return;
}
if (_callbacks == null) {
_callbacks = <WhenStableCallback>[cb];
} else {
_callbacks.add(cb);
}
}

/**
* Increase the counter of the number of tracked pending operations. Returns
* the new count of the number of tracked pending operations.
*/
int increaseCount([int delta = 1]) {
if (delta == 0) {
return _numPending;
}
_numPending += delta;
if (_numPending < 0) {
throw "Attempting to reduce pending async count below zero.";
} else if (_numPending == 0) {
_runAllCallbacks();
}
return _numPending;
}

/**
* Decrease the counter of the number of tracked pending operations. Returns
* the new count of the number of tracked pending operations.
*/
int decreaseCount([int delta = 1]) => increaseCount(-delta);

void _runAllCallbacks() {
while (_callbacks != null) {
var callbacks = _callbacks;
_callbacks = null;
callbacks.forEach((fn) { fn(); });
}
}
}
20 changes: 18 additions & 2 deletions lib/core/scope.dart
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,11 @@ class RootScope extends Scope {
*/
String get state => _state;

PendingAsync _pendingAsync;

RootScope(Object context, Parser parser, ASTParser astParser, FieldGetterFactory fieldGetterFactory,
FormatterMap formatters, this._exceptionHandler, this._ttl, this._zone,
ScopeStats _scopeStats, CacheRegister cacheRegister)
ScopeStats _scopeStats, CacheRegister cacheRegister, this._pendingAsync)
: _scopeStats = _scopeStats,
_parser = parser,
_astParser = astParser,
Expand All @@ -764,7 +766,19 @@ class RootScope extends Scope {
'',
_scopeStats)
{
_zone.onTurnDone = apply;
_zone.countPendingAsync = _pendingAsync.increaseCount;
_zone.onTurnDone = () {
// NOTE: Ideally, we would just set _zone.onTurnStart = _pendingAsync.increaseCount.
// However, when the RootScope is constructed, we would have already executed the
// nop onTurnStart causing a count mismatch. While we could adjust for it, our
// test set doesn't really enter/leave the VmTurnZone. So for simplicity, we do the
// increaseCount here.
_pendingAsync.increaseCount();
apply();
_pendingAsync.decreaseCount();
_runAsyncFns(); // if any were scheduled by _pendingAsync.whenStable callbacks.
};

_zone.onError = (e, s, ls) => _exceptionHandler(e, s);
_zone.onScheduleMicrotask = runAsync;
cacheRegister.registerCache("ScopeWatchASTs", astCache);
Expand Down Expand Up @@ -900,6 +914,7 @@ class RootScope extends Scope {
if (_state == STATE_FLUSH_ASSERT) {
throw "Scheduling microtasks not allowed in $state state.";
}
_pendingAsync.increaseCount();
var chain = new _FunctionChain(fn);
if (_runAsyncHead == null) {
_runAsyncHead = _runAsyncTail = chain;
Expand All @@ -918,6 +933,7 @@ class RootScope extends Scope {
} catch (e, s) {
_exceptionHandler(e, s);
}
_pendingAsync.decreaseCount();
_runAsyncHead = _runAsyncHead._next;
}
_runAsyncTail = null;
Expand Down
41 changes: 41 additions & 0 deletions lib/core/zone.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ part of angular.core_internal;
*/
typedef void ZoneOnTurnDone();

typedef void CountPendingAsync(int count);

/**
* Handles a [VmTurnZone] onTurnDone event.
*/
Expand Down Expand Up @@ -39,6 +41,7 @@ class LongStackTrace {
}
}


/**
* A [Zone] wrapper that lets you schedule tasks after its private microtask
* queue is exhausted but before the next "turn", i.e. event loop iteration.
Expand Down Expand Up @@ -75,12 +78,14 @@ class VmTurnZone {
run: _onRun,
runUnary: _onRunUnary,
scheduleMicrotask: _onScheduleMicrotask,
createTimer: _onCreateTimer,
handleUncaughtError: _uncaughtError
));
onError = _defaultOnError;
onTurnDone = _defaultOnTurnDone;
onTurnStart = _defaultOnTurnStart;
onScheduleMicrotask = _defaultOnScheduleMicrotask;
countPendingAsync = _defaultCountPendingAsync;
}

List _asyncQueue = [];
Expand Down Expand Up @@ -126,6 +131,15 @@ class VmTurnZone {
}
}

async.Timer _onCreateTimer(async.Zone self, async.ZoneDelegate delegate, async.Zone zone, Duration duration, fn()) {
var s = traceEnter(VmTurnZone_createTimer);
try {
return new _WrappedTimer(this, delegate, zone, duration, fn);
} finally {
traceLeave(s);
}
}

void _uncaughtError(async.Zone self, async.ZoneDelegate delegate, async.Zone zone,
e, StackTrace s) {
if (!_errorThrownFromOnRun) onError(e, s, _longStacktrace);
Expand Down Expand Up @@ -199,7 +213,9 @@ class VmTurnZone {
* the turn will _never_ end.
*/
ZoneOnTurnDone onTurnDone;
CountPendingAsync countPendingAsync;
void _defaultOnTurnDone() => null;
void _defaultCountPendingAsync(int count) => null;

/**
* Called any time a microtask is scheduled. If you override [onScheduleMicrotask], you
Expand Down Expand Up @@ -276,3 +292,28 @@ class VmTurnZone {
assertInTurn();
}
}


// Automatically adjusts the pending async task count when the timer is
// scheduled, canceled or fired.
class _WrappedTimer implements async.Timer {
async.Timer _realTimer;
VmTurnZone _vmTurnZone;

_WrappedTimer(this._vmTurnZone, async.ZoneDelegate delegate, async.Zone zone, Duration duration, Function fn()) {
_vmTurnZone.countPendingAsync(1);
_realTimer = delegate.createTimer(zone, duration, () {
fn();
_vmTurnZone.countPendingAsync(-1);
});
}

bool get isActive => _realTimer.isActive;

void cancel() {
if (_realTimer.isActive) {
_vmTurnZone.countPendingAsync(-1);
}
_realTimer.cancel();
}
}
20 changes: 16 additions & 4 deletions lib/core_dom/http.dart
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ class Http {
final RootScope _rootScope;
final HttpConfig _httpConfig;
final VmTurnZone _zone;
final PendingAsync _pendingAsync;

final _responseQueue = <Function>[];
async.Timer _responseQueueTimer;
Expand All @@ -396,7 +397,7 @@ class Http {
* Constructor, useful for DI.
*/
Http(this._cookies, this._location, this._rewriter, this._backend, this.defaults,
this._interceptors, this._rootScope, this._httpConfig, this._zone);
this._interceptors, this._rootScope, this._httpConfig, this._zone, this._pendingAsync);

/**
* Parse a [requestUrl] and determine whether this is a same-origin request as
Expand Down Expand Up @@ -494,14 +495,25 @@ class Http {
return new async.Future.value(new HttpResponse.copy(cachedResponse));
}

requestFromBackend(runCoalesced, onComplete, onError) => _backend.request(
requestFromBackend(runCoalesced, onComplete, onError) {
var request = _backend.request(
url,
method: method,
requestHeaders: config.headers,
sendData: config.data,
withCredentials: withCredentials
).then((dom.HttpRequest req) => _onResponse(req, runCoalesced, onComplete, config, cache, url),
onError: (e) => _onError(e, runCoalesced, onError, config, url));
);
_pendingAsync.increaseCount();
return request.then((dom.HttpRequest req) {
_pendingAsync.decreaseCount();
return _onResponse(req, runCoalesced, onComplete, config, cache, url);
},
onError: (e) {
_pendingAsync.decreaseCount();
return _onError(e, runCoalesced, onError, config, url);
});
return request;
}

async.Future responseFuture;
if (_httpConfig.coalesceDuration != null) {
Expand Down
9 changes: 6 additions & 3 deletions lib/introspection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,15 @@ typedef List<String> _GetExpressionsFromProbe(ElementProbe probe);
class _Testability implements _JsObjectProxyable {
final dom.Node node;
final ElementProbe probe;
final PendingAsync _pendingAsync;

_Testability(this.node, this.probe);
_Testability(dom.Node node, ElementProbe probe):
node = node,
probe = probe,
_pendingAsync = probe.injector.get(PendingAsync);

whenStable(callback) {
(probe.injector.get(VmTurnZone) as VmTurnZone).run(
() => new async.Timer(Duration.ZERO, callback));
_pendingAsync.whenStable(callback);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/mock/module.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ part 'mock_cache_register.dart';
* - [Logger]
* - [RethrowExceptionHandler] instead of [ExceptionHandler]
* - [VmTurnZone] which displays errors to console;
* - [MockCacheRegister
* - [MockCacheRegister]
*/
class AngularMockModule extends Module {
AngularMockModule() {
Expand Down
12 changes: 12 additions & 0 deletions lib/mock/zone.dart
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,15 @@ class _TimerSpec implements dart_async.Timer {
isActive = false;
}
}


class MockZone {
MockZone._internal();

MockZone get current => Zone.current['AngularMockZone'];

static Zone fork(Zone zone) {
MockZone mockZone = new MockZone._internal();
return zone.fork(zoneValues: { 'AngularMockZone': mockZone });
}
}
8 changes: 8 additions & 0 deletions lib/ng_tracing_scopes.dart
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ final VmTurnZone_run = traceCreateScope('VmTurnZone#run()');
final VmTurnZone_scheduleMicrotask = traceCreateScope('VmTurnZone#scheduleMicrotask()');


/**
* Name: `VmTurnZone#createTimer()`
*
* Designates where new timers are scheduled.
*/
final VmTurnZone_createTimer = traceCreateScope('VmTurnZone#createTimer()');


/**
* Name: `Compiler#compile()`
*
Expand Down
Loading

0 comments on commit 1d29b79

Please sign in to comment.