Skip to content

Commit

Permalink
fix(audit, auditTime): audit and auditTime emit last value after sour…
Browse files Browse the repository at this point in the history
…ce completes (#5799)

* test(audit): 5730

* test(auditTime): 5730

* fix(audit,auditTime): audit and auditTime emit last value after source completes

* fix(review): address review comments

resolves #5730
  • Loading branch information
Oleksandr Sherekin authored Oct 15, 2020
1 parent ee48f60 commit 643bc85
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 42 deletions.
75 changes: 46 additions & 29 deletions spec/operators/audit-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,17 @@ describe('audit operator', () => {

it('should handle a busy producer emitting a regular repeating sequence', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot(' abcdefabcdefabcdefabcdefa|');
const e1subs = ' ^------------------------!';
const e2 = cold(' -----| ');
const e1 = hot(' abcdefabcdefabcdefabcdefa| ');
const e1subs = ' ^------------------------! ';
const e2 = cold(' -----| ');
const e2subs = [
' ^----! ',
' ------^----! ',
' ------------^----! ',
' ------------------^----! ',
' ------------------------^!'
' ^----! ',
' ------^----! ',
' ------------^----! ',
' ------------------^----! ',
' ------------------------^----!'
];
const expected = '-----f-----f-----f-----f-|';
const expected = '-----f-----f-----f-----f-----(a|)';

const result = e1.pipe(audit(() => e2));

Expand Down Expand Up @@ -168,13 +168,13 @@ describe('audit operator', () => {
});
});

it('should emit no values if duration is a never', () => {
it('should emit no values and never complete if duration is a never', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ----abcdefabcdefabcdefabcdefa|');
const e1subs = ' ^----------------------------!';
const e2 = cold(' -');
const e2subs = ' ----^------------------------!';
const expected = '-----------------------------|';
const e2subs = ' ----^-------------------------';
const expected = '------------------------------';

const result = e1.pipe(audit(() => e2));

Expand Down Expand Up @@ -232,23 +232,23 @@ describe('audit operator', () => {

it('should audit using durations of varying lengths', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot(' abcdefabcdabcdefghabca|');
const e1subs = ' ^---------------------!';
const e1 = hot(' abcdefabcdabcdefghabca| ');
const e1subs = ' ^---------------------! ';
const e2 = [
cold(' -----| '),
cold(' ---| '),
cold(' -------| '),
cold(' --| '),
cold(' ----| ')
cold(' -----| '),
cold(' ---| '),
cold(' -------| '),
cold(' --| '),
cold(' ----| ')
];
const e2subs = [
' ^----! ',
' ------^--! ',
' ----------^------! ',
' ------------------^-! ',
' ---------------------^! '
' ^----! ',
' ------^--! ',
' ----------^------! ',
' ------------------^-! ',
' ---------------------^---! '
];
const expected = '-----f---d-------h--c-| ';
const expected = '-----f---d-------h--c----(a|)';

let i = 0;
const result = e1.pipe(audit(() => e2[i++]));
Expand Down Expand Up @@ -391,12 +391,10 @@ describe('audit operator', () => {

it('should audit by promise resolves', (done: MochaDone) => {
const e1 = interval(10).pipe(take(5));
const expected = [0, 1, 2, 3];
const expected = [0, 1, 2, 3, 4];

e1.pipe(
audit(() => {
return new Promise((resolve: any) => { resolve(42); });
})
audit(() => Promise.resolve(42))
).subscribe(
(x: number) => {
expect(x).to.equal(expected.shift()); },
Expand Down Expand Up @@ -455,4 +453,23 @@ describe('audit operator', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should emit last value after duration completes if source completes first', () => {
testScheduler.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const e1 = hot('-a--------xy| ');
const e1subs = ' ^-----------! ';
const e2 = cold(' ----| ');
const e2subs = [
' -^---! ',
' ----------^---!'
];
const expected = '-----a--------(y|)';

const result = e1.pipe(audit(() => e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
});
10 changes: 5 additions & 5 deletions spec/operators/auditTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe('auditTime operator', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a-x-y----b---x-cx---|');
const subs = ' ^--------------------!';
const expected = '------y--------x-----|';
const expected = '------y--------x-----(x|)';

const result = e1.pipe(auditTime(5, testScheduler));

Expand All @@ -26,11 +26,11 @@ describe('auditTime operator', () => {
});

it('should auditTime events by 5 time units', (done: MochaDone) => {
const expected = 3;
of(1, 2, 3).pipe(
auditTime(5)
).subscribe((x: number) => {
done(new Error('should not be called'));
}, null, () => {
expect(x).to.equal(expected);
done();
});
});
Expand All @@ -50,7 +50,7 @@ describe('auditTime operator', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a--------b-----c----|');
const subs = ' ^--------------------!';
const expected = '------a--------b-----|';
const expected = '------a--------b-----(c|)';

expectObservable(e1.pipe(auditTime(5, testScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
Expand All @@ -61,7 +61,7 @@ describe('auditTime operator', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' abcdefabcdefabcdefabcdefa|');
const subs = ' ^------------------------!';
const expected = '-----f-----f-----f-----f-|';
const expected = '-----f-----f-----f-----f-----(a|)';

expectObservable(e1.pipe(auditTime(5, testScheduler))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
Expand Down
26 changes: 18 additions & 8 deletions src/internal/operators/audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export function audit<T>(durationSelector: (value: T) => ObservableInput<any>):
let hasValue = false;
let lastValue: T | null = null;
let durationSubscriber: Subscriber<any> | null = null;
let isComplete = false;

const endDuration = () => {
durationSubscriber?.unsubscribe();
Expand All @@ -65,18 +66,27 @@ export function audit<T>(durationSelector: (value: T) => ObservableInput<any>):
lastValue = null;
subscriber.next(value);
}
isComplete && subscriber.complete();
};

source.subscribe(
new OperatorSubscriber(subscriber, (value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
innerFrom(durationSelector(value)).subscribe(
(durationSubscriber = new OperatorSubscriber(subscriber, endDuration, undefined, endDuration))
);
new OperatorSubscriber(
subscriber,
(value) => {
hasValue = true;
lastValue = value;
if (!durationSubscriber) {
innerFrom(durationSelector(value)).subscribe(
(durationSubscriber = new OperatorSubscriber(subscriber, endDuration, undefined, endDuration))
);
}
},
undefined,
() => {
isComplete = true;
(!hasValue || !durationSubscriber || durationSubscriber.closed) && subscriber.complete();
}
})
)
);
});
}

0 comments on commit 643bc85

Please sign in to comment.