Skip to content

Commit

Permalink
replace tabs with spaces and add Experimental annotation to new using…
Browse files Browse the repository at this point in the history
… overload, fix typo in javadoc
  • Loading branch information
davidmoten committed Feb 20, 2015
1 parent dba1fa1 commit 687adcd
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 92 deletions.
6 changes: 5 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,7 @@ public final static <T, Resource> Observable<T> using(
* Constructs an Observable that creates a dependent resource object which is disposed of just before
* termination if <code>disposeEagerly</code> is set to true and unsubscription does not occur before termination. Otherwise
* resource disposal will occur on unsubscription. Eager disposal is particularly appropriate for a synchronous observable
* that resuse resources. <code>disposeAction</code> will only be called once per subscription.
* that resuses resources. <code>disposeAction</code> will only be called once per subscription.
* <p>
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="">
* <dl>
Expand All @@ -2594,7 +2594,11 @@ public final static <T, Resource> Observable<T> using(
* a terminal event (onComplete or onError).
* @return the Observable whose lifetime controls the lifetime of the dependent resource object
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace
* this parenthetical with the release number)
*/
@Experimental
public final static <T, Resource> Observable<T> using(
final Func0<Resource> resourceFactory,
final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
Expand Down
186 changes: 95 additions & 91 deletions src/main/java/rx/internal/operators/OnSubscribeUsing.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,102 +33,106 @@
*/
public final class OnSubscribeUsing<T, Resource> implements OnSubscribe<T> {

private final Func0<Resource> resourceFactory;
private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory;
private final Action1<? super Resource> dispose;
private final boolean disposeEagerly;
private final Func0<Resource> resourceFactory;
private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory;
private final Action1<? super Resource> dispose;
private final boolean disposeEagerly;

public OnSubscribeUsing(
Func0<Resource> resourceFactory,
Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
Action1<? super Resource> dispose, boolean disposeEagerly) {
this.resourceFactory = resourceFactory;
this.observableFactory = observableFactory;
this.dispose = dispose;
this.disposeEagerly = disposeEagerly;
}
public OnSubscribeUsing(Func0<Resource> resourceFactory,
Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
Action1<? super Resource> dispose, boolean disposeEagerly) {
this.resourceFactory = resourceFactory;
this.observableFactory = observableFactory;
this.dispose = dispose;
this.disposeEagerly = disposeEagerly;
}

@Override
public void call(Subscriber<? super T> subscriber) {
@Override
public void call(Subscriber<? super T> subscriber) {

try {

// create the resource
final Resource resource = resourceFactory.call();
// create an action/subscription that disposes only once
final DisposeAction<Resource> disposeOnceOnly = new DisposeAction<Resource>(dispose, resource);
// dispose on unsubscription
subscriber.add(disposeOnceOnly);
// create the observable
final Observable<? extends T> source = observableFactory
// create the observable
.call(resource);
final Observable<? extends T> observable;
// supplement with on termination disposal if requested
if (disposeEagerly)
observable = source
// dispose on completion or error
.doOnTerminate(disposeOnceOnly);
else
observable = source;
try {
// start
observable.unsafeSubscribe(subscriber);
} catch (Throwable e) {
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
if (disposeError != null)
subscriber.onError(new CompositeException(Arrays.asList(e,
disposeError)));
else
// propagate error
subscriber.onError(e);
}
} catch (Throwable e) {
// then propagate error
subscriber.onError(e);
}
}
try {

private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) {
if (disposeEagerly)
try {
disposeOnceOnly.call();
return null;
} catch (Throwable e) {
return e;
}
else
return null;
}
// create the resource
final Resource resource = resourceFactory.call();
// create an action/subscription that disposes only once
final DisposeAction<Resource> disposeOnceOnly = new DisposeAction<Resource>(dispose,
resource);
// dispose on unsubscription
subscriber.add(disposeOnceOnly);
// create the observable
final Observable<? extends T> source = observableFactory
// create the observable
.call(resource);
final Observable<? extends T> observable;
// supplement with on termination disposal if requested
if (disposeEagerly)
observable = source
// dispose on completion or error
.doOnTerminate(disposeOnceOnly);
else
observable = source;
try {
// start
observable.unsafeSubscribe(subscriber);
} catch (Throwable e) {
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
if (disposeError != null)
subscriber.onError(new CompositeException(Arrays.asList(e, disposeError)));
else
// propagate error
subscriber.onError(e);
}
} catch (Throwable e) {
// then propagate error
subscriber.onError(e);
}
}

private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0, Subscription {
private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) {
if (disposeEagerly)
try {
disposeOnceOnly.call();
return null;
} catch (Throwable e) {
return e;
}
else
return null;
}

private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0,
Subscription {
private static final long serialVersionUID = 4262875056400218316L;

private Action1<? super Resource> dispose;
private Resource resource;

private DisposeAction(Action1<? super Resource> dispose, Resource resource) {
this.dispose = dispose;
this.resource = resource;
lazySet(false); // StoreStore barrier
}
@Override
public void call() {
if (compareAndSet(false, true)) {
try {
dispose.call(resource);
} finally {
resource = null;
dispose = null;
}
}
}
@Override
public boolean isUnsubscribed() {
return get();
}
public void unsubscribe() {
call();
}
}
private Resource resource;

private DisposeAction(Action1<? super Resource> dispose, Resource resource) {
this.dispose = dispose;
this.resource = resource;
lazySet(false); // StoreStore barrier
}

@Override
public void call() {
if (compareAndSet(false, true)) {
try {
dispose.call(resource);
} finally {
resource = null;
dispose = null;
}
}
}

@Override
public boolean isUnsubscribed() {
return get();
}

@Override
public void unsubscribe() {
call();
}
}
}

0 comments on commit 687adcd

Please sign in to comment.