From 6a858c4578cf05a79f993d3282b2ac6299a2c401 Mon Sep 17 00:00:00 2001 From: MarioAriasC Date: Wed, 16 Apr 2014 02:33:14 +0100 Subject: [PATCH 1/3] Kotlin M7, minor fixes, compatibility with 0.17.0 --- language-adaptors/rxjava-kotlin/build.gradle | 4 ++-- .../main/kotlin/rx/lang/kotlin/namespace.kt | 11 ++++++--- .../kotlin/rx/lang/kotlin/BasicKotlinTests.kt | 23 +++++++++---------- .../kotlin/rx/lang/kotlin/ExtensionTests.kt | 15 +++++------- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/language-adaptors/rxjava-kotlin/build.gradle b/language-adaptors/rxjava-kotlin/build.gradle index aa44049d05..fc710d89fb 100644 --- a/language-adaptors/rxjava-kotlin/build.gradle +++ b/language-adaptors/rxjava-kotlin/build.gradle @@ -4,7 +4,7 @@ buildscript { } dependencies { - classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.6.1673' + classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.7.270' } } @@ -13,7 +13,7 @@ apply plugin: 'osgi' dependencies { compile project(':rxjava-core') - compile 'org.jetbrains.kotlin:kotlin-stdlib:0.6.1673' + compile 'org.jetbrains.kotlin:kotlin-stdlib:0.7.270' provided 'junit:junit-dep:4.10' provided 'org.mockito:mockito-core:1.8.5' } diff --git a/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt b/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt index d5eb8420f5..a333c3062d 100644 --- a/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt +++ b/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt @@ -21,11 +21,16 @@ import rx.Observable import rx.Observable.OnSubscribe import rx.Subscription import rx.Observable.OnSubscribeFunc +import rx.Subscriber -public fun Function1, Unit>.asObservable(): Observable { - return Observable.create(OnSubscribe{ t1 -> - this(t1!!) +public fun Function1, Unit>.asObservable(): Observable { + val v = this + return Observable.create(object:OnSubscribe { + override fun call(t1: Subscriber?) { + v(t1!!) + } + })!! } diff --git a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt index cf4d3787b3..4237ab10d1 100644 --- a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt +++ b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt @@ -16,10 +16,7 @@ package rx.lang.kotlin -import org.mockito.Mock import rx.Observable -import org.junit.Before -import org.mockito.MockitoAnnotations import org.junit.Test import rx.subscriptions.Subscriptions import org.mockito.Mockito.* @@ -31,21 +28,23 @@ import rx.Subscription import kotlin.concurrent.thread import rx.Observable.OnSubscribeFunc import rx.lang.kotlin.BasicKotlinTests.AsyncObservable +import rx.Observable.OnSubscribe +import rx.Subscriber /** * This class use plain Kotlin without extensions from the language adaptor */ -public class BasicKotlinTests:KotlinTests() { - +public class BasicKotlinTests : KotlinTests() { [Test] public fun testCreate() { - Observable.create(OnSubscribeFunc { - it!!.onNext("Hello") - it.onCompleted() - Subscriptions.empty() + Observable.create(object:OnSubscribe { + override fun call(subscriber: Subscriber?) { + subscriber!!.onNext("Hello") + subscriber.onCompleted() + } })!!.subscribe { result -> a!!.received(result) } @@ -310,7 +309,7 @@ public class BasicKotlinTests:KotlinTests() { - public class TestFactory(){ + public class TestFactory() { var counter = 1 val numbers: Observable @@ -330,7 +329,7 @@ public class BasicKotlinTests:KotlinTests() { } - class AsyncObservable : OnSubscribeFunc{ + class AsyncObservable : OnSubscribeFunc { override fun onSubscribe(op: Observer?): Subscription? { thread { Thread.sleep(50) @@ -343,7 +342,7 @@ public class BasicKotlinTests:KotlinTests() { } } - class TestOnSubscribe(val count: Int) : OnSubscribeFunc{ + class TestOnSubscribe(val count: Int) : OnSubscribeFunc { override fun onSubscribe(op: Observer?): Subscription? { op!!.onNext("hello_$count") op.onCompleted() diff --git a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt index c9fa67a6a1..614dc9e970 100644 --- a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt +++ b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt @@ -16,10 +16,7 @@ package rx.lang.kotlin -import org.mockito.Mock import rx.Observable -import org.junit.Before -import org.mockito.MockitoAnnotations import org.junit.Test import rx.subscriptions.Subscriptions import org.mockito.Mockito.* @@ -29,6 +26,7 @@ import org.junit.Assert.* import rx.Notification import rx.Subscription import kotlin.concurrent.thread +import rx.Subscriber /** * This class contains tests using the extension functions provided by the language adaptor. @@ -39,11 +37,10 @@ public class ExtensionTests : KotlinTests() { [Test] public fun testCreate() { - {(observer: Observer) -> - observer.onNext("Hello") - observer.onCompleted() - Subscriptions.empty()!! - }.asObservableFunc().subscribe { result -> + {(subscriber: Subscriber) -> + subscriber.onNext("Hello") + subscriber.onCompleted() + }.asObservable().subscribe { result -> a!!.received(result) } @@ -283,7 +280,7 @@ public class ExtensionTests : KotlinTests() { return {(p2: P2) -> this(p1, p2) } } - inner public class TestFactory(){ + inner public class TestFactory() { var counter = 1 val numbers: Observable From 1a368734010ac371e12b182ba040efca5808f12b Mon Sep 17 00:00:00 2001 From: MarioAriasC Date: Wed, 16 Apr 2014 02:49:40 +0100 Subject: [PATCH 2/3] Test with full compatibility with 0.17.0 --- .../kotlin/rx/lang/kotlin/BasicKotlinTests.kt | 11 ++++--- .../kotlin/rx/lang/kotlin/ExtensionTests.kt | 29 ++++++++----------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt index 4237ab10d1..99d6b4278b 100644 --- a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt +++ b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt @@ -329,8 +329,8 @@ public class BasicKotlinTests : KotlinTests() { } - class AsyncObservable : OnSubscribeFunc { - override fun onSubscribe(op: Observer?): Subscription? { + class AsyncObservable : OnSubscribe { + override fun call(op: Subscriber?) { thread { Thread.sleep(50) op!!.onNext(1) @@ -338,15 +338,14 @@ public class BasicKotlinTests : KotlinTests() { op.onNext(3) op.onCompleted() } - return Subscriptions.empty() + } } - class TestOnSubscribe(val count: Int) : OnSubscribeFunc { - override fun onSubscribe(op: Observer?): Subscription? { + class TestOnSubscribe(val count: Int) : OnSubscribe { + override fun call(op: Subscriber?) { op!!.onNext("hello_$count") op.onCompleted() - return Subscriptions.empty()!! } } diff --git a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt index 614dc9e970..c601bd54a1 100644 --- a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt +++ b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt @@ -18,13 +18,10 @@ package rx.lang.kotlin import rx.Observable import org.junit.Test -import rx.subscriptions.Subscriptions import org.mockito.Mockito.* import org.mockito.Matchers.* -import rx.Observer import org.junit.Assert.* import rx.Notification -import rx.Subscription import kotlin.concurrent.thread import rx.Subscriber @@ -213,7 +210,7 @@ public class ExtensionTests : KotlinTests() { [Test] public fun testForEach() { - asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach(received()) + asyncObservable.asObservable().toBlockingObservable()!!.forEach(received()) verify(a, times(1))!!.received(1) verify(a, times(1))!!.received(2) verify(a, times(1))!!.received(3) @@ -221,7 +218,7 @@ public class ExtensionTests : KotlinTests() { [Test(expected = javaClass())] public fun testForEachWithError() { - asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach { throw RuntimeException("err") } + asyncObservable.asObservable().toBlockingObservable()!!.forEach { throw RuntimeException("err") } fail("we expect an exception to be thrown") } @@ -256,21 +253,19 @@ public class ExtensionTests : KotlinTests() { assertEquals(listOf(3, 6, 9), values[2]) } - val funOnSubscribe: (Int, Observer) -> Subscription = { counter, observer -> - observer.onNext("hello_$counter") - observer.onCompleted() - Subscriptions.empty()!! + val funOnSubscribe: (Int, Subscriber) -> Unit = { counter, subscriber -> + subscriber.onNext("hello_$counter") + subscriber.onCompleted() } - val asyncObservable: (Observer) -> Subscription = { observer -> + val asyncObservable: (Subscriber) -> Unit = { subscriber -> thread { Thread.sleep(50) - observer.onNext(1) - observer.onNext(2) - observer.onNext(3) - observer.onCompleted() + subscriber.onNext(1) + subscriber.onNext(2) + subscriber.onNext(3) + subscriber.onCompleted() } - Subscriptions.empty()!! } /** @@ -288,14 +283,14 @@ public class ExtensionTests : KotlinTests() { return listOf(1, 3, 2, 5, 4).asObservable() } - val onSubscribe: (Observer) -> Subscription + val onSubscribe: (Subscriber) -> Unit get(){ return funOnSubscribe.partially1(counter++) } val observable: Observable get(){ - return onSubscribe.asObservableFunc() + return onSubscribe.asObservable() } } From a26a6d03e484efb3ffdbe8037a88ba09ae28cbd1 Mon Sep 17 00:00:00 2001 From: MarioAriasC Date: Wed, 16 Apr 2014 09:31:16 +0100 Subject: [PATCH 3/3] Update README --- language-adaptors/rxjava-kotlin/README.md | 32 +++++++++++++------ .../main/kotlin/rx/lang/kotlin/namespace.kt | 4 +-- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/language-adaptors/rxjava-kotlin/README.md b/language-adaptors/rxjava-kotlin/README.md index c0eb072c26..037d759654 100644 --- a/language-adaptors/rxjava-kotlin/README.md +++ b/language-adaptors/rxjava-kotlin/README.md @@ -3,25 +3,37 @@ Kotlin has support for SAM (Single Abstract Method) Interfaces as Functions (i.e. Java 8 Lambdas). So you could use Kotlin in RxJava whitout this adaptor ```kotlin -Observable.create(OnSubscribeFunc { - it!!.onNext("Hello") - it.onCompleted() +Observable.create(OnSubscribeFunc { observer -> + observer!!.onNext("Hello") + observer.onCompleted() Subscriptions.empty() })!!.subscribe { result -> a!!.received(result) } ``` -This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage +In RxJava [0.17.0](https://github.com/Netflix/RxJava/releases/tag/0.17.0) version a new Subscriber type was included ```kotlin -import rx.lang.kotlin.* +Observable.create(object:OnSubscribe { + override fun call(subscriber: Subscriber?) { + subscriber!!.onNext("Hello") + subscriber.onCompleted() + } +})!!.subscribe { result -> + a!!.received(result) +} +``` -{(observer: Observer) -> - observer.onNext("Hello") - observer.onCompleted() - Subscriptions.empty()!! -}.asObservableFunc().subscribe { result -> +(Due to a [bug in Kotlin's compiler](http://youtrack.jetbrains.com/issue/KT-4753) you can't use SAM with OnSubscribe) + +This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage + +```kotlin +{(subscriber: Subscriber) -> + subscriber.onNext("Hello") + subscriber.onCompleted() +}.asObservable().subscribe { result -> a!!.received(result) } ``` diff --git a/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt b/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt index a333c3062d..006f97e48a 100644 --- a/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt +++ b/language-adaptors/rxjava-kotlin/src/main/kotlin/rx/lang/kotlin/namespace.kt @@ -25,15 +25,15 @@ import rx.Subscriber public fun Function1, Unit>.asObservable(): Observable { - val v = this return Observable.create(object:OnSubscribe { override fun call(t1: Subscriber?) { - v(t1!!) + this@asObservable(t1!!) } })!! } +[deprecated("Use Function1, Unit>.asObservable()")] public fun Function1, Subscription>.asObservableFunc(): Observable { return Observable.create(OnSubscribeFunc{ op -> this(op!!)