Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize OperatorSkipLastTimed #1065

Merged
merged 1 commit into from
Apr 24, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ trait Observable[+T]
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
* Note: this action will cache the latest items arriving in the specified time window.
*
* @param time the length of the time window
* @return an Observable that drops those items emitted by the source Observable in a time window before the
Expand All @@ -1316,7 +1316,7 @@ trait Observable[+T]
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
* Note: this action will cache the latest items arriving in the specified time window.
*
* @param time the length of the time window
* @param scheduler the scheduler used as the time source
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5567,7 +5567,7 @@ public final Observable<T> skipLast(int count) {
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.t.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
* Note: this action will cache the latest items arriving in the specified time window.
*
* @param time
* the length of the time window
Expand All @@ -5588,7 +5588,7 @@ public final Observable<T> skipLast(long time, TimeUnit unit) {
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/skipLast.ts.png">
*
* Note: this action will cache all items until "onCompleted" arrives. So don't use it on an infinite Observable.
* Note: this action will cache the latest items arriving in the specified time window.
*
* @param time
* the length of the time window
Expand Down
45 changes: 21 additions & 24 deletions rxjava-core/src/main/java/rx/operators/OperatorSkipLastTimed.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
*/
package rx.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

import rx.Observable.Operator;
Expand All @@ -42,39 +41,37 @@ public OperatorSkipLastTimed(long time, TimeUnit unit, Scheduler scheduler) {
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {

private List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();
private Deque<Timestamped<T>> buffer = new ArrayDeque<Timestamped<T>>();

private void emitItemsOutOfWindow(long now) {
long limit = now - timeInMillis;
while (!buffer.isEmpty()) {
Timestamped<T> v = buffer.getFirst();
if (v.getTimestampMillis() < limit) {
buffer.removeFirst();
subscriber.onNext(v.getValue());
} else {
break;
}
}
}

@Override
public void onNext(T value) {
buffer.add(new Timestamped<T>(scheduler.now(), value));
long now = scheduler.now();
emitItemsOutOfWindow(now);
buffer.offerLast(new Timestamped<T>(now, value));
}

@Override
public void onError(Throwable e) {
buffer = Collections.emptyList();
subscriber.onError(e);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we need to call emitItemsOutOfWindow in onError. Which one is more reasonable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm doing buffer now and they all emit buffered data in case of onError.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested in Rx.Net.

            var o = Observable.Create<int>(observer => {
                observer.OnNext(2);
                Thread.Sleep(2000);
                observer.OnError(new Exception("test"));
                return Disposable.Empty;
            });
            o.SkipLast(TimeSpan.FromMilliseconds(100)).ObserveOn(Scheduler.NewThread).Subscribe(
                next=>Console.WriteLine(next),
                e => Console.WriteLine(e),
                () => Console.WriteLine("onCompleted")
                );
            Console.ReadLine();

The above codes only output

System.Exception: test

So Rx.Net only emits onError and drops the buffer?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When onError occurs it immediately emits and does not work any further work.

We had this discussion a while back when debating delay I think.

Rx Design Guideline 6.6

6.6. OnError messages should have abort semantics

As normal control flow in .NET uses abort semantics for exceptions (the stack is unwound, current code path is interrupted), Rx mimics this behavior. To ensure this behavior, no messages should be sent out by an operator once one of it sources has an error message or an exception is thrown within the operator.

...

In this sample, a buffering operator will abandon the observable sequence as soon as the subscription to source encounters an error. The current buffer is not sent to any subscribers, maintain abort semantics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for clarification. Then this PR should be ready to merge.

}

@Override
public void onCompleted() {
long limit = scheduler.now() - timeInMillis;
try {
for (Timestamped<T> v : buffer) {
if (v.getTimestampMillis() < limit) {
try {
subscriber.onNext(v.getValue());
} catch (Throwable t) {
subscriber.onError(t);
return;
}
} else {
break;
}
}
subscriber.onCompleted();
} finally {
buffer = Collections.emptyList();
}
emitItemsOutOfWindow(scheduler.now());
subscriber.onCompleted();
}

};
Expand Down