Skip to content

Commit

Permalink
fix(share): don't have the subscriber capture itself in a closure
Browse files Browse the repository at this point in the history
Resolves an issue where we were inadvertantly returning a teardown that was effectively adding a closure to a subscriber to itself.

Basically it was doing something like this:

```ts
new Observable(subscriber => {
    return () => {
        subscriber.unsubscribe();
    };
});
```
  • Loading branch information
benlesh committed Feb 19, 2021
1 parent 285c6fd commit 67b8035
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions src/internal/operators/share.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Subject } from '../Subject';

import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike, Unsubscribable } from '../types';
import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types';
import { Subscription } from '../Subscription';
import { from } from '../observable/from';
import { operate } from '../util/lift';
Expand Down Expand Up @@ -95,24 +95,30 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
let hasCompleted = false;
let hasErrored = false;

// Used to reset the internal state to a "cold"
// state, as though it had never been subscribed to.
const reset = () => {
connection = subject = null;
hasCompleted = hasErrored = false;
};

return operate((source, subscriber) => {
refCount++;
if (!subject) {
subject = connector!();
}

let castSubscription: Unsubscribable | null = subject.subscribe(subscriber);
// Create the subject if we don't have one yet.
subject = subject ?? connector();

// The following line adds the subscription to the subscriber passed.
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
subject.subscribe(subscriber);

if (!connection) {
connection = from(source).subscribe({
next: (value) => subject!.next(value),
error: (err) => {
hasErrored = true;
// We need to capture the subject before
// we reset (if we need to reset).
const dest = subject!;
if (resetOnError) {
reset();
Expand All @@ -122,6 +128,8 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
complete: () => {
hasCompleted = true;
const dest = subject!;
// We need to capture the subject before
// we reset (if we need to reset).
if (resetOnComplete) {
reset();
}
Expand All @@ -130,11 +138,16 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
});
}

// This is also added to `subscriber`, technically.
return () => {
refCount--;
castSubscription!.unsubscribe();
castSubscription = null;
if (!refCount && resetOnRefCountZero && !hasErrored && !hasCompleted) {

// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
Expand Down

0 comments on commit 67b8035

Please sign in to comment.