Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename some Operator* classes to OnSubscribe* as per #1270 #1286

Merged
merged 3 commits into from
May 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions rxjava-core/src/main/java/rx/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.operators.OperatorRefCount;
import rx.operators.OnSubscribeRefCount;

/**
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -75,6 +75,6 @@ public void call(Subscription t1) {
* @return a {@link Observable}
*/
public Observable<T> refCount() {
return create(new OperatorRefCount<T>(this));
return create(new OnSubscribeRefCount<T>(this));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
/**
* Propagates the observable sequence that reacts first.
*/
public final class OperatorAmb<T> implements OnSubscribe<T>{
public final class OnSubscribeAmb<T> implements OnSubscribe<T>{

public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
Expand Down Expand Up @@ -113,7 +113,7 @@ public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? ex
}

public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? extends T>> sources) {
return new OperatorAmb<T>(sources);
return new OnSubscribeAmb<T>(sources);
}

private static final class AmbSubscriber<T> extends Subscriber<T> {
Expand Down Expand Up @@ -168,7 +168,7 @@ private boolean isSelected() {

private final Iterable<? extends Observable<? extends T>> sources;

private OperatorAmb(Iterable<? extends Observable<? extends T>> sources) {
private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
this.sources = sources;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@
* @param <T>
* the cached value type
*/
public final class OperatorCache<T> implements OnSubscribe<T> {
public final class OnSubscribeCache<T> implements OnSubscribe<T> {
protected final Observable<? extends T> source;
protected final Subject<? super T, ? extends T> cache;
volatile int sourceSubscribed;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<OperatorCache> SRC_SUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(OperatorCache.class, "sourceSubscribed");
static final AtomicIntegerFieldUpdater<OnSubscribeCache> SRC_SUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(OnSubscribeCache.class, "sourceSubscribed");

public OperatorCache(Observable<? extends T> source) {
public OnSubscribeCache(Observable<? extends T> source) {
this(source, ReplaySubject.<T> create());
}

/* accessible to tests */OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
/* accessible to tests */OnSubscribeCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
this.source = source;
this.cache = cache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
* @param <T> the common basetype of the source values
* @param <R> the result type of the combinator function
*/
public final class OperatorCombineLatest<T, R> implements OnSubscribe<R> {
public final class OnSubscribeCombineLatest<T, R> implements OnSubscribe<R> {
final List<? extends Observable<? extends T>> sources;
final FuncN<? extends R> combinator;

public OperatorCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
this.sources = sources;
this.combinator = combinator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
* return an Observable that will call this function to generate its Observable sequence afresh
* each time a new Observer subscribes.
*/
public final class OperatorDefer<T> implements OnSubscribe<T> {
public final class OnSubscribeDefer<T> implements OnSubscribe<T> {
final Func0<? extends Observable<? extends T>> observableFactory;

public OperatorDefer(Func0<? extends Observable<? extends T>> observableFactory) {
public OnSubscribeDefer(Func0<? extends Observable<? extends T>> observableFactory) {
this.observableFactory = observableFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
* Delays the emission of onNext events by a given amount of time.
* @param <T> the value type
*/
public final class OperatorDelay<T> implements OnSubscribe<T> {
public final class OnSubscribeDelay<T> implements OnSubscribe<T> {

final Observable<? extends T> source;
final long delay;
final TimeUnit unit;
final Scheduler scheduler;

public OperatorDelay(Observable<? extends T> source, long delay, TimeUnit unit, Scheduler scheduler) {
public OnSubscribeDelay(Observable<? extends T> source, long delay, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.delay = delay;
this.unit = unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
*
* @param <T> the value type
*/
public final class OperatorDelaySubscription<T> implements OnSubscribe<T> {
public final class OnSubscribeDelaySubscription<T> implements OnSubscribe<T> {
final Observable<? extends T> source;
final long time;
final TimeUnit unit;
final Scheduler scheduler;

public OperatorDelaySubscription(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
public OnSubscribeDelaySubscription(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.time = time;
this.unit = unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.SerializedSubscriber;
import rx.operators.OperatorDelay.Emitter;
import rx.operators.OnSubscribeDelay.Emitter;
import rx.subscriptions.CompositeSubscription;

/**
Expand All @@ -30,12 +30,12 @@
* @param <U> the value type of the subscription-delaying observable
* @param <V> the value type of the item-delaying observable
*/
public final class OperatorDelayWithSelector<T, U, V> implements OnSubscribe<T> {
public final class OnSubscribeDelayWithSelector<T, U, V> implements OnSubscribe<T> {
final Observable<? extends T> source;
final Func0<? extends Observable<U>> subscriptionDelay;
final Func1<? super T, ? extends Observable<V>> itemDelay;

public OperatorDelayWithSelector(Observable<? extends T> source, Func1<? super T, ? extends Observable<V>> itemDelay) {
public OnSubscribeDelayWithSelector(Observable<? extends T> source, Func1<? super T, ? extends Observable<V>> itemDelay) {
this.source = source;
this.subscriptionDelay = new Func0<Observable<U>>() {
@Override
Expand All @@ -46,7 +46,7 @@ public Observable<U> call() {
this.itemDelay = itemDelay;
}

public OperatorDelayWithSelector(Observable<? extends T> source, Func0<? extends Observable<U>> subscriptionDelay, Func1<? super T, ? extends Observable<V>> itemDelay) {
public OnSubscribeDelayWithSelector(Observable<? extends T> source, Func0<? extends Observable<U>> subscriptionDelay, Func1<? super T, ? extends Observable<V>> itemDelay) {
this.source = source;
this.subscriptionDelay = subscriptionDelay;
this.itemDelay = itemDelay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
* @param <D2> the value type of the right duration
* @param <R> the result value type
*/
public final class OperatorGroupJoin<T1, T2, D1, D2, R> implements OnSubscribe<R> {
public final class OnSubscribeGroupJoin<T1, T2, D1, D2, R> implements OnSubscribe<R> {
protected final Observable<T1> left;
protected final Observable<T2> right;
protected final Func1<? super T1, ? extends Observable<D1>> leftDuration;
protected final Func1<? super T2, ? extends Observable<D2>> rightDuration;
protected final Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector;

public OperatorGroupJoin(
public OnSubscribeGroupJoin(
Observable<T1> left,
Observable<T2> right,
Func1<? super T1, ? extends Observable<D1>> leftDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
* @param <TRightDuration> the right duration type
* @param <R> the result type
*/
public final class OperatorJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribe<R> {
public final class OnSubscribeJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribe<R> {
final Observable<TLeft> left;
final Observable<TRight> right;
final Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector;
final Func1<TRight, Observable<TRightDuration>> rightDurationSelector;
final Func2<TLeft, TRight, R> resultSelector;

public OperatorJoin(
public OnSubscribeJoin(
Observable<TLeft> left,
Observable<TRight> right,
Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
* @param <TIntermediate> the intermediate type
* @param <TResult> the result type
*/
public final class OperatorMulticastSelector<TInput, TIntermediate, TResult> implements OnSubscribe<TResult> {
public final class OnSubscribeMulticastSelector<TInput, TIntermediate, TResult> implements OnSubscribe<TResult> {
final Observable<? extends TInput> source;
final Func0<? extends Subject<? super TInput, ? extends TIntermediate>> subjectFactory;
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> resultSelector;

public OperatorMulticastSelector(Observable<? extends TInput> source,
public OnSubscribeMulticastSelector(Observable<? extends TInput> source,
Func0<? extends Subject<? super TInput, ? extends TIntermediate>> subjectFactory,
Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> resultSelector) {
this.source = source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* as there is at least one subscription to the observable sequence.
* @param <T> the value type
*/
public final class OperatorRefCount<T> implements OnSubscribe<T> {
public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
final ConnectableObservable<? extends T> source;
final Object guard;
/** Guarded by guard. */
Expand All @@ -48,7 +48,7 @@ public final class OperatorRefCount<T> implements OnSubscribe<T> {
final Map<Token, Object> connectionStatus;
/** Occupied indicator. */
private static final Object OCCUPIED = new Object();
public OperatorRefCount(ConnectableObservable<? extends T> source) {
public OnSubscribeRefCount(ConnectableObservable<? extends T> source) {
this.source = source;
this.guard = new Object();
this.connectionStatus = new WeakHashMap<Token, Object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
* Timer that emits a single 0L and completes after the specified time.
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.timer.aspx'>MSDN Observable.Timer</a>
*/
public final class OperatorTimerOnce implements OnSubscribe<Long> {
public final class OnSubscribeTimerOnce implements OnSubscribe<Long> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;

public OperatorTimerOnce(long time, TimeUnit unit, Scheduler scheduler) {
public OnSubscribeTimerOnce(long time, TimeUnit unit, Scheduler scheduler) {
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
* Emit 0L after the initial period and ever increasing number after each period.
* @see <a href='http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.timer.aspx'>MSDN Observable.Timer</a>
*/
public final class OperatorTimerPeriodically implements OnSubscribe<Long> {
public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> {
final long initialDelay;
final long period;
final TimeUnit unit;
final Scheduler scheduler;

public OperatorTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* This is blocking so the Subscription returned when calling
* <code>Observable.unsafeSubscribe(Observer)</code> does nothing.
*/
public class OperatorToObservableFuture {
public class OnSubscribeToObservableFuture {
/* package accessible for unit tests */static class ToObservableFuture<T> implements OnSubscribe<T> {
private final Future<? extends T> that;
private final long time;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
/**
* Constructs an observable sequence that depends on a resource object.
*/
public final class OperatorUsing<T, Resource extends Subscription> implements OnSubscribe<T> {
public final class OnSubscribeUsing<T, Resource extends Subscription> implements OnSubscribe<T> {

private final Func0<Resource> resourceFactory;
private final Func1<Resource, ? extends Observable<? extends T>> observableFactory;

public OperatorUsing(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory) {
public OnSubscribeUsing(Func0<Resource> resourceFactory, Func1<Resource, ? extends Observable<? extends T>> observableFactory) {
this.resourceFactory = resourceFactory;
this.observableFactory = observableFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static rx.operators.OperatorAmb.amb;
import static rx.operators.OnSubscribeAmb.amb;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +36,7 @@
import rx.schedulers.TestScheduler;
import rx.subscriptions.CompositeSubscription;

public class OperatorAmbTest {
public class OnSubscribeAmbTest {

private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

public class OperatorCacheTest {
public class OnSubscribeCacheTest {

@Test
public void testCache() throws InterruptedException {
Expand Down Expand Up @@ -107,7 +107,7 @@ public Integer call(Long t1) {
}
});

Observable<Integer> source1 = Observable.create(new OperatorCache<Integer>(source0, subject));
Observable<Integer> source1 = Observable.create(new OnSubscribeCache<Integer>(source0, subject));

Observable<Integer> source2 = source1
.repeat(4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

public class OperatorCombineLatestTest {
public class OnSubscribeCombineLatestTest {

@Test
public void testCombineLatestWithFunctionThatThrowsAnException() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import rx.functions.Func0;

@SuppressWarnings("unchecked")
public class OperatorDeferTest {
public class OnSubscribeDeferTest {

@Test
public void testDefer() throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

public class OperatorDelayTest {
public class OnSubscribeDelayTest {
@Mock
private Observer<Long> observer;
@Mock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import rx.Observable;
import rx.Observer;

public class OperatorFromIterableTest {
public class OnSubscribeFromIterableTest {

@Test
public void testIterable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import rx.functions.Func2;
import rx.subjects.PublishSubject;

public class OperatorGroupJoinTest {
public class OnSubscribeGroupJoinTest {
@Mock
Observer<Object> observer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import rx.functions.Func2;
import rx.subjects.PublishSubject;

public class OperatorJoinTest {
public class OnSubscribeJoinTest {
@Mock
Observer<Object> observer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import rx.subjects.PublishSubject;
import rx.subjects.Subject;

public class OperatorMulticastTest {
public class OnSubscribeMulticastTest {

@Test
public void testMulticast() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import rx.observables.ConnectableObservable;
import rx.schedulers.TestScheduler;

public class OperatorTimerTest {
public class OnSubscribeTimerTest {
@Mock
Observer<Object> observer;
@Mock
Expand Down
Loading