Skip to content

Commit

Permalink
DOM: Reorder Observable completion events
Browse files Browse the repository at this point in the history
This CL "fixes" Observable unsubscription/teardown timing. As a matter
of accidental historical precedent, Observables in JavaScript (but not
in other languages) had implemented the "rule" that upon
Subscriber#error() or Subscriber#complete(), the subscriber would:
 1. First, invoke the appropriate Observer callback, if provided (i.e.,
    complete() or error() callback).
 2. Signal abort Subscriber#signal, which invokes any teardowns and also
    fires the `abort` event at the signal.

However, after dom@chromium.org discussed this more with
ben@benlesh.com, we came to the conclusion that the principle of "as
soon as you know you will teardown, you MUST close the subscription and
any upstream subscriptions" should be adhered. This means the above
steps must be inverted. This is a small-in-size but medium-in-impact
design change for the Observable concept, and led to a blog post [1] and
an announcement [2] that the RxJS library intends to change its
historical ordering of these events.

This CL:
 1. Inverts the order of the aforementioned steps in the Blink
    implementation.
 2. Improves some tests that assert this new ordering.
 3. Simplifies the takeUntil() operator in general.

The Observable spec will be updated alongside this commit:
WICG/observable#120.

[1]: https://benlesh.com/posts/observables-are-broken-and-so-is-javascript/
[2]: ReactiveX/rxjs#7443

R=masonf@chromium.org

Bug: 1485981
Change-Id: I376e66eef490808d264dc999862a801d591aa278
  • Loading branch information
domfarolino authored and chromium-wpt-export-bot committed Feb 21, 2024
1 parent 0fff981 commit a157fb0
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 57 deletions.
35 changes: 28 additions & 7 deletions dom/observable/tentative/observable-constructor.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,18 @@ test(t => {

source.subscribe({
complete: () => {
activeDuringComplete = innerSubscriber.active
abortedDuringComplete = innerSubscriber.active
activeDuringComplete = innerSubscriber.active;
abortedDuringComplete = innerSubscriber.signal.aborted;
}
});
assert_true(activeBeforeComplete, "Subscription is active before complete");
assert_false(abortedBeforeComplete, "Subscription is not aborted before complete");
assert_false(activeDuringComplete, "Subscription is not active during complete");
assert_false(abortedDuringComplete, "Subscription is not aborted during complete");
assert_false(activeDuringComplete,
"Subscription becomes inactive during Subscriber#complete(), just " +
"before Observer#complete() callback is invoked");
assert_true(abortedDuringComplete,
"Subscription's signal is aborted during Subscriber#complete(), just " +
"before Observer#complete() callback is invoked");
assert_false(activeAfterComplete, "Subscription is not active after complete");
assert_true(abortedAfterComplete, "Subscription is aborted after complete");
}, "Subscription is inactive after complete()");
Expand All @@ -269,13 +273,18 @@ test(t => {

source.subscribe({
error: () => {
activeDuringError = innerSubscriber.active
activeDuringError = innerSubscriber.active;
abortedDuringError = innerSubscriber.signal.aborted;
}
});
assert_true(activeBeforeError, "Subscription is active before error");
assert_false(abortedBeforeError, "Subscription is not aborted before error");
assert_false(activeDuringError, "Subscription is not active during error");
assert_false(abortedDuringError, "Subscription is not aborted during error");
assert_false(activeDuringError,
"Subscription becomes inactive during Subscriber#error(), just " +
"before Observer#error() callback is invoked");
assert_true(abortedDuringError,
"Subscription's signal is aborted during Subscriber#error(), just " +
"before Observer#error() callback is invoked");
assert_false(activeAfterError, "Subscription is not active after error");
assert_true(abortedAfterError, "Subscription is not aborted after error");
}, "Subscription is inactive after error()");
Expand Down Expand Up @@ -690,6 +699,18 @@ test(() => {
assert_true(abortedDuringTeardown2, 'should be aborted during teardown callback 2');
}, "Unsubscription lifecycle");

test(t => {
let innerSubscriber = null;
const source = new Observable(subscriber => {
innerSubscriber = subscriber;
subscriber.error('calling error()');
});

source.subscribe();
assert_equals(innerSubscriber.signal.reason, "calling error()",
"Reason is set correctly");
}, "Subscriber#error() value is stored as Subscriber's AbortSignal's reason");

test(t => {
const source = new Observable((subscriber) => {
let n = 0;
Expand Down
130 changes: 80 additions & 50 deletions dom/observable/tentative/observable-takeUntil.any.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,74 +32,102 @@ promise_test(async () => {
// `takeUntil()` operator, the spec responds to `notifier`'s `next()` by
// unsubscribing from `notifier`, which is what this test asserts.
promise_test(async () => {
const source = new Observable(subscriber => {});
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});

let notifierSubscriberActiveBeforeNext;
let notifierSubscriberActiveAfterNext;
let teardownCalledAfterNext;
let notifierSignalAbortedAfterNext;
const notifier = new Observable(subscriber => {
let teardownCalled;
subscriber.addTeardown(() => teardownCalled = true);
subscriber.addTeardown(() => results.push('notifier teardown'));

results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
notifierSubscriberActiveBeforeNext = subscriber.active;
results.push(`notifer active before next(): ${subscriber.active}`);
subscriber.next('value');
notifierSubscriberActiveAfterNext = subscriber.active;
teardownCalledAfterNext = (teardownCalled === true);
notifierSignalAbortedAfterNext = subscriber.signal.aborted;
results.push(`notifer active after next(): ${subscriber.active}`);
});

let nextOrErrorCalled = false;
let completeCalled = false;
source.takeUntil(notifier).subscribe({
next: () => nextOrErrorCalled = true,
error: () => nextOrErrorCalled = true,
complete: () => completeCalled = true,
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_true(notifierSubscriberActiveBeforeNext);
assert_false(notifierSubscriberActiveAfterNext);
assert_true(teardownCalledAfterNext);
assert_true(notifierSignalAbortedAfterNext);
assert_false(nextOrErrorCalled);
assert_true(completeCalled);
}, "takeUntil: notifier next() unsubscribes to notifier");

assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before next(): true',
'notifier teardown',
'takeUntil() complete callback',
'notifer active after next(): false',
]);
}, "takeUntil: notifier next() unsubscribes from notifier");
// This test is identical to the one above, with the exception being that the
// `notifier` calls `subscriber.error()` instead `subscriber.next()`.
promise_test(async () => {
const source = new Observable(subscriber => {});
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});

let notifierSubscriberActiveBeforeNext;
let notifierSubscriberActiveAfterNext;
let teardownCalledAfterNext;
let notifierSignalAbortedAfterNext;
const notifier = new Observable(subscriber => {
let teardownCalled;
subscriber.addTeardown(() => teardownCalled = true);
subscriber.addTeardown(() => results.push('notifier teardown'));

results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
notifierSubscriberActiveBeforeNext = subscriber.active;
results.push(`notifer active before error(): ${subscriber.active}`);
subscriber.error('error');
notifierSubscriberActiveAfterNext = subscriber.active;
teardownCalledAfterNext = (teardownCalled === true);
notifierSignalAbortedAfterNext = subscriber.signal.aborted;
results.push(`notifer active after error(): ${subscriber.active}`);
});

let nextOrErrorCalled = false;
let completeCalled = false;
source.takeUntil(notifier).subscribe({
next: () => nextOrErrorCalled = true,
error: () => nextOrErrorCalled = true,
complete: () => completeCalled = true,
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});
assert_true(notifierSubscriberActiveBeforeNext);
assert_false(notifierSubscriberActiveAfterNext);
assert_true(teardownCalledAfterNext);
assert_true(notifierSignalAbortedAfterNext);
assert_false(nextOrErrorCalled);
assert_true(completeCalled);
}, "takeUntil: notifier error() unsubscribes to notifier");

assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before error(): true',
'notifier teardown',
'takeUntil() complete callback',
'notifer active after error(): false',
]);
}, "takeUntil: notifier error() unsubscribes from notifier");
// This test is identical to the above except it `throw`s instead of calling
// `Subscriber#error()`.
promise_test(async () => {
const results = [];
const source = new Observable(subscriber => {
results.push('source subscribe callback');
subscriber.addTeardown(() => results.push('source teardown'));
});

const notifier = new Observable(subscriber => {
subscriber.addTeardown(() => results.push('notifier teardown'));

results.push('notifier subscribe callback');
// Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
results.push(`notifer active before throw: ${subscriber.active}`);
throw new Error('custom error');
// Won't run:
results.push(`notifer active after throw: ${subscriber.active}`);
});

source.takeUntil(notifier).subscribe({
next: () => results.push('takeUntil() next callback'),
error: e => results.push(`takeUntil() error callback: ${error}`),
complete: () => results.push('takeUntil() complete callback'),
});

assert_array_equals(results, [
'notifier subscribe callback',
'notifer active before throw: true',
'notifier teardown',
'takeUntil() complete callback',
]);
}, "takeUntil: notifier throw Error unsubscribes from notifier");

// Test that `notifier` unsubscribes from source Observable.
promise_test(async t => {
Expand Down Expand Up @@ -130,9 +158,10 @@ promise_test(async t => {
let notifierTeardownCalledBeforeCompleteCallback;
await new Promise(resolve => {
source.takeUntil(notifier).subscribe({
next: () => nextOrErrorCalled = true,
error: () => nextOrErrorCalled = true,
next: () => {nextOrErrorCalled = true; results.push('next callback');},
error: () => {nextOrErrorCalled = true; results.push('error callback');},
complete: () => {
results.push('complete callback');
notifierTeardownCalledBeforeCompleteCallback = notifierTeardownCalled;
resolve();
},
Expand All @@ -145,15 +174,16 @@ promise_test(async t => {
// The notifier/source teardowns are not called by the time the outer
// `Observer#complete()` callback is invoked, but they are all run *after*
// (i.e., before `notifier`'s `subscriber.next()` returns internally).
assert_false(notifierTeardownCalledBeforeCompleteCallback);
assert_true(notifierTeardownCalledBeforeCompleteCallback);
assert_true(notifierTeardownCalled);
assert_array_equals(results, [
"notifier subscribed",
"source subscribed",
"notifier teardown",
"notifier signal abort",
"source teardown",
"source signal abort"
"source signal abort",
"complete callback",
]);
}, "takeUntil: notifier next() unsubscribes from notifier & source observable");

Expand Down

0 comments on commit a157fb0

Please sign in to comment.