Skip to content

Commit

Permalink
Merge pull request #3448 from hyleung/single_delay
Browse files Browse the repository at this point in the history
Single delay
  • Loading branch information
abersnaze committed Oct 16, 2015
2 parents f9d3e99 + a596f0f commit 1fd245b
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4127,7 +4127,7 @@ public final Observable<T> delay(long delay, TimeUnit unit) {
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorDelay<T>(this, delay, unit, scheduler));
return lift(new OperatorDelay<T>(delay, unit, scheduler));
}

/**
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.annotations.Experimental;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
Expand All @@ -32,6 +33,7 @@
import rx.functions.Func8;
import rx.functions.Func9;
import rx.internal.operators.OnSubscribeToObservableFuture;
import rx.internal.operators.OperatorDelay;
import rx.internal.operators.OperatorDoOnEach;
import rx.internal.operators.OperatorMap;
import rx.internal.operators.OperatorObserveOn;
Expand Down Expand Up @@ -1898,4 +1900,50 @@ public void onNext(T t) {

return lift(new OperatorDoOnEach<T>(observer));
}

/**
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
* specified delay. Error notifications from the source Single are not delayed.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param delay
* the delay to shift the source by
* @param unit
* the time unit of {@code delay}
* @param scheduler
* the {@link Scheduler} to use for delaying
* @return the source Single shifted in time by the specified delay
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@Experimental
public final Single<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorDelay<T>(delay, unit, scheduler));
}

/**
* Returns an Single that emits the items emitted by the source Single shifted forward in time by a
* specified delay. Error notifications from the source Observable are not delayed.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.</dd>
* </dl>
*
* @param delay
* the delay to shift the source by
* @param unit
* the {@link TimeUnit} in which {@code period} is defined
* @return the source Single shifted in time by the specified delay
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
*/
@Experimental
public final Single<T> delay(long delay, TimeUnit unit) {
return delay(delay, unit, Schedulers.computation());
}
}
4 changes: 1 addition & 3 deletions src/main/java/rx/internal/operators/OperatorDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
*/
public final class OperatorDelay<T> implements Operator<T, T> {

final Observable<? extends T> source;
final long delay;
final TimeUnit unit;
final Scheduler scheduler;

public OperatorDelay(Observable<? extends T> source, long delay, TimeUnit unit, Scheduler scheduler) {
this.source = source;
public OperatorDelay(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
Expand Down
45 changes: 43 additions & 2 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.TestScheduler;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subscriptions.Subscriptions;


public class SingleTest {

@Test
Expand Down Expand Up @@ -436,7 +438,7 @@ public void call() {
fail("timed out waiting for latch");
}
}

@Test
public void testBackpressureAsObservable() {
Single<String> s = Single.create(new OnSubscribe<String>() {
Expand All @@ -462,7 +464,7 @@ public void onStart() {

ts.assertValue("hello");
}

@Test
public void testToObservable() {
Observable<String> a = Single.just("a").toObservable();
Expand Down Expand Up @@ -648,4 +650,43 @@ public void doOnSuccessShouldNotSwallowExceptionThrownByAction() {

verify(action).call(eq("value"));
}

@Test
public void delayWithSchedulerShouldDelayCompletion() {
TestScheduler scheduler = new TestScheduler();
Single<Integer> single = Single.just(1).delay(100, TimeUnit.DAYS, scheduler);

TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
single.subscribe(subscriber);

subscriber.assertNotCompleted();
scheduler.advanceTimeBy(99, TimeUnit.DAYS);
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(91, TimeUnit.DAYS);
subscriber.assertCompleted();
subscriber.assertValue(1);
}

@Test
public void delayWithSchedulerShouldShortCutWithFailure() {
TestScheduler scheduler = new TestScheduler();
final RuntimeException expected = new RuntimeException();
Single<Integer> single = Single.create(new OnSubscribe<Integer>() {
@Override
public void call(SingleSubscriber<? super Integer> singleSubscriber) {
singleSubscriber.onSuccess(1);
singleSubscriber.onError(expected);
}
}).delay(100, TimeUnit.DAYS, scheduler);

TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
single.subscribe(subscriber);

subscriber.assertNotCompleted();
scheduler.advanceTimeBy(99, TimeUnit.DAYS);
subscriber.assertNotCompleted();
scheduler.advanceTimeBy(91, TimeUnit.DAYS);
subscriber.assertNoValues();
subscriber.assertError(expected);
}
}

0 comments on commit 1fd245b

Please sign in to comment.