From 9fea36ddef7c98fe1c47e4dfa7f9ba30d8981ef0 Mon Sep 17 00:00:00 2001 From: Martin Sikora Date: Sat, 31 Mar 2018 00:36:23 +0200 Subject: [PATCH] perf(ReplaySubject): slightly improved performance (#2677) * perf(ReplaySubject): slightly improved performance * perf(ReplaySubject): little performance adjustments * refactor(ReplaySubject): switch back to using splice() instead of shift() * refactor(ReplaySubject): wrap splice() with an "if" condition for performance reasons --- .../subject/replaysubject.js | 17 ++++++ .../subject/replaysubject_windowtime.js | 17 ++++++ .../subject/replaysubject.js | 17 ++++++ .../subject/replaysubject_windowtime.js | 17 ++++++ src/internal/ReplaySubject.ts | 53 ++++++++++++++----- 5 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 perf/micro/current-thread-scheduler/subject/replaysubject.js create mode 100644 perf/micro/current-thread-scheduler/subject/replaysubject_windowtime.js create mode 100644 perf/micro/immediate-scheduler/subject/replaysubject.js create mode 100644 perf/micro/immediate-scheduler/subject/replaysubject_windowtime.js diff --git a/perf/micro/current-thread-scheduler/subject/replaysubject.js b/perf/micro/current-thread-scheduler/subject/replaysubject.js new file mode 100644 index 0000000000..3a5e88fcd9 --- /dev/null +++ b/perf/micro/current-thread-scheduler/subject/replaysubject.js @@ -0,0 +1,17 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldRangeWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread); + var newRangeWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.queue); + + return suite + .add('old ReplaySubject with immediate scheduler', function () { + var subject = new RxOld.ReplaySubject(5, Number.POSITIVE_INFINITY, RxOld.Scheduler.currentThread); + oldRangeWithCurrentThreadScheduler.subscribe(subject); + }) + .add('new ReplaySubject with immediate scheduler', function () { + var subject = new RxNew.ReplaySubject(5, Number.POSITIVE_INFINITY, RxNew.Scheduler.queue); + newRangeWithCurrentThreadScheduler.subscribe(subject); + }); +}; \ No newline at end of file diff --git a/perf/micro/current-thread-scheduler/subject/replaysubject_windowtime.js b/perf/micro/current-thread-scheduler/subject/replaysubject_windowtime.js new file mode 100644 index 0000000000..a338502aef --- /dev/null +++ b/perf/micro/current-thread-scheduler/subject/replaysubject_windowtime.js @@ -0,0 +1,17 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldRangeWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread); + var newRangeWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.queue); + + return suite + .add('old ReplaySubject with immediate scheduler', function () { + var subject = new RxOld.ReplaySubject(5, 50, RxOld.Scheduler.currentThread); + oldRangeWithCurrentThreadScheduler.subscribe(subject); + }) + .add('new ReplaySubject with immediate scheduler', function () { + var subject = new RxNew.ReplaySubject(5, 50, RxNew.Scheduler.queue); + newRangeWithCurrentThreadScheduler.subscribe(subject); + }); +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/subject/replaysubject.js b/perf/micro/immediate-scheduler/subject/replaysubject.js new file mode 100644 index 0000000000..0da0bdbb9d --- /dev/null +++ b/perf/micro/immediate-scheduler/subject/replaysubject.js @@ -0,0 +1,17 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldRangeWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate); + var newRangeWithImmediateScheduler = RxNew.Observable.range(0, 25); + + return suite + .add('old ReplaySubject with immediate scheduler', function () { + var subject = new RxOld.ReplaySubject(5, Number.POSITIVE_INFINITY, RxOld.Scheduler.immediate); + oldRangeWithImmediateScheduler.subscribe(subject); + }) + .add('new ReplaySubject with immediate scheduler', function () { + var subject = new RxNew.ReplaySubject(5, Number.POSITIVE_INFINITY); + newRangeWithImmediateScheduler.subscribe(subject); + }); +}; \ No newline at end of file diff --git a/perf/micro/immediate-scheduler/subject/replaysubject_windowtime.js b/perf/micro/immediate-scheduler/subject/replaysubject_windowtime.js new file mode 100644 index 0000000000..98e5291c01 --- /dev/null +++ b/perf/micro/immediate-scheduler/subject/replaysubject_windowtime.js @@ -0,0 +1,17 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldRangeWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate); + var newRangeWithImmediateScheduler = RxNew.Observable.range(0, 25); + + return suite + .add('old ReplaySubject with immediate scheduler', function () { + var subject = new RxOld.ReplaySubject(5, 50, RxOld.Scheduler.immediate); + oldRangeWithImmediateScheduler.subscribe(subject); + }) + .add('new ReplaySubject with immediate scheduler', function () { + var subject = new RxNew.ReplaySubject(5, 50); + newRangeWithImmediateScheduler.subscribe(subject); + }); +}; \ No newline at end of file diff --git a/src/internal/ReplaySubject.ts b/src/internal/ReplaySubject.ts index 164ace4b0d..171cc48581 100644 --- a/src/internal/ReplaySubject.ts +++ b/src/internal/ReplaySubject.ts @@ -10,9 +10,10 @@ import { SubjectSubscription } from './SubjectSubscription'; * @class ReplaySubject */ export class ReplaySubject extends Subject { - private _events: ReplayEvent[] = []; + private _events: (ReplayEvent | T)[] = []; private _bufferSize: number; private _windowTime: number; + private _infiniteTimeWindow: boolean = false; constructor(bufferSize: number = Number.POSITIVE_INFINITY, windowTime: number = Number.POSITIVE_INFINITY, @@ -20,25 +21,45 @@ export class ReplaySubject extends Subject { super(); this._bufferSize = bufferSize < 1 ? 1 : bufferSize; this._windowTime = windowTime < 1 ? 1 : windowTime; + + if (windowTime === Number.POSITIVE_INFINITY) { + this._infiniteTimeWindow = true; + this.next = this.nextInfiniteTimeWindow; + } else { + this.next = this.nextTimeWindow; + } } - next(value: T): void { - const now = this._getNow(); - this._events.push(new ReplayEvent(now, value)); + private nextInfiniteTimeWindow(value: T): void { + const _events = this._events; + _events.push(value); + // Since this method is invoked in every next() call than the buffer + // can overgrow the max size only by one item + if (_events.length > this._bufferSize) { + _events.shift(); + } + + super.next(value); + } + + private nextTimeWindow(value: T): void { + this._events.push(new ReplayEvent(this._getNow(), value)); this._trimBufferThenGetEvents(); + super.next(value); } protected _subscribe(subscriber: Subscriber): Subscription { - const _events = this._trimBufferThenGetEvents(); + // When `_infiniteTimeWindow === true` then the buffer is already trimmed + const _infiniteTimeWindow = this._infiniteTimeWindow; + const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents(); const scheduler = this.scheduler; + const len = _events.length; let subscription: Subscription; if (this.closed) { throw new ObjectUnsubscribedError(); - } else if (this.hasError) { - subscription = Subscription.EMPTY; - } else if (this.isStopped) { + } else if (this.isStopped || this.hasError) { subscription = Subscription.EMPTY; } else { this.observers.push(subscriber); @@ -49,9 +70,14 @@ export class ReplaySubject extends Subject { subscriber.add(subscriber = new ObserveOnSubscriber(subscriber, scheduler)); } - const len = _events.length; - for (let i = 0; i < len && !subscriber.closed; i++) { - subscriber.next(_events[i].value); + if (_infiniteTimeWindow) { + for (let i = 0; i < len && !subscriber.closed; i++) { + subscriber.next(_events[i]); + } + } else { + for (let i = 0; i < len && !subscriber.closed; i++) { + subscriber.next((>_events[i]).value); + } } if (this.hasError) { @@ -71,9 +97,9 @@ export class ReplaySubject extends Subject { const now = this._getNow(); const _bufferSize = this._bufferSize; const _windowTime = this._windowTime; - const _events = this._events; + const _events = []>this._events; - let eventsCount = _events.length; + const eventsCount = _events.length; let spliceCount = 0; // Trim events that fall out of the time window. @@ -96,6 +122,7 @@ export class ReplaySubject extends Subject { return _events; } + } class ReplayEvent {