-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Observable.using should use unsafeSubscribe and enable eager disposal #2759
Conversation
// create the resource | ||
final Resource resource = resourceFactory.call(); | ||
// create an action that disposes only once | ||
final Action0 disposeOnceOnly = createOnceOnlyDisposeAction(resource); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd have a class that implements both Action0 and subscription to avoid creating too many objects:
static final class DisposeAction<R> extends AtomicBoolean implements Action0, Subscription {
Action1<? super R> dispose;
R resource;
public DisposeAction(Action1<? super R> dispose, R resource) {
this.dispose = dispose;
this.resource = resource;
lazySet(false); // StoreStore barrier
}
@Override
public void call() {
if (compareAndSet(false, true)) {
try {
dispose.call(resource);
} finally {
resource = null;
dispose = null;
}
}
}
@Override
public boolean isUnsubscribed() {
return get();
}
public void unsubscribe() {
call();
}
}
18393e4
to
dba1fa1
Compare
Thanks @akarnokd, I've made the change and squashed commits. |
* @return the Observable whose lifetime controls the lifetime of the dependent resource object | ||
* @see <a href="http://reactivex.io/documentation/operators/using.html">ReactiveX operators documentation: Using</a> | ||
*/ | ||
public final static <T, Resource> Observable<T> using( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add @Experimental
here? In addition, could you replace tabs with 4 spaces in OnSubscribeUsing?
687adcd
to
ffe0fa1
Compare
Replaced tabs with spaces, added @experimental annotation to new overload and fixed typo in javadoc, squashed commits |
This seems reasonable to me. I don't use 'using' so trust the conversation you've had to address the necessary functionality. Reading through the other PR this does seem necessary and I agree that 'unsafeSubscribe' is appropriate for this. |
Observable.using should use unsafeSubscribe and enable eager disposal
See #2604 where it was discovered that
Observable.using
usedsubscribe
instead ofunsafeSubscribe
which provoked a race condition leading to an IllegalArgumentException from the merge operator.This PR uses
unsafeSubscribe
and adds an overload forusing
to optionally dispose of resources eagerly (just before completion or error). The use case for this is a synchronous observable where a downstream operation wants to reuse a resource (but because the observable is synchronous the resource cannot get disposed till the downstream completes).Unit tests included.