Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zip(iterable, selector) to RxScala #1247

Merged
merged 2 commits into from
May 27, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,31 @@ class RxScalaDemo extends JUnitSuite {
.toBlockingObservable.foreach(println(_))
}

/**
* This is a bad way of using `zip` with an `Iterable`: even if the consumer unsubscribes,
* some elements may still be pulled from `Iterable`.
*/
@Test def zipWithIterableBadExample() {
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
val o2 = Observable.from(0 until Int.MaxValue).doOnEach(i => println(i + " from o2"))
o1.zip(o2).toBlockingObservable.foreach(println(_))
}

/**
* This is a good way of using `zip` with an `Iterable`: if the consumer unsubscribes,
* no more elements will be pulled from `Iterable`.
*/
@Test def zipWithIterableGoodExample() {
val o1 = Observable.interval(100 millis, IOScheduler()).map(_ * 100).take(3)
val iter = (0 until Int.MaxValue).view.map {
i => {
println(i + " from iter")
i
}
}
o1.zip(iter).toBlockingObservable.foreach(println(_))
}

@Test def takeFirstWithCondition() {
val condition: Int => Boolean = _ >= 3
assertEquals(3, List(1, 2, 3, 4).toObservable.filter(condition).first.toBlockingObservable.single)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,42 @@ trait Observable[+T]
zipWith(that, (t: T, u: U) => (t, u))
}

/**
* Returns an Observable formed from `this` Observable and `other` Iterable by combining
* corresponding elements in pairs.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
* <p>
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
* not pre-consumed. This allows you to zip infinite streams on either side.
*
* @param other the Iterable sequence
* @return an Observable that pairs up values from the source Observable and the `other` Iterable.
*/
def zip[U](other: Iterable[U]): Observable[(T, U)] = {
zipWith(other, (t: T, u: U) => (t, u))
}

/**
* Returns an Observable that emits items that are the result of applying a specified function to pairs of
* values, one each from the source Observable and a specified Iterable sequence.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.i.png">
* <p>
* Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
* not pre-consumed. This allows you to zip infinite streams on either side.
*
* @param other the Iterable sequence
* @param selector a function that combines the pairs of items from the Observable and the Iterable to generate
* the items to be emitted by the resulting Observable
* @return an Observable that pairs up values from the source Observable and the `other` Iterable
* sequence and emits the results of `selector` applied to these pairs
*/
def zipWith[U, R](other: Iterable[U], selector: (T, U) => R): Observable[R] = {
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
toScalaObservable[R](thisJava.zip(other.asJava, selector))
}

/**
* Returns an Observable formed from this Observable and another Observable by combining
* corresponding elements using the selector function.
Expand All @@ -353,7 +389,7 @@ trait Observable[+T]
* their index. Indices start at 0.
*/
def zipWithIndex: Observable[(T, Int)] = {
zip((0 until Int.MaxValue).toObservable)
zip(0 until Int.MaxValue)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class CompletenessTest extends JUnitSuite {
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
"groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])",
"ignoreElements()" -> "[use `filter(_ => false)`]",
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
"limit(Int)" -> "take(Int)",
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
"multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])",
"multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])",
Expand Down Expand Up @@ -118,6 +120,7 @@ class CompletenessTest extends JUnitSuite {
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
"skipUntil(Observable[U])" -> "dropUntil(Observable[E])",
"single(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).single`]",
"startWith(T)" -> "[use `item +: o`]",
"startWith(Array[T])" -> "[use `Observable.items(items) ++ o`]",
"startWith(Array[T], Scheduler)" -> "[use `Observable.items(items).subscribeOn(scheduler) ++ o`]",
Expand Down Expand Up @@ -151,6 +154,8 @@ class CompletenessTest extends JUnitSuite {
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"window(Long, Long, TimeUnit)" -> "window(Duration, Duration)",
"window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)",
"zip(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U], (T, U) => R)",
"zip(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U], (T, U) => R)",

// manually added entries for Java static methods
"average(Observable[Integer])" -> averageProblem,
Expand Down