Skip to content

Commit

Permalink
Merge pull request #2759 from davidmoten/using-redev
Browse files Browse the repository at this point in the history
Observable.using should use unsafeSubscribe and enable eager disposal
  • Loading branch information
benjchristensen committed Feb 21, 2015
2 parents 94de8ee + ffe0fa1 commit 6c4c9f1
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 28 deletions.
39 changes: 37 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2547,7 +2547,7 @@ public final static Observable<Long> timer(long delay, TimeUnit unit, Scheduler
}

/**
* Constructs an Observable that creates a dependent resource object.
* Constructs an Observable that creates a dependent resource object which is disposed of on unsubscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
Expand All @@ -2568,7 +2568,42 @@ public final static <T, Resource> Observable<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
final Action1<? super Resource> disposeAction) {
return create(new OnSubscribeUsing<T, Resource>(resourceFactory, observableFactory, disposeAction));
return using(resourceFactory, observableFactory, disposeAction, false);
}

/**
* Constructs an Observable that creates a dependent resource object which is disposed of just before
* termination if <code>disposeEagerly</code> is set to true and unsubscription does not occur before termination. Otherwise
* resource disposal will occur on unsubscription. Eager disposal is particularly appropriate for a synchronous observable
* that resuses resources. <code>disposeAction</code> will only be called once per subscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code using} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resourceFactory
* the factory function to create a resource object that depends on the Observable
* @param observableFactory
* the factory function to create an Observable
* @param disposeAction
* the function that will dispose of the resource
* @param disposeEagerly
* if true then disposal will happen either on unsubscription or just before emission of
* a terminal event (onComplete or onError).
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace
* this parenthetical with the release number)
*/
@Experimental
public final static <T, Resource> Observable<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
final Action1<? super Resource> disposeAction, boolean disposeEagerly) {
return create(new OnSubscribeUsing<T, Resource>(resourceFactory, observableFactory, disposeAction, disposeEagerly));
}

/**
Expand Down
100 changes: 86 additions & 14 deletions src/main/java/rx/internal/operators/OnSubscribeUsing.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package rx.internal.operators;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

/**
* Constructs an observable sequence that depends on a resource object.
Expand All @@ -32,35 +36,103 @@ public final class OnSubscribeUsing<T, Resource> implements OnSubscribe<T> {
private final Func0<Resource> resourceFactory;
private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory;
private final Action1<? super Resource> dispose;
private final boolean disposeEagerly;

public OnSubscribeUsing(Func0<Resource> resourceFactory,
Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
Action1<? super Resource> dispose) {
Action1<? super Resource> dispose, boolean disposeEagerly) {
this.resourceFactory = resourceFactory;
this.observableFactory = observableFactory;
this.dispose = dispose;
this.disposeEagerly = disposeEagerly;
}

@Override
public void call(Subscriber<? super T> subscriber) {
try {
final Resource resource = resourceFactory.call();
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
dispose.call(resource);
}
try {

}));
Observable<? extends T> observable = observableFactory.call(resource);
observable.subscribe(subscriber);
// create the resource
final Resource resource = resourceFactory.call();
// create an action/subscription that disposes only once
final DisposeAction<Resource> disposeOnceOnly = new DisposeAction<Resource>(dispose,
resource);
// dispose on unsubscription
subscriber.add(disposeOnceOnly);
// create the observable
final Observable<? extends T> source = observableFactory
// create the observable
.call(resource);
final Observable<? extends T> observable;
// supplement with on termination disposal if requested
if (disposeEagerly)
observable = source
// dispose on completion or error
.doOnTerminate(disposeOnceOnly);
else
observable = source;
try {
// start
observable.unsafeSubscribe(subscriber);
} catch (Throwable e) {
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
if (disposeError != null)
subscriber.onError(new CompositeException(Arrays.asList(e, disposeError)));
else
// propagate error
subscriber.onError(e);
}
} catch (Throwable e) {
// eagerly call unsubscribe since this operator is specifically about resource management
subscriber.unsubscribe();
// then propagate error
subscriber.onError(e);
}
}

private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) {
if (disposeEagerly)
try {
disposeOnceOnly.call();
return null;
} catch (Throwable e) {
return e;
}
else
return null;
}

private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0,
Subscription {
private static final long serialVersionUID = 4262875056400218316L;

private Action1<? super Resource> dispose;
private Resource resource;

private DisposeAction(Action1<? super Resource> dispose, Resource resource) {
this.dispose = dispose;
this.resource = resource;
lazySet(false); // StoreStore barrier
}

@Override
public void call() {
if (compareAndSet(false, true)) {
try {
dispose.call(resource);
} finally {
resource = null;
dispose = null;
}
}
}

@Override
public boolean isUnsubscribed() {
return get();
}

@Override
public void unsubscribe() {
call();
}
}
}
Loading

0 comments on commit 6c4c9f1

Please sign in to comment.