Skip to content

Commit

Permalink
fix(bufferToggle): accepts promise as openings
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Apr 6, 2016
1 parent 02239fb commit 3d22c7a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 49 deletions.
46 changes: 44 additions & 2 deletions spec/operators/bufferToggle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,48 @@ describe('Observable.prototype.bufferToggle', () => {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept openings resolved promise', (done: MochaDone) => {
const e1 = Observable.concat(
Observable.timer(10).mapTo(1),
Observable.timer(100).mapTo(2),
Observable.timer(150).mapTo(3),
Observable.timer(200).mapTo(4));

const expected = [[1]];

e1.bufferToggle(new Promise((resolve: any) => { resolve(42); }), () => {
return Observable.timer(50);
}).subscribe((x) => {
expect(x).to.deep.equal(expected.shift());
}, (x) => {
done(new Error('should not be called'));
}, () => {
expect(expected.length).to.be.equal(0);
done();
});
});

it('should accept openings rejected promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Observable.timer(10).mapTo(3),
Observable.timer(100).mapTo(4)
);

const expected = 42;

e1.bufferToggle(new Promise((resolve: any, reject: any) => { reject(expected); }), () => {
return Observable.timer(50);
}).subscribe((x) => {
done(new Error('should not be called'));
}, (x) => {
expect(x).to.equal(expected);
done();
}, () => {
done(new Error('should not be called'));
});
});

it('should accept closing selector that returns a resolved promise', (done: MochaDone) => {
const e1 = Observable.concat(Observable.of(1),
Observable.timer(10).mapTo(2),
Expand All @@ -357,8 +399,8 @@ describe('Observable.prototype.bufferToggle', () => {
}, () => {
done(new Error('should not be called'));
}, () => {
expect(expected.length).to.be.equal(0);
done();
expect(expected.length).to.be.equal(0);
done();
});
});

Expand Down
71 changes: 24 additions & 47 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,29 @@ import {InnerSubscriber} from '../InnerSubscriber';
* @see {@link bufferWhen}
* @see {@link windowToggle}
*
* @param {Observable<O>} openings An observable of notifications to start new
* @param {SubscribableOrPromise<O>} openings A Subscribable or Promise of notifications to start new
* buffers.
* @param {function(value: O): Observable} closingSelector A function that takes
* @param {function(value: O): SubscribableOrPromise} closingSelector A function that takes
* the value emitted by the `openings` observable and returns a Subscribable or Promise,
* which, when it emits, signals that the associated buffer should be emitted
* and cleared.
* @return {Observable<T[]>} An observable of arrays of buffered values.
* @method bufferToggle
* @owner Observable
*/
export function bufferToggle<T, O>(openings: Observable<O>,
closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]> {
export function bufferToggle<T, O>(openings: SubscribableOrPromise<O>,
closingSelector: (value: O) => SubscribableOrPromise<any>): Observable<T[]> {
return this.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
}

export interface BufferToggleSignature<T> {
<O>(openings: Observable<O>, closingSelector: (value: O) => SubscribableOrPromise<any> | void): Observable<T[]>;
<O>(openings: SubscribableOrPromise<O>, closingSelector: (value: O) => SubscribableOrPromise<any>): Observable<T[]>;
}

class BufferToggleOperator<T, O> implements Operator<T, T[]> {

constructor(private openings: Observable<O>,
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
constructor(private openings: SubscribableOrPromise<O>,
private closingSelector: (value: O) => SubscribableOrPromise<any>) {
}

call(subscriber: Subscriber<T[]>, source: any): any {
Expand All @@ -79,10 +79,10 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
private contexts: Array<BufferContext<T>> = [];

constructor(destination: Subscriber<T[]>,
private openings: Observable<O>,
private openings: SubscribableOrPromise<O>,
private closingSelector: (value: O) => SubscribableOrPromise<any> | void) {
super(destination);
this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this)));
this.add(subscribeToResult(this, openings));
}

protected _next(value: T): void {
Expand Down Expand Up @@ -118,7 +118,17 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
super._complete();
}

openBuffer(value: O): void {
notifyNext(outerValue: any, innerValue: O,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, O>): void {
outerValue ? this.closeBuffer(outerValue) : this.openBuffer(innerValue);
}

notifyComplete(innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer((<any> innerSub).context);
}

private openBuffer(value: O): void {
try {
const closingSelector = this.closingSelector;
const closingNotifier = closingSelector.call(this, value);
Expand All @@ -130,16 +140,6 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {
}
}

notifyNext(outerValue: any, innerValue: O,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer(outerValue);
}

notifyComplete(innerSub: InnerSubscriber<T, O>): void {
this.closeBuffer((<any> innerSub).context);
}

private closeBuffer(context: BufferContext<T>): void {
const contexts = this.contexts;

Expand All @@ -162,36 +162,13 @@ class BufferToggleSubscriber<T, O> extends OuterSubscriber<T, O> {

const innerSubscription = subscribeToResult(this, closingNotifier, <any>context);

if (!innerSubscription.isUnsubscribed) {
if (!innerSubscription || innerSubscription.isUnsubscribed) {
this.closeBuffer(context);
} else {
(<any> innerSubscription).context = context;

this.add(innerSubscription);
subscription.add(innerSubscription);
} else {
this.closeBuffer(context);
}
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class BufferToggleOpeningsSubscriber<T, O> extends Subscriber<O> {
constructor(private parent: BufferToggleSubscriber<T, O>) {
super(null);
}

protected _next(value: O) {
this.parent.openBuffer(value);
}

protected _error(err: any) {
this.parent.error(err);
}

protected _complete() {
// noop
}
}
}

0 comments on commit 3d22c7a

Please sign in to comment.