diff --git a/spec/operators/inspect-spec.js b/spec/operators/inspect-spec.js new file mode 100644 index 0000000000..c2dbc1ab36 --- /dev/null +++ b/spec/operators/inspect-spec.js @@ -0,0 +1,329 @@ +/* globals describe, it, expect, expectObservable, expectSubscription, hot, cold */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Scheduler = Rx.Scheduler; +var Promise = require('promise'); + +describe('Observable.prototype.inspect()', function () { + it.asDiagram('inspect')('should emit the last value in each time window', function () { + var e1 = hot('-a-xy-----b--x--cxxx-|'); + var e1subs = '^ !'; + var e2 = cold( '----| '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-----y--------x-----x|'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should delay the source if values are not emitted often enough', function () { + var e1 = hot('-a--------b-----c----|'); + var e1subs = '^ !'; + var e2 = cold( '----| '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-----a--------b-----c|'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should inspect with duration Observable using next to close the duration', function () { + var e1 = hot('-a-xy-----b--x--cxxx-|'); + var e1subs = '^ !'; + var e2 = cold( '----x-y-z '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-----y--------x-----x|'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should interrupt source and duration when result is unsubscribed early', function () { + var e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); + var unsub = ' ! '; + var e1subs = '^ ! '; + var e2 = cold( '-----x------------| '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^! ']; + var expected = '------y-----z-- '; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', function () { + var e1 = hot('-a-x-y-z-xyz-x-y-z----b--x-x-|'); + var e1subs = '^ ! '; + var e2 = cold( '-----x------------| '); + var e2subs = [' ^ ! ', + ' ^ ! ', + ' ^! ']; + var expected = '------y-----z-- '; + var unsub = ' ! '; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .inspect(function () { return e2; }) + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ !'; + var e2 = cold('-----| '); + var e2subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^!']; + var expected = '-----f-----f-----f-----f-|'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should mirror source if durations are always empty', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ !'; + var e2 = cold('|'); + var expected = 'abcdefabcdefabcdefabcdefa|'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit no values if duration is a never', function () { + var e1 = hot('----abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ !'; + var e2 = cold('-'); + var e2subs = ' ^ !'; + var expected = '-----------------------------|'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should unsubscribe duration Observable when source raise error', function () { + var e1 = hot('----abcdefabcdefabcdefabcdefa#'); + var e1subs = '^ !'; + var e2 = cold('-'); + var e2subs = ' ^ !'; + var expected = '-----------------------------#'; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should raise error as soon as just-throw duration is used', function () { + var e1 = hot('----abcdefabcdefabcdefabcdefa|'); + var e1subs = '^ ! '; + var e2 = cold('#'); + var e2subs = ' (^!) '; + var expected = '----(-#) '; + + var result = e1.inspect(function () { return e2; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should inspect using durations of varying lengths', function () { + var e1 = hot('abcdefabcdabcdefghabca| '); + var e1subs = '^ ! '; + var e2 = [cold('-----| '), + cold( '---| '), + cold( '-------| '), + cold( '--| '), + cold( '----|')]; + var e2subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^! ']; + var expected = '-----f---d-------h--c-| '; + + var i = 0; + var result = e1.inspect(function () { return e2[i++]; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + for (var j = 0; j < e2.length; j++) { + expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); + } + }); + + it('should propagate error from duration Observable', function () { + var e1 = hot('abcdefabcdabcdefghabca| '); + var e1subs = '^ ! '; + var e2 = [cold('-----| '), + cold( '---| '), + cold( '-------# ')]; + var e2subs = ['^ ! ', + ' ^ ! ', + ' ^ ! ']; + var expected = '-----f---d-------# '; + + var i = 0; + var result = e1.inspect(function () { return e2[i++]; }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + for (var j = 0; j < e2.length; j++) { + expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); + } + }); + + it('should propagate error thrown from durationSelector function', function () { + var e1 = hot('abcdefabcdabcdefghabca| '); + var e1subs = '^ ! '; + var e2 = [cold('-----| '), + cold( '---| '), + cold( '-------| ')]; + var e2subs = ['^ ! ', + ' ^ ! ']; + var expected = '-----f---d# '; + + var i = 0; + var result = e1.inspect(function () { + if (i === 2) { + throw 'error'; + } + return e2[i++]; + }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + for (var j = 0; j < e2subs.length; j++) { + expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); + } + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var subs = '^ !'; + var expected = '-----|'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.inspect(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var subs = '^ !'; + var expected = '-----#'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.inspect(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle an empty source', function () { + var e1 = cold('|'); + var subs = '(^!)'; + var expected = '|'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.inspect(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a never source', function () { + var e1 = cold('-'); + var subs = '^'; + var expected = '-'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.inspect(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a throw source', function () { + var e1 = cold('#'); + var subs = '(^!)'; + var expected = '#'; + function durationSelector() { return cold('-----|'); } + + expectObservable(e1.inspect(durationSelector)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should inspect by promise resolves', function (done) { + var e1 = Observable.interval(10).take(5); + var expected = [0,1,2,3]; + + e1.inspect(function () { + return new Promise(function (resolve) { resolve(42); }); + }).subscribe( + function (x) { + expect(x).toEqual(expected.shift()); }, + function () { + done('should not be called'); + }, + function () { + expect(expected.length).toBe(0); + done(); + } + ); + }); + + it('should raise error when promise rejects', function (done) { + var e1 = Observable.interval(10).take(4); + var expected = [0,1,2]; + var error = new Error('error'); + + e1.inspect(function (x) { + if (x === 4) { + return new Promise(function (resolve, reject) {reject(error);}); + } else { + return new Promise(function (resolve) {resolve(42);}); + } + }).subscribe( + function (x) { + expect(x).toEqual(expected.shift()); }, + function (err) { + expect(err).toBe(error); + expect(expected.length).toBe(0); + done(); + }, + function () { + done('should not be called'); + } + ); + }); +}); diff --git a/spec/operators/inspectTime-spec.js b/spec/operators/inspectTime-spec.js new file mode 100644 index 0000000000..41a7c8cbce --- /dev/null +++ b/spec/operators/inspectTime-spec.js @@ -0,0 +1,136 @@ +/* globals describe, it, expect, expectObservable, expectSubscription, hot, cold, rxTestScheduler, expectSubscriptions */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; +var Scheduler = Rx.Scheduler; + +describe('Observable.prototype.inspectTime()', function () { + it.asDiagram('inspectTime(50)')('should emit the last value in each time window', function () { + var e1 = hot('-a-x-y----b---x-cx---|'); + var subs = '^ !'; + var expected = '------y--------x-----|'; + + var result = e1.inspectTime(50, rxTestScheduler); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should inspect events by 50 time units', function (done) { + Observable.of(1, 2, 3) + .inspectTime(50) + .subscribe(function (x) { + done('should not be called'); + }, null, done); + }); + + it('should inspect events multiple times', function () { + var expected = ['1-2', '2-2']; + Observable.concat( + Observable.timer(0, 10, rxTestScheduler).take(3).map(function (x) { return '1-' + x; }), + Observable.timer(80, 10, rxTestScheduler).take(5).map(function (x) { return '2-' + x; }) + ) + .inspectTime(50, rxTestScheduler) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }); + + rxTestScheduler.flush(); + }); + + it('should delay the source if values are not emitted often enough', function () { + var e1 = hot('-a--------b-----c----|'); + var subs = '^ !'; + var expected = '------a--------b-----|'; + + expectObservable(e1.inspectTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence', function () { + var e1 = hot('abcdefabcdefabcdefabcdefa|'); + var subs = '^ !'; + var expected = '-----f-----f-----f-----f-|'; + + expectObservable(e1.inspectTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should complete when source does not emit', function () { + var e1 = hot('-----|'); + var subs = '^ !'; + var expected = '-----|'; + + expectObservable(e1.inspectTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should raise error when source does not emit and raises error', function () { + var e1 = hot('-----#'); + var subs = '^ !'; + var expected = '-----#'; + + expectObservable(e1.inspectTime(10, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle an empty source', function () { + var e1 = cold('|'); + var subs = '(^!)'; + var expected = '|'; + + expectObservable(e1.inspectTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a never source', function () { + var e1 = cold('-'); + var subs = '^'; + var expected = '-'; + + expectObservable(e1.inspectTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a throw source', function () { + var e1 = cold('#'); + var subs = '(^!)'; + var expected = '#'; + + expectObservable(e1.inspectTime(30, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should not complete when source does not complete', function () { + var e1 = hot('-a--(bc)-------d----------------'); + var unsub = ' !'; + var subs = '^ !'; + var expected = '------c-------------d-----------'; + + expectObservable(e1.inspectTime(50, rxTestScheduler), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', function () { + var e1 = hot('-a--(bc)-------d----------------'); + var subs = '^ !'; + var expected = '------c-------------d-----------'; + var unsub = ' !'; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .inspectTime(50, rxTestScheduler) + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should inspect values until source raises error', function () { + var e1 = hot('-a--(bc)-------d---------------#'); + var subs = '^ !'; + var expected = '------c-------------d----------#'; + + expectObservable(e1.inspectTime(50, rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); +}); diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 0fae32bac9..9aa9769ab8 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -40,6 +40,8 @@ export interface CoreOperators { elementSelector?: (value: T) => R, durationSelector?: (group: GroupedObservable) => Observable) => Observable>; ignoreElements?: () => Observable; + inspect?: (durationSelector: (value: T) => Observable | Promise) => Observable; + inspectTime?: (delay: number, scheduler?: Scheduler) => Observable; last?: (predicate?: (value: T, index: number) => boolean, resultSelector?: (value: T, index: number) => R, defaultValue?: any) => Observable | Observable; diff --git a/src/Observable.ts b/src/Observable.ts index 893583aba3..4e3ddc6234 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -220,6 +220,8 @@ export class Observable implements CoreOperators { elementSelector?: (value: T) => R, durationSelector?: (group: GroupedObservable) => Observable) => Observable>; ignoreElements: () => Observable; + inspect: (durationSelector: (value: T) => Observable | Promise) => Observable; + inspectTime: (delay: number, scheduler?: Scheduler) => Observable; last: (predicate?: (value: T, index: number) => boolean, resultSelector?: (value: T, index: number) => R, defaultValue?: any) => Observable | Observable; diff --git a/src/Rx.DOM.ts b/src/Rx.DOM.ts index 864d6cefed..d8fe44f357 100644 --- a/src/Rx.DOM.ts +++ b/src/Rx.DOM.ts @@ -58,6 +58,8 @@ import './add/operator/finally'; import './add/operator/first'; import './add/operator/groupBy'; import './add/operator/ignoreElements'; +import './add/operator/inspect'; +import './add/operator/inspectTime'; import './add/operator/every'; import './add/operator/last'; import './add/operator/map'; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index c9d10dd49c..eadbe64436 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -82,6 +82,8 @@ import './add/operator/finally'; import './add/operator/first'; import './add/operator/groupBy'; import './add/operator/ignoreElements'; +import './add/operator/inspect'; +import './add/operator/inspectTime'; import './add/operator/isEmpty'; import './add/operator/every'; import './add/operator/last'; diff --git a/src/Rx.ts b/src/Rx.ts index b051da45ff..6507158e97 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -57,6 +57,8 @@ import './add/operator/finally'; import './add/operator/first'; import './add/operator/groupBy'; import './add/operator/ignoreElements'; +import './add/operator/inspect'; +import './add/operator/inspectTime'; import './add/operator/every'; import './add/operator/last'; import './add/operator/let'; diff --git a/src/add/operator/inspect.ts b/src/add/operator/inspect.ts new file mode 100644 index 0000000000..31e147bf38 --- /dev/null +++ b/src/add/operator/inspect.ts @@ -0,0 +1,10 @@ +/** + * Everything in this file is generated by the 'tools/generate-operator-patches.ts' script. + * Any manual edits to this file will be lost next time the script is run. + **/ +import {Observable} from '../../Observable'; +import {inspect} from '../../operator/inspect'; + +Observable.prototype.inspect = inspect; + +export var _void: void; \ No newline at end of file diff --git a/src/add/operator/inspectTime.ts b/src/add/operator/inspectTime.ts new file mode 100644 index 0000000000..9ab20aa2cb --- /dev/null +++ b/src/add/operator/inspectTime.ts @@ -0,0 +1,10 @@ +/** + * Everything in this file is generated by the 'tools/generate-operator-patches.ts' script. + * Any manual edits to this file will be lost next time the script is run. + **/ +import {Observable} from '../../Observable'; +import {inspectTime} from '../../operator/inspectTime'; + +Observable.prototype.inspectTime = inspectTime; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/inspect.ts b/src/operator/inspect.ts new file mode 100644 index 0000000000..942d6cf558 --- /dev/null +++ b/src/operator/inspect.ts @@ -0,0 +1,69 @@ +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; +import {Observable} from '../Observable'; +import {Subscription} from '../Subscription'; + +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; + +export function inspect(durationSelector: (value: T) => Observable | Promise): Observable { + return this.lift(new InspectOperator(durationSelector)); +} + +class InspectOperator implements Operator { + constructor(private durationSelector: (value: T) => Observable | Promise) { + } + + call(subscriber: Subscriber): Subscriber { + return new InspectSubscriber(subscriber, this.durationSelector); + } +} + +class InspectSubscriber extends OuterSubscriber { + + private value: T; + private hasValue: boolean = false; + private throttled: Subscription; + + constructor(destination: Subscriber, + private durationSelector: (value: T) => Observable | Promise) { + super(destination); + } + + protected _next(value: T): void { + this.value = value; + this.hasValue = true; + if (!this.throttled) { + const duration = tryCatch(this.durationSelector)(value); + if (duration === errorObject) { + this.destination.error(errorObject.e); + } else { + this.add(this.throttled = subscribeToResult(this, duration)); + } + } + } + + clearThrottle() { + const { value, hasValue, throttled } = this; + if (throttled) { + this.remove(throttled); + this.throttled = null; + throttled.unsubscribe(); + } + if (hasValue) { + this.value = null; + this.hasValue = false; + this.destination.next(value); + } + } + + notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { + this.clearThrottle(); + } + + notifyComplete(): void { + this.clearThrottle(); + } +} diff --git a/src/operator/inspectTime.ts b/src/operator/inspectTime.ts new file mode 100644 index 0000000000..cef4f928d4 --- /dev/null +++ b/src/operator/inspectTime.ts @@ -0,0 +1,58 @@ +import {asap} from '../scheduler/asap'; +import {Operator} from '../Operator'; +import {Scheduler} from '../Scheduler'; +import {Subscriber} from '../Subscriber'; +import {Observable} from '../Observable'; +import {Subscription} from '../Subscription'; + +export function inspectTime(delay: number, scheduler: Scheduler = asap): Observable { + return this.lift(new InspectTimeOperator(delay, scheduler)); +} + +class InspectTimeOperator implements Operator { + constructor(private delay: number, private scheduler: Scheduler) { + } + + call(subscriber: Subscriber): Subscriber { + return new InspectTimeSubscriber(subscriber, this.delay, this.scheduler); + } +} + +class InspectTimeSubscriber extends Subscriber { + + private value: T; + private hasValue: boolean = false; + private throttled: Subscription; + + constructor(destination: Subscriber, + private delay: number, + private scheduler: Scheduler) { + super(destination); + } + + protected _next(value: T): void { + this.value = value; + this.hasValue = true; + if (!this.throttled) { + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.delay, this)); + } + } + + clearThrottle(): void { + const { value, hasValue, throttled } = this; + if (throttled) { + this.remove(throttled); + this.throttled = null; + throttled.unsubscribe(); + } + if (hasValue) { + this.value = null; + this.hasValue = false; + this.destination.next(value); + } + } +} + +function dispatchNext(subscriber: InspectTimeSubscriber): void { + subscriber.clearThrottle(); +}