diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5a0f668dc5..c19ec3c824 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -63,6 +63,7 @@ import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; +import rx.operators.OperationSkipLast; import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; @@ -4054,6 +4055,30 @@ public Observable skipWhile(Func1 predicate) { return create(OperationSkipWhile.skipWhile(this, predicate)); } + /** + * Bypasses a specified number of elements at the end of an observable + * sequence. + *

+ * This operator accumulates a queue with a length enough to store the first + * count elements. As more elements are received, elements are taken from + * the front of the queue and produced on the result sequence. This causes + * elements to be delayed. + * + * @param count + * number of elements to bypass at the end of the source + * sequence. + * @return An observable sequence containing the source sequence elements + * except for the bypassed ones at the end. + * + * @throws IndexOutOfBoundsException + * count is less than zero. + * + * @see MSDN: Observable.SkipLast + */ + public Observable skipLast(int count) { + return create(OperationSkipLast.skipLast(this, count)); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java new file mode 100644 index 0000000000..10da0c06b4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipLast.java @@ -0,0 +1,224 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.locks.ReentrantLock; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * Bypasses a specified number of elements at the end of an observable sequence. + */ +public class OperationSkipLast { + + /** + * Bypasses a specified number of elements at the end of an observable + * sequence. + *

+ * This operator accumulates a queue with a length enough to store the first + * count elements. As more elements are received, elements are taken from + * the front of the queue and produced on the result sequence. This causes + * elements to be delayed. + * + * @param source + * the source sequence. + * @param count + * number of elements to bypass at the end of the source + * sequence. + * @return An observable sequence containing the source sequence elements + * except for the bypassed ones at the end. + * + * @throws IndexOutOfBoundsException + * count is less than zero. + */ + public static OnSubscribeFunc skipLast( + Observable source, int count) { + return new SkipLast(source, count); + } + + private static class SkipLast implements OnSubscribeFunc { + private final int count; + private final Observable source; + + private SkipLast(Observable source, int count) { + this.count = count; + this.source = source; + } + + public Subscription onSubscribe(final Observer observer) { + if (count < 0) { + throw new IndexOutOfBoundsException( + "count could not be negative"); + } + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(source.subscribe(new Observer() { + + private final ReentrantLock lock = new ReentrantLock(); + + /** + * Store the last count elements until now. + */ + private final Deque deque = new LinkedList(); + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T value) { + if (count == 0) { + // If count == 0, we do not need to put value into deque + // and remove it at once. We can emit the value + // directly. + try { + observer.onNext(value); + } catch (Throwable ex) { + observer.onError(ex); + subscription.unsubscribe(); + } + return; + } + lock.lock(); + try { + deque.offerLast(value); + if (deque.size() > count) { + // Now deque has count + 1 elements, so the first + // element in the deque definitely does not belong + // to the last count elements of the source + // sequence. We can emit it now. + observer.onNext(deque.removeFirst()); + } + } catch (Throwable ex) { + observer.onError(ex); + subscription.unsubscribe(); + } finally { + lock.unlock(); + } + } + + })); + } + } + + public static class UnitTest { + + @Test + public void testSkipLastEmpty() { + Observable w = Observable.empty(); + Observable observable = Observable.create(skipLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLast1() { + Observable w = Observable.from("one", "two", "three"); + Observable observable = Observable.create(skipLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + InOrder inOrder = inOrder(aObserver); + observable.subscribe(aObserver); + inOrder.verify(aObserver, never()).onNext("two"); + inOrder.verify(aObserver, never()).onNext("three"); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLast2() { + Observable w = Observable.from("one", "two"); + Observable observable = Observable.create(skipLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLastWithZeroCount() { + Observable w = Observable.from("one", "two"); + Observable observable = Observable.create(skipLast(w, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLastWithNull() { + Observable w = Observable.from("one", null, "two"); + Observable observable = Observable.create(skipLast(w, 1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, times(1)).onNext(null); + verify(aObserver, never()).onNext("two"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testSkipLastWithNegativeCount() { + Observable w = Observable.from("one"); + Observable observable = Observable.create(skipLast(w, -1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, times(1)).onError( + any(IndexOutOfBoundsException.class)); + verify(aObserver, never()).onCompleted(); + } + } +}