diff --git a/src/operators/retry.ts b/src/operators/retry.ts index 8cb10d288e..5b000c9d74 100644 --- a/src/operators/retry.ts +++ b/src/operators/retry.ts @@ -2,36 +2,88 @@ import Operator from '../Operator'; import Observer from '../Observer'; import Subscriber from '../Subscriber'; import Observable from '../Observable'; +import Subscription from '../Subscription'; export default function retry(count: number = 0): Observable { return this.lift(new RetryOperator(count, this)); } class RetryOperator implements Operator { - constructor(private count: number, protected original: Observable) { + constructor(private count: number, + protected source: Observable) { } call(subscriber: Subscriber): Subscriber { - return new RetrySubscriber(subscriber, this.count, this.original); + return new FirstRetrySubscriber(subscriber, this.count, this.source); } } -class RetrySubscriber extends Subscriber { - private retries: number = 0; - constructor(destination: Subscriber, private count: number, private original: Observable) { - super(destination); +class FirstRetrySubscriber extends Subscriber { + private lastSubscription: Subscription; + + constructor(public destination: Subscriber, + private count: number, + private source: Observable) { + super(null); + this.lastSubscription = this; + } + + _next(value: T) { + this.destination.next(value); + } + + error(error?) { + if (!this.isUnsubscribed) { + super.unsubscribe(); + this.resubscribe(); + } + } + + _complete() { + super.unsubscribe(); + this.destination.complete(); + } + + unsubscribe() { + const lastSubscription = this.lastSubscription; + if (lastSubscription === this) { + super.unsubscribe(); + } else { + lastSubscription.unsubscribe(); + } + } + + resubscribe(retried: number = 0) { + this.lastSubscription.unsubscribe(); + const nextSubscriber = new RetryMoreSubscriber(this, this.count, retried + 1); + this.lastSubscription = this.source.subscribe(nextSubscriber); + } +} + +class RetryMoreSubscriber extends Subscriber { + constructor(private parent: FirstRetrySubscriber, + private count: number, + private retried: number = 0) { + super(null); + } + + _next(value: T) { + this.parent.destination.next(value); } _error(err: any) { + const parent = this.parent; + const retried = this.retried; const count = this.count; - if (count && count === (this.retries += 1)) { - this.destination.error(err); + + if (count && retried === count) { + parent.destination.error(err); } else { - this.resubscribe(); + parent.resubscribe(retried); } } - resubscribe() { - this.original.subscribe(this); + _complete() { + this.parent.destination.complete(); } -} \ No newline at end of file +}