Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kotlin M7 and full compatibility with 0.17.0 #1042

Merged
merged 3 commits into from
Apr 16, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions language-adaptors/rxjava-kotlin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,37 @@
Kotlin has support for SAM (Single Abstract Method) Interfaces as Functions (i.e. Java 8 Lambdas). So you could use Kotlin in RxJava whitout this adaptor

```kotlin
Observable.create(OnSubscribeFunc<String> {
it!!.onNext("Hello")
it.onCompleted()
Observable.create(OnSubscribeFunc<String> { observer ->
observer!!.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()
})!!.subscribe { result ->
a!!.received(result)
}
```

This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage
In RxJava [0.17.0](https://github.com/Netflix/RxJava/releases/tag/0.17.0) version a new Subscriber type was included

```kotlin
import rx.lang.kotlin.*
Observable.create(object:OnSubscribe<String> {
override fun call(subscriber: Subscriber<in String>?) {
subscriber!!.onNext("Hello")
subscriber.onCompleted()
}
})!!.subscribe { result ->
a!!.received(result)
}
```

{(observer: Observer<in String>) ->
observer.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()!!
}.asObservableFunc().subscribe { result ->
(Due to a [bug in Kotlin's compiler](http://youtrack.jetbrains.com/issue/KT-4753) you can't use SAM with OnSubscribe)

This adaptor exposes a set of Extension functions that allow a more idiomatic Kotlin usage

```kotlin
{(subscriber: Subscriber<in String>) ->
subscriber.onNext("Hello")
subscriber.onCompleted()
}.asObservable().subscribe { result ->
a!!.received(result)
}
```
Expand Down
4 changes: 2 additions & 2 deletions language-adaptors/rxjava-kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ buildscript {
}

dependencies {
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.6.1673'
classpath 'org.jetbrains.kotlin:kotlin-gradle-plugin:0.7.270'
}
}

Expand All @@ -13,7 +13,7 @@ apply plugin: 'osgi'

dependencies {
compile project(':rxjava-core')
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.6.1673'
compile 'org.jetbrains.kotlin:kotlin-stdlib:0.7.270'
provided 'junit:junit-dep:4.10'
provided 'org.mockito:mockito-core:1.8.5'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ import rx.Observable
import rx.Observable.OnSubscribe
import rx.Subscription
import rx.Observable.OnSubscribeFunc
import rx.Subscriber


public fun<T> Function1<Observer<in T>, Unit>.asObservable(): Observable<T> {
return Observable.create(OnSubscribe<T>{ t1 ->
this(t1!!)
public fun<T> Function1<Subscriber<in T>, Unit>.asObservable(): Observable<T> {
return Observable.create(object:OnSubscribe<T> {
override fun call(t1: Subscriber<in T>?) {
this@asObservable(t1!!)
}

})!!
}

[deprecated("Use Function1<Subscriber<in T>, Unit>.asObservable()")]
public fun<T> Function1<Observer<in T>, Subscription>.asObservableFunc(): Observable<T> {
return Observable.create(OnSubscribeFunc<T>{ op ->
this(op!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package rx.lang.kotlin

import org.mockito.Mock
import rx.Observable
import org.junit.Before
import org.mockito.MockitoAnnotations
import org.junit.Test
import rx.subscriptions.Subscriptions
import org.mockito.Mockito.*
Expand All @@ -31,21 +28,23 @@ import rx.Subscription
import kotlin.concurrent.thread
import rx.Observable.OnSubscribeFunc
import rx.lang.kotlin.BasicKotlinTests.AsyncObservable
import rx.Observable.OnSubscribe
import rx.Subscriber

/**
* This class use plain Kotlin without extensions from the language adaptor
*/
public class BasicKotlinTests:KotlinTests() {

public class BasicKotlinTests : KotlinTests() {


[Test]
public fun testCreate() {

Observable.create(OnSubscribeFunc<String> {
it!!.onNext("Hello")
it.onCompleted()
Subscriptions.empty()
Observable.create(object:OnSubscribe<String> {
override fun call(subscriber: Subscriber<in String>?) {
subscriber!!.onNext("Hello")
subscriber.onCompleted()
}
})!!.subscribe { result ->
a!!.received(result)
}
Expand Down Expand Up @@ -310,7 +309,7 @@ public class BasicKotlinTests:KotlinTests() {



public class TestFactory(){
public class TestFactory() {
var counter = 1

val numbers: Observable<Int>
Expand All @@ -330,24 +329,23 @@ public class BasicKotlinTests:KotlinTests() {

}

class AsyncObservable : OnSubscribeFunc<Int>{
override fun onSubscribe(op: Observer<in Int>?): Subscription? {
class AsyncObservable : OnSubscribe<Int> {
override fun call(op: Subscriber<in Int>?) {
thread {
Thread.sleep(50)
op!!.onNext(1)
op.onNext(2)
op.onNext(3)
op.onCompleted()
}
return Subscriptions.empty()

}
}

class TestOnSubscribe(val count: Int) : OnSubscribeFunc<String>{
override fun onSubscribe(op: Observer<in String>?): Subscription? {
class TestOnSubscribe(val count: Int) : OnSubscribe<String> {
override fun call(op: Subscriber<in String>?) {
op!!.onNext("hello_$count")
op.onCompleted()
return Subscriptions.empty()!!
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@

package rx.lang.kotlin

import org.mockito.Mock
import rx.Observable
import org.junit.Before
import org.mockito.MockitoAnnotations
import org.junit.Test
import rx.subscriptions.Subscriptions
import org.mockito.Mockito.*
import org.mockito.Matchers.*
import rx.Observer
import org.junit.Assert.*
import rx.Notification
import rx.Subscription
import kotlin.concurrent.thread
import rx.Subscriber

/**
* This class contains tests using the extension functions provided by the language adaptor.
Expand All @@ -39,11 +34,10 @@ public class ExtensionTests : KotlinTests() {
[Test]
public fun testCreate() {

{(observer: Observer<in String>) ->
observer.onNext("Hello")
observer.onCompleted()
Subscriptions.empty()!!
}.asObservableFunc().subscribe { result ->
{(subscriber: Subscriber<in String>) ->
subscriber.onNext("Hello")
subscriber.onCompleted()
}.asObservable().subscribe { result ->
a!!.received(result)
}

Expand Down Expand Up @@ -216,15 +210,15 @@ public class ExtensionTests : KotlinTests() {

[Test]
public fun testForEach() {
asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach(received())
asyncObservable.asObservable().toBlockingObservable()!!.forEach(received())
verify(a, times(1))!!.received(1)
verify(a, times(1))!!.received(2)
verify(a, times(1))!!.received(3)
}

[Test(expected = javaClass<RuntimeException>())]
public fun testForEachWithError() {
asyncObservable.asObservableFunc().toBlockingObservable()!!.forEach { throw RuntimeException("err") }
asyncObservable.asObservable().toBlockingObservable()!!.forEach { throw RuntimeException("err") }
fail("we expect an exception to be thrown")
}

Expand Down Expand Up @@ -259,21 +253,19 @@ public class ExtensionTests : KotlinTests() {
assertEquals(listOf(3, 6, 9), values[2])
}

val funOnSubscribe: (Int, Observer<in String>) -> Subscription = { counter, observer ->
observer.onNext("hello_$counter")
observer.onCompleted()
Subscriptions.empty()!!
val funOnSubscribe: (Int, Subscriber<in String>) -> Unit = { counter, subscriber ->
subscriber.onNext("hello_$counter")
subscriber.onCompleted()
}

val asyncObservable: (Observer<in Int>) -> Subscription = { observer ->
val asyncObservable: (Subscriber<in Int>) -> Unit = { subscriber ->
thread {
Thread.sleep(50)
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
subscriber.onNext(1)
subscriber.onNext(2)
subscriber.onNext(3)
subscriber.onCompleted()
}
Subscriptions.empty()!!
}

/**
Expand All @@ -283,22 +275,22 @@ public class ExtensionTests : KotlinTests() {
return {(p2: P2) -> this(p1, p2) }
}

inner public class TestFactory(){
inner public class TestFactory() {
var counter = 1

val numbers: Observable<Int>
get(){
return listOf(1, 3, 2, 5, 4).asObservable()
}

val onSubscribe: (Observer<in String>) -> Subscription
val onSubscribe: (Subscriber<in String>) -> Unit
get(){
return funOnSubscribe.partially1(counter++)
}

val observable: Observable<String>
get(){
return onSubscribe.asObservableFunc()
return onSubscribe.asObservable()
}

}
Expand Down