Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ObservableValue<T>.asFlow() #1789

Merged
merged 6 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
public final class kotlinx/coroutines/javafx/JavaFxConvertKt {
public static final fun asFlow (Ljavafx/beans/value/ObservableValue;)Lkotlinx/coroutines/flow/Flow;
}

public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay {
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
Expand Down
41 changes: 41 additions & 0 deletions ui/kotlinx-coroutines-javafx/src/JavaFxConvert.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.javafx
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

/**
* Creates an instance of a cold [Flow] that subscribes to the given [ObservableValue] and emits
* its values as they change. The resulting flow is conflated, meaning that if several values arrive in quick
* succession, only the last one will be emitted.
* Since this implementation uses [ObservableValue.addListener], even if this [ObservableValue]
* supports lazy evaluation, eager computation will be enforced while the flow is being collected.
* All the calls to JavaFX API are performed in [Dispatchers.JavaFx].
* This flow emits at least the initial value.
*
* ### Operator fusion
*
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `asFlow` are fused.
* [conflate] has no effect, as this flow is already conflated; one can use [buffer] to change that instead.
*/
@ExperimentalCoroutinesApi
public fun <T> ObservableValue<T>.asFlow(): Flow<T> = callbackFlow<T> {
val listener = ChangeListener<T> { _, _, newValue ->
try {
offer(newValue)
} catch (e: CancellationException) {
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
// In case the event fires after the channel is closed
}
}
addListener(listener)
send(value)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
awaitClose {
removeListener(listener)
}
}.flowOn(Dispatchers.JavaFx).conflate()
2 changes: 1 addition & 1 deletion ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private class PulseTimer : AnimationTimer() {
}
}

/** @return [true] if initialized successfully, and [false] if no display is detected */
/** @return true if initialized successfully, and false if no display is detected */
internal fun initPlatform(): Boolean = PlatformInitializer.success

// Lazily try to initialize JavaFx platform just once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*

class JavaFxTest : TestBase() {
class JavaFxDispatcherTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
Expand Down
86 changes: 86 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxObservableAsFlowTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.TestBase
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Before
import org.junit.Test
import kotlin.test.*


class JavaFxObservableAsFlowTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@Test
fun testFlowOrder() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val n = 1000
val flow = integerProperty.asFlow().takeWhile { j -> j != n }
newSingleThreadContext("setter").use { pool ->
launch(pool) {
for (i in 1..n) {
launch(Dispatchers.JavaFx) {
integerProperty.set(i)
}
}
}
var i = -1
flow.collect { j ->
assertTrue(i < (j as Int), "Elements are neither repeated nor shuffled")
i = j
}
}
}

@Test
fun testConflation() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

withContext(Dispatchers.JavaFx) {
val END_MARKER = -1
val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow().takeWhile { j -> j != END_MARKER }
launch {
yield() // to subscribe to [integerProperty]
yield() // send 0
integerProperty.set(1)
expect(3)
yield() // send 1
expect(5)
integerProperty.set(2)
for (i in (-100..-2)) {
integerProperty.set(i) // should be skipped due to conflation
}
integerProperty.set(3)
expect(6)
yield() // send 2 and 3
integerProperty.set(-1)
}
expect(1)
flow.collect { i ->
when (i) {
0 -> expect(2)
1 -> expect(4)
2 -> expect(7)
3 -> expect(8)
else -> fail("i is $i")
}
}
finish(9)
}
}

}
39 changes: 39 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxStressTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kotlinx.coroutines.javafx

import javafx.beans.property.SimpleIntegerProperty
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import org.junit.*

class JavaFxStressTest : TestBase() {

@Before
fun setup() {
ignoreLostThreads("JavaFX Application Thread", "Thread-", "QuantumRenderer-", "InvokeLaterDispatcher")
}

@get:Rule
val pool = ExecutorRule(1)

@Test
fun testCancellationRace() = runTest {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return@runTest // ignore test in headless environments
}

val integerProperty = SimpleIntegerProperty(0)
val flow = integerProperty.asFlow()
var i = 1
val n = 1000 * stressTestMultiplier
repeat (n) {
launch(pool) {
flow.first()
}
withContext(Dispatchers.JavaFx) {
integerProperty.set(i)
}
i += 1
}
}
}
101 changes: 101 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/examples/FxAsFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package examples

import javafx.application.Application
import javafx.scene.Scene
import javafx.scene.control.*
import javafx.scene.layout.GridPane
import javafx.stage.Stage
import javafx.beans.property.SimpleStringProperty
import javafx.event.EventHandler
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.javafx.*
import kotlin.coroutines.CoroutineContext

fun main(args: Array<String>) {
Application.launch(FxAsFlowApp::class.java, *args)
}

/**
* Adapted from
* https://github.com/ReactiveX/RxJavaFX/blob/a78ca7d15f7d82d201df8fafb6eba732ec17e327/src/test/java/io/reactivex/rxjavafx/RxJavaFXTest.java
*/
class FxAsFlowApp: Application(), CoroutineScope {

private var job = Job()
override val coroutineContext: CoroutineContext
get() = JavaFx + job

private val incrementButton = Button("Increment")
private val incrementLabel = Label("")
private val textInput = TextField()
private val flippedTextLabel = Label()
private val spinner = Spinner<Int>()
private val spinnerChangesLabel = Label()

public override fun start( primaryStage: Stage) {
val gridPane = GridPane()
gridPane.apply {
hgap = 10.0
vgap = 10.0
add(incrementButton, 0, 0)
add(incrementLabel, 1, 0)
add(textInput, 0, 1)
add(flippedTextLabel, 1, 1)
add(spinner, 0, 2)
add(spinnerChangesLabel, 1, 2)
}
val scene = Scene(gridPane)
primaryStage.apply {
width = 275.0
setScene(scene)
show()
}
}

public override fun stop() {
super.stop()
job.cancel()
job = Job()
}

init {
// Initializing the "Increment" button
val stringProperty = SimpleStringProperty()
var i = 0
incrementButton.onAction = EventHandler {
i += 1
stringProperty.set(i.toString())
}
launch {
stringProperty.asFlow().collect {
if (it != null) {
stringProperty.set(it)
}
}
}
incrementLabel.textProperty().bind(stringProperty)
// Initializing the reversed text field
val stringProperty2 = SimpleStringProperty()
launch {
textInput.textProperty().asFlow().collect {
if (it != null) {
stringProperty2.set(it.reversed())
}
}
}
flippedTextLabel.textProperty().bind(stringProperty2)
// Initializing the spinner
spinner.valueFactory = SpinnerValueFactory.IntegerSpinnerValueFactory(0, 100)
spinner.isEditable = true
val stringProperty3 = SimpleStringProperty()
launch {
spinner.valueProperty().asFlow().collect {
if (it != null) {
stringProperty3.set("NEW: $it")
}
}
}
spinnerChangesLabel.textProperty().bind(stringProperty3)
}
}