Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

Commit

Permalink
begin to apply observable methods, 1: bindCallback
Browse files Browse the repository at this point in the history
  • Loading branch information
JiaLiPassion committed Jul 19, 2017
1 parent f56c213 commit b238b09
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 195 deletions.
277 changes: 157 additions & 120 deletions lib/rxjs/rxjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,17 @@ declare let define: any;
const completeSource = 'rxjs.Subscriber.complete';
const unsubscribeSource = 'rxjs.Subscriber.unsubscribe';

const Observable = Rx.Observable;

// monkey-patch Observable to save the
// current zone as ConstructorZone
Rx.Observable = function() {
Observable.apply(this, arguments);
this._zone = Zone.current;

const patchObservableInstance = function(observable: any) {
observable._zone = Zone.current;
// patch inner function this._subscribe to check
// SubscriptionZone is same with ConstuctorZone or not
if (this._subscribe && typeof this._subscribe === 'function' && !this._originalSubscribe) {
this._originalSubscribe = this._subscribe;
this._subscribe = _patchedSubscribe;
if (observable._subscribe && typeof observable._subscribe === 'function' &&
!observable._originalSubscribe) {
observable._originalSubscribe = observable._subscribe;
observable._subscribe = _patchedSubscribe;
}

return this;
};

Rx.Observable.prototype = Observable.prototype;

const subscribe = Observable.prototype.subscribe;
const lift = Observable.prototype.lift;
const create = Observable.create;

const _patchedSubscribe = function() {
const currentZone = Zone.current;
const _zone = this._zone;
Expand Down Expand Up @@ -84,124 +71,174 @@ declare let define: any;
return tearDownLogic;
};

// patch Observable.prototype.subscribe
// if SubscripitionZone is different with ConstructorZone
// we should run _subscribe in ConstructorZone and
// create sinke in SubscriptionZone,
// and tearDown should also run into ConstructorZone
Observable.prototype.subscribe = function() {
const _zone = this._zone;
const currentZone = Zone.current;
const patchObservable = function(Rx: any, observableType: string) {
const symbol = Zone.__symbol__(observableType);
if (Rx[symbol]) {
// has patched
return;
}

const Observable = Rx[symbol] = Rx[observableType];
if (!Observable) {
// the subclass of Observable not loaded
return;
}

// if operator is involved, we should also
// patch the call method to save the Subscription zone
if (this.operator && _zone && _zone !== currentZone) {
const call = this.operator.call;
this.operator.call = function() {
const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
if (!subscriber._zone) {
subscriber._zone = currentZone;
// monkey-patch Observable to save the
// current zone as ConstructorZone
const patchedObservable: any = Rx[observableType] = function() {
Observable.apply(this, arguments);
patchObservableInstance(this);
return this;
};

patchedObservable.prototype = Observable.prototype;
Object.keys(Observable).forEach(key => {
patchedObservable[key] = Observable[key];
});

const ObservablePrototype: any = Observable.prototype;
const symbolSubscribe = Zone.__symbol__('subscribe');

if (!ObservablePrototype[symbolSubscribe]) {
const subscribe = ObservablePrototype[symbolSubscribe] = ObservablePrototype.subscribe;
// patch Observable.prototype.subscribe
// if SubscripitionZone is different with ConstructorZone
// we should run _subscribe in ConstructorZone and
// create sinke in SubscriptionZone,
// and tearDown should also run into ConstructorZone
Observable.prototype.subscribe = function() {
const _zone = this._zone;
const currentZone = Zone.current;

// if operator is involved, we should also
// patch the call method to save the Subscription zone
if (this.operator && _zone && _zone !== currentZone) {
const call = this.operator.call;
this.operator.call = function() {
const args = Array.prototype.slice.call(arguments);
const subscriber = args.length > 0 ? args[0] : undefined;
if (!subscriber._zone) {
subscriber._zone = currentZone;
}
return _zone.run(call, this, args);
};
}
return _zone.run(call, this, args);
const result = subscribe.apply(this, arguments);
// the result is the subscriber sink,
// we save the current Zone here
result._zone = currentZone;
return result;
};
}
const result = subscribe.apply(this, arguments);
// the result is the subscriber sink,
// we save the current Zone here
result._zone = currentZone;
return result;
};

// patch lift method to save ConstructorZone of Observable
Observable.prototype.lift = function() {
const observable = lift.apply(this, arguments);
observable._zone = Zone.current;
// patch inner function this._subscribe to check
// SubscriptionZone is same with ConstuctorZone or not
if (observable._subscribe && typeof observable._subscribe === 'function' &&
!observable._originalSubscribe) {
observable._originalSubscribe = observable._subscribe;
observable._subscribe = _patchedSubscribe;
}
const symbolLift = Zone.__symbol__('lift');
if (!ObservablePrototype[symbolLift]) {
const lift = ObservablePrototype[symbolLift] = ObservablePrototype.lift;

return observable;
};
// patch lift method to save ConstructorZone of Observable
Observable.prototype.lift = function() {
const observable = lift.apply(this, arguments);
patchObservableInstance(observable);

// patch create method to save ConstructorZone of Observable
Rx.Observable.create = function() {
const observable = create.apply(this, arguments);
observable._zone = Zone.current;
// patch inner function this._subscribe to check
// SubscriptionZone is same with ConstuctorZone or not
if (observable._subscribe && typeof observable._subscribe === 'function' &&
!observable._originalSubscribe) {
observable._originalSubscribe = observable._subscribe;
observable._subscribe = _patchedSubscribe;
return observable;
};
}

return observable;
};

const Subscriber = Rx.Subscriber;

const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscriber.prototype.unsubscribe;
const symbolCreate = Zone.__symbol__('create');
if (!Observable[symbolCreate]) {
const create = Observable[symbolCreate] = Observable.create;
// patch create method to save ConstructorZone of Observable
Rx.Observable.create = function() {
const observable = create.apply(this, arguments);
patchObservableInstance(observable);

// patch Subscriber.next to make sure it run
// into SubscriptionZone
Subscriber.prototype.next = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(next, this, arguments, nextSource);
} else {
return next.apply(this, arguments);
return observable;
};
}
};

Subscriber.prototype.error = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(error, this, arguments, nextSource);
} else {
return error.apply(this, arguments);
}
const patchSubscriber = function() {
const Subscriber = Rx.Subscriber;

const next = Subscriber.prototype.next;
const error = Subscriber.prototype.error;
const complete = Subscriber.prototype.complete;
const unsubscribe = Subscriber.prototype.unsubscribe;

// patch Subscriber.next to make sure it run
// into SubscriptionZone
Subscriber.prototype.next = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(next, this, arguments, nextSource);
} else {
return next.apply(this, arguments);
}
};

Subscriber.prototype.error = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(error, this, arguments, nextSource);
} else {
return error.apply(this, arguments);
}
};

Subscriber.prototype.complete = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(complete, this, arguments, nextSource);
} else {
return complete.apply(this, arguments);
}
};

Subscriber.prototype.unsubscribe = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(unsubscribe, this, arguments, nextSource);
} else {
return unsubscribe.apply(this, arguments);
}
};
};

Subscriber.prototype.complete = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(complete, this, arguments, nextSource);
} else {
return complete.apply(this, arguments);
const patchObservableFactoryCreator = function(obj: any, factoryName: string) {
const symbol = Zone.__symbol__(factoryName);
if (obj[symbol]) {
return;
}
const factoryCreator: any = obj[symbol] = obj[factoryName];
obj[factoryName] = function() {
const factory: any = factoryCreator.apply(this, arguments);
return function() {
const observable = factory.apply(this, arguments);
patchObservableInstance(observable);
return observable;
};
};
};

Subscriber.prototype.unsubscribe = function() {
const currentZone = Zone.current;
const subscriptionZone = this._zone;

// for performance concern, check Zone.current
// equal with this._zone(SubscriptionZone) or not
if (subscriptionZone && subscriptionZone !== currentZone) {
return subscriptionZone.run(unsubscribe, this, arguments, nextSource);
} else {
return unsubscribe.apply(this, arguments);
}
};
patchObservable(Rx, 'Observable');
patchSubscriber();
patchObservableFactoryCreator(Rx.Observable, 'bindCallback');
});
}));
3 changes: 2 additions & 1 deletion test/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ declare const __karma__: {
__karma__.loaded = function() {};
(window as any).global = window;

System.config({defaultJSExtensions: true, map: {'rxjs': 'base/node_modules/rxjs/bundles/Rx.js'}});
System.config(
{defaultJSExtensions: true, map: {'rxjs/Rx': 'base/node_modules/rxjs/bundles/Rx.js'}});
let browserPatchedPromise: any = null;
if ((window as any)[(Zone as any).__symbol__('setTimeout')]) {
browserPatchedPromise = Promise.resolve('browserPatched');
Expand Down
Loading

0 comments on commit b238b09

Please sign in to comment.