Skip to content

Commit e1c5355

Browse files
Merge pull request ReactiveX#562 from samuelgruetter/RxJavaBugFixesSam
Scala Adaptor Improvements by Erik
2 parents 0f687d9 + d898af5 commit e1c5355

23 files changed

+361
-264
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 9 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ import org.scalatest.junit.JUnitSuite
3030

3131
import rx.lang.scala.Notification
3232
import rx.lang.scala.Observable
33-
import rx.lang.scala.observable
34-
import rx.lang.scala.concurrency.Schedulers
33+
import rx.lang.scala.concurrency._
3534

3635
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
3736
class RxScalaDemo extends JUnitSuite {
@@ -167,21 +166,21 @@ class RxScalaDemo extends JUnitSuite {
167166
@Test def testTwoSubscriptionsToOneInterval() {
168167
val o = Observable.interval(100 millis).take(8)
169168
o.subscribe(
170-
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
169+
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
171170
)
172171
o.subscribe(
173-
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
172+
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
174173
)
175174
waitFor(o)
176175
}
177176

178177
@Test def schedulersExample() {
179178
val o = Observable.interval(100 millis).take(8)
180-
o.observeOn(Schedulers.newThread).subscribe(
181-
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
179+
o.observeOn(NewThreadScheduler()).subscribe(
180+
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
182181
)
183-
o.observeOn(Schedulers.newThread).subscribe(
184-
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
182+
o.observeOn(NewThreadScheduler()).subscribe(
183+
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
185184
)
186185
waitFor(o)
187186
}
@@ -357,13 +356,13 @@ class RxScalaDemo extends JUnitSuite {
357356
}
358357

359358
def square(x: Int): Int = {
360-
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId()}")
359+
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
361360
Thread.sleep(100) // calculating a square is heavy work :)
362361
x*x
363362
}
364363

365364
def work(o1: Observable[Int]): Observable[String] = {
366-
println(s"map() is being called on thread ${Thread.currentThread().getId()}")
365+
println(s"map() is being called on thread ${Thread.currentThread().getId}")
367366
o1.map(i => s"The square of $i is ${square(i)}")
368367
}
369368

@@ -428,40 +427,6 @@ class RxScalaDemo extends JUnitSuite {
428427
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
429428
}
430429

431-
@Test def observableLikeFuture1() {
432-
implicit val scheduler = Schedulers.threadPoolForIO
433-
val o1 = observable {
434-
Thread.sleep(1000)
435-
5
436-
}
437-
val o2 = observable {
438-
Thread.sleep(500)
439-
4
440-
}
441-
Thread.sleep(500)
442-
val t1 = System.currentTimeMillis
443-
println((o1 merge o2).first.toBlockingObservable.single)
444-
println(System.currentTimeMillis - t1)
445-
}
446-
447-
@Test def observableLikeFuture2() {
448-
class Friend {}
449-
val session = new Object {
450-
def getFriends: List[Friend] = List(new Friend, new Friend)
451-
}
452-
453-
implicit val scheduler = Schedulers.threadPoolForIO
454-
val o: Observable[List[Friend]] = observable {
455-
session.getFriends
456-
}
457-
o.subscribe(
458-
friendList => println(friendList),
459-
err => println(err.getMessage)
460-
)
461-
462-
Thread.sleep(1500) // or convert to BlockingObservable
463-
}
464-
465430
@Test def takeWhileWithIndexAlternative {
466431
val condition = true
467432
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class TestSchedulerExample extends JUnitSuite {
2828

2929
scheduler.advanceTimeTo(2 seconds)
3030

31-
val inOrdr = inOrder(observer);
31+
val inOrdr = inOrder(observer)
3232
inOrdr.verify(observer, times(1)).onNext(0L)
3333
inOrdr.verify(observer, times(1)).onNext(1L)
3434
inOrdr.verify(observer, never).onNext(2L)
@@ -37,7 +37,7 @@ class TestSchedulerExample extends JUnitSuite {
3737

3838
verify(observer, never).onNext(2L)
3939

40-
sub.unsubscribe();
40+
sub.unsubscribe()
4141

4242
scheduler.advanceTimeTo(4 seconds)
4343

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ package rx.lang.scala
1717

1818
import java.lang.Exception
1919
import java.{ lang => jlang }
20-
import rx.lang.scala._
21-
import rx.util.functions._
20+
2221
import scala.collection.Seq
23-
import java.{lang => jlang}
2422
import scala.language.implicitConversions
2523

24+
import rx.util.functions._
25+
2626
/**
2727
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
2828
* Most RxScala users won't need them, but they might be useful if one wants to use
@@ -32,10 +32,17 @@ import scala.language.implicitConversions
3232
object ImplicitFunctionConversions {
3333
import language.implicitConversions
3434

35+
// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} =
36+
// new Func2[rx.Scheduler, T, rx.Subscription] {
37+
// def call(s: rx.Scheduler, t: T): rx.Subscription = {
38+
// action(rx.lang.scala.Scheduler(s), t).asJavaSubscription
39+
// }
40+
// }
41+
3542
implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
3643
new Func2[rx.Scheduler, T, rx.Subscription] {
3744
def call(s: rx.Scheduler, t: T): rx.Subscription = {
38-
action(s, t).asJavaSubscription
45+
action(Scheduler(s), t).asJavaSubscription
3946
}
4047
}
4148

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import rx.Observable.OnSubscribeFunc
2222
/**
2323
* The Observable interface that implements the Reactive Pattern.
2424
*
25-
* @param asJavaObservable the underlying Java observable
26-
*
2725
* @define subscribeObserverMain
2826
* Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving
2927
* items and notifications from the Observable.
@@ -227,7 +225,6 @@ trait Observable[+T]
227225
* otherwise you'll get a compilation error.
228226
*
229227
* @usecase def concat[U]: Observable[U]
230-
* @inheritdoc
231228
*/
232229
def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
233230
val o2: Observable[Observable[U]] = this
@@ -273,7 +270,19 @@ trait Observable[+T]
273270
* is the minumum of the number of `onNext` invocations of `this` and `that`.
274271
*/
275272
def zip[U](that: Observable[U]): Observable[(T, U)] = {
276-
Observable[(T, U)](rx.Observable.zip[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, (t: T, u: U) => (t, u)))
273+
zip(that, (t: T, u: U) => (t, u))
274+
}
275+
276+
/**
277+
* Returns an Observable formed from this Observable and another Observable by combining
278+
* corresponding elements using the selector function.
279+
* The number of `onNext` invocations of the resulting `Observable[(T, U)]`
280+
* is the minumum of the number of `onNext` invocations of `this` and `that`.
281+
*
282+
* Note that this function is private because Scala collections don't have such a function.
283+
*/
284+
private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = {
285+
Observable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
277286
}
278287

279288
/**
@@ -1903,7 +1912,7 @@ object Observable {
19031912
/**
19041913
* Creates a new Scala Observable from a given Java Observable.
19051914
*/
1906-
def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
1915+
private [scala] def apply[T](observable: rx.Observable[_ <: T]): Observable[T] = {
19071916
new Observable[T]{
19081917
def asJavaObservable = observable
19091918
}
@@ -1926,13 +1935,13 @@ object Observable {
19261935
*
19271936
*
19281937
* @tparam T
1929-
* the type of the items that this Observable emits
1938+
* the type of the items that this Observable emits.
19301939
* @param func
19311940
* a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods
19321941
* as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to
1933-
* canceling the subscription
1934-
* @return an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given
1935-
* function
1942+
* canceling the subscription.
1943+
* @return
1944+
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
19361945
*/
19371946
def apply[T](func: Observer[T] => Subscription): Observable[T] = {
19381947
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
@@ -1942,16 +1951,26 @@ object Observable {
19421951
}))
19431952
}
19441953

1954+
def create[T](func: Observer[T] => Subscription): Observable[T] = {
1955+
Observable[T](rx.Observable.create(new OnSubscribeFunc[T] {
1956+
def onSubscribe(t1: rx.Observer[_ >: T]): rx.Subscription = {
1957+
func(Observer(t1))
1958+
}
1959+
}))
1960+
}
1961+
19451962
/**
1946-
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
1963+
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
1964+
* method when the Observer subscribes to it.
19471965
*
19481966
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/error.png">
19491967
*
19501968
* @param exception
19511969
* the particular error to report
19521970
* @tparam T
19531971
* the type of the items (ostensibly) emitted by the Observable
1954-
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
1972+
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
1973+
* method when the Observer subscribes to it
19551974
*/
19561975
def apply[T](exception: Throwable): Observable[T] = {
19571976
Observable[T](rx.Observable.error(exception))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import rx.joins.ObserverBase
19+
1820
/**
1921
Provides a mechanism for receiving push-based notifications.
2022
*
@@ -24,7 +26,11 @@ package rx.lang.scala
2426
*/
2527
trait Observer[-T] {
2628

27-
def asJavaObserver: rx.Observer[_ >: T]
29+
private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
30+
protected def onCompletedCore(): Unit = onCompleted()
31+
protected def onErrorCore(error: Throwable): Unit = onError(error)
32+
protected def onNextCore(value: T): Unit = onNext(value)
33+
}
2834

2935
/**
3036
* Provides the Observer with new data.
@@ -33,30 +39,37 @@ trait Observer[-T] {
3339
*
3440
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
3541
*/
36-
def onNext(value: T): Unit = asJavaObserver.onNext(value)
42+
def onNext(value: T): Unit
3743

3844
/**
3945
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
4046
*
4147
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
4248
*/
43-
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
49+
def onError(error: Throwable): Unit
4450

4551
/**
4652
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
4753
*
4854
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
4955
*/
50-
def onCompleted(): Unit = asJavaObserver.onCompleted()
56+
def onCompleted(): Unit
5157

5258
}
5359

5460
object Observer {
55-
def apply[T](observer: rx.Observer[T]) : Observer[T] = {
56-
new Observer[T]() {
57-
def asJavaObserver: rx.Observer[_ >: T] = observer
58-
}
59-
}
60-
}
61+
/**
62+
* Assume that the underlying rx.Observer does not need to be wrapped.
63+
*/
64+
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
65+
new Observer[T] {
66+
67+
override def asJavaObserver = observer
6168

69+
def onNext(value: T): Unit = asJavaObserver.onNext(value)
70+
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
71+
def onCompleted(): Unit = asJavaObserver.onCompleted()
6272

73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)