From ea575d0911b6b98b1b4ab537a97c6c56e26f96d9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 26 Sep 2013 09:40:18 +0800 Subject: [PATCH 1/5] Merge branch 'master', remote-tracking branch 'origin' From 4323b704fee1c4339a5e77b2f9724f4c074d7086 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 29 Sep 2013 15:20:58 +0800 Subject: [PATCH 2/5] Implemented the 'SkipLast' operator --- rxjava-core/src/main/java/rx/Observable.java | 27 +++ .../java/rx/operators/OperationSkipLast.java | 197 ++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSkipLast.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5a0f668dc5..bafedc29d8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -63,6 +63,7 @@ import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; +import rx.operators.OperationSkipLast; import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; @@ -4054,6 +4055,32 @@ public Observable skipWhile(Func1 predicate) { return create(OperationSkipWhile.skipWhile(this, predicate)); } + /** + * Bypasses a specified number of elements at the end of an observable + * sequence. + *

+ * This operator accumulates a queue with a length enough to store the first + * count elements. As more elements are received, elements are taken from + * the front of the queue and produced on the result sequence. This causes + * elements to be delayed. + * + * @param source + * the source sequence. + * @param count + * number of elements to bypass at the end of the source + * sequence. + * @return An observable sequence containing the source sequence elements + * except for the bypassed ones at the end. + * + * @throws IndexOutOfBoundsException + * count is less than zero. + * + * @see MSDN: Observable.SkipLast + */ + public Observable skipLast(int count) { + return create(OperationSkipLast.skipLast(this, count)); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java new file mode 100644 index 0000000000..73e0014b22 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java @@ -0,0 +1,197 @@ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * Bypasses a specified number of elements at the end of an observable sequence. + */ +public class OperationSkipLast { + + /** + * Bypasses a specified number of elements at the end of an observable + * sequence. + *

+ * This operator accumulates a queue with a length enough to store the first + * count elements. As more elements are received, elements are taken from + * the front of the queue and produced on the result sequence. This causes + * elements to be delayed. + * + * @param source + * the source sequence. + * @param count + * number of elements to bypass at the end of the source + * sequence. + * @return An observable sequence containing the source sequence elements + * except for the bypassed ones at the end. + * + * @throws IndexOutOfBoundsException + * count is less than zero. + */ + public static OnSubscribeFunc skipLast( + Observable source, int count) { + return new SkipLast(source, count); + } + + private static class SkipLast implements OnSubscribeFunc { + private final int count; + private final Observable source; + + private SkipLast(Observable source, int count) { + this.count = count; + this.source = source; + } + + public Subscription onSubscribe(final Observer observer) { + if (count < 0) { + throw new IndexOutOfBoundsException( + "count could not be negative"); + } + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(source.subscribe(new Observer() { + + private final ReentrantLock lock = new ReentrantLock(); + + /** + * Store the last count elements until now. + */ + private final Deque deque = new LinkedList(); + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T value) { + lock.lock(); + try { + deque.offerLast(value); + if (deque.size() > count) { + // Now deque has count + 1 elements, so the first + // element in the deque definitely does not belong + // to the last count elements of the source + // sequence. We can emit it now. + observer.onNext(deque.removeFirst()); + } + } catch (Throwable ex) { + observer.onError(ex); + subscription.unsubscribe(); + } finally { + lock.unlock(); + } + } + + })); + } + } + + public static class UnitTest { + + @Test + public void testSkipLastEmpty() { + Observable w = Observable.empty(); + Observable observable = Observable.create(skipLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLast1() { + Observable w = Observable.from("one", "two", "three"); + Observable observable = Observable.create(skipLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + InOrder inOrder = inOrder(aObserver); + observable.subscribe(aObserver); + inOrder.verify(aObserver, never()).onNext("two"); + inOrder.verify(aObserver, never()).onNext("three"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLast2() { + Observable w = Observable.from("one", "two"); + Observable observable = Observable.create(skipLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLastWithZeroCount() { + Observable w = Observable.from("one", "two"); + Observable observable = Observable.create(skipLast(w, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLastWithNull() { + Observable w = Observable.from("one", null, "two"); + Observable observable = Observable.create(skipLast(w, 1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext(null); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLastWithNegativeCount() { + Observable w = Observable.from("one"); + Observable observable = Observable.create(skipLast(w, -1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, times(1)).onError( + any(IndexOutOfBoundsException.class)); + verify(aObserver, never()).onCompleted(); + } + } +} From 9abfb5a1d6abee7c3f54416bac595ff1d22adcb5 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 29 Sep 2013 15:23:16 +0800 Subject: [PATCH 3/5] Updated the comments --- rxjava-core/src/main/java/rx/Observable.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index bafedc29d8..c19ec3c824 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4064,8 +4064,6 @@ public Observable skipWhile(Func1 predicate) { * the front of the queue and produced on the result sequence. This causes * elements to be delayed. * - * @param source - * the source sequence. * @param count * number of elements to bypass at the end of the source * sequence. From b46b5b35ff36064d77e80a66850677e357d183d0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 29 Sep 2013 16:40:26 +0800 Subject: [PATCH 4/5] Optimize for 'count == 0' --- .../main/java/rx/operators/OperationSkipLast.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java index 73e0014b22..f8518b21df 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java @@ -85,6 +85,18 @@ public void onError(Throwable e) { @Override public void onNext(T value) { + if (count == 0) { + // If count == 0, we do not need to put value into deque + // and remove it at once. We can emit the value + // directly. + try { + observer.onNext(value); + } catch (Throwable ex) { + observer.onError(ex); + subscription.unsubscribe(); + } + return; + } lock.lock(); try { deque.offerLast(value); From be3b3700ddc3839c1dceb73d790cf3528fa0abf1 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Mon, 30 Sep 2013 16:11:29 +0800 Subject: [PATCH 5/5] Added missing license header --- .../main/java/rx/operators/OperationSkipLast.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java index f8518b21df..10da0c06b4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import static org.mockito.Matchers.any;