Skip to content

SPR-15413: Add Kotlin coroutines for ListenableFuture #1375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ configure(allprojects) { project ->
ext.junitVersion = "4.12"
ext.junitJupiterVersion = '5.0.0-M4'
ext.junitPlatformVersion = '1.0.0-M4'
ext.kotlinCoroutinesVersion= "0.14.1"
ext.kotlinVersion = "1.1.1" // also change kotlin-gradle-plugin version when upgrading
ext.log4jVersion = '2.8.1'
ext.nettyVersion = "4.1.9.Final"
Expand Down Expand Up @@ -367,6 +368,7 @@ project("spring-core") {
compile("commons-logging:commons-logging:1.2")
optional("net.sf.jopt-simple:jopt-simple:5.0.3")
optional("org.aspectj:aspectjweaver:${aspectjVersion}")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core:${kotlinCoroutinesVersion}")
optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
optional("org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}")
optional("org.reactivestreams:reactive-streams")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2002-2017 the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.util.experimental

import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException

import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.startCoroutine
import kotlinx.coroutines.experimental.CancellableContinuation
import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.cancelFutureOnCompletion
import kotlinx.coroutines.experimental.newCoroutineContext
import kotlinx.coroutines.experimental.suspendCancellableCoroutine

import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback
import org.springframework.util.concurrent.SettableListenableFuture

/**
* Starts new coroutine and returns its results an an implementation of [ListenableFuture].
* This coroutine builder uses [CommonPool] context by default and is conceptually
* similar to [CompletableFuture.supplyAsync].
*
* The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
* If the [context] for the new coroutine is explicitly specified, then it must include [CoroutineDispatcher] element.
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
* The specified context is added to the context of the parent running coroutine (if any)
* inside which this function is invoked. The [Job] of the resulting coroutine is
* a child of the job of the parent coroutine (if any).
*
* See [newCoroutineContext] for a description of debugging facilities that are
* available for newly created coroutine.
*
* @author Konrad Kamiński
* @author Roman Elizarov
* @since 5.0
*/
fun <T> listenableFuture(context: CoroutineContext = CommonPool, block: suspend () -> T): ListenableFuture<T> {
val newContext = newCoroutineContext(CommonPool + context)
val job = Job(newContext[Job])

return ListenableFutureCoroutine<T>(newContext + job).apply {
job.cancelFutureOnCompletion(this)
addCallback(job.asJobCancellingCallback())
block.startCoroutine(this)
}
}

/**
* Converts this deferred value to the instance of [ListenableFuture].
* The deferred value is cancelled when the resulting future is cancelled or otherwise completed.
*
* @author Konrad Kamiński
* @author Roman Elizarov
* @since 5.0
*/
fun <T> Deferred<T>.asListenableFuture(): ListenableFuture<T> =
SettableListenableFuture<T>().apply {
addCallback(this@asListenableFuture.asJobCancellingCallback())

invokeOnCompletion {
try {
set(getCompleted())
}
catch (exception: Exception) {
setException(exception)
}
}
}

/**
* Awaits for completion of the future without blocking a thread.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException] .
*
* @author Konrad Kamiński
* @author Roman Elizarov
* @since 5.0
*/
suspend fun <T> ListenableFuture<T>.await(): T =
if (isDone) {
try {
get()
}
catch (e: ExecutionException) {
throw e.cause ?: e
}
} else {
suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
addCallback(object: ListenableFutureCallback<T> {
override fun onFailure(exception: Throwable) = cont.resumeWithException(exception)
override fun onSuccess(result: T) = cont.resume(result)
})
cont.cancelFutureOnCompletion(this)
}
}

private class ListenableFutureCoroutine<T>(
override val context: CoroutineContext
): SettableListenableFuture<T>(), Continuation<T> {

override fun resume(value: T) {
set(value)
}

override fun resumeWithException(exception: Throwable) {
setException(exception)
}
}

private fun <T> Job.asJobCancellingCallback(): ListenableFutureCallback<T> =
object: ListenableFutureCallback<T> {
override fun onFailure(exception: Throwable) { cancel(exception) }
override fun onSuccess(result: T) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright 2002-2017 the original author or authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.util.experimental

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CommonPool
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Test
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.SettableListenableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException

/**
* Tests for coroutine support for [ListenableFuture].
*
* @author Konrad Kamiński
*/
class ListenableFutureExtensionsTests {
@Test
fun `listenableFuture from successful suspending lambda`() {
val string = "OK"
val listenableFuture = listenableFuture { string }

listenableFuture.assertThat(value = string, cancelled = false, done = true)
}

@Test
fun `listenableFuture from failing suspending lambda`() {
val exception = RuntimeException("message")
val listenableFuture = listenableFuture { throw exception }

listenableFuture.assertThat(exception = exception, cancelled = false, done = true)
}

@Test
fun `listenableFuture waiting for successful completion`() {
val string = "OK"
val latch = CountDownLatch(1)

val listenableFuture = listenableFuture {
latch.await()
string
}

listenableFuture.assertThat(cancelled = false, done = false)

latch.countDown()
listenableFuture.assertThat(value = string, cancelled = false, done = true)
}

@Test
fun `listenableFuture waiting for failing completion`() {
val exception = RuntimeException("message")
val latch = CountDownLatch(1)

val listenableFuture = listenableFuture {
latch.await()
throw exception
}

listenableFuture.assertThat(cancelled = false, done = false)

latch.countDown()
listenableFuture.assertThat(exception = exception, cancelled = false, done = true)
}

@Test
fun `listenableFuture waiting for cancellation`() {
val latch = CountDownLatch(1)
val listenableFuture = listenableFuture {
latch.await()
}

listenableFuture.cancel(true)

listenableFuture.assertThat(exceptionAssert = { it is CancellationException}, cancelled = true, done = true)
latch.countDown()
}

@Test
fun `Successful Deferred converted to ListenableFuture`() {
val string = "OK"
val deferred = async(CommonPool) {
string
}

val listenableFuture = deferred.asListenableFuture()

listenableFuture.assertThat(value = string, cancelled = false, done = true)
}

@Test
fun `Failing Deferred converted to ListenableFuture`() {
val exception = RuntimeException("message")
val deferred = async(CommonPool) {
throw exception
}

val listenableFuture = deferred.asListenableFuture()

listenableFuture.assertThat(exception = exception, cancelled = false, done = true)
}

@Test
fun `Deferred waiting for success converted to ListenableFuture`() {
val string = "OK"
val latch = CountDownLatch(1)
val deferred = async(CommonPool) {
latch.await()
string
}

val listenableFuture = deferred.asListenableFuture()
listenableFuture.assertThat(cancelled = false, done = false)

latch.countDown()
listenableFuture.assertThat(value = string, cancelled = false, done = true)
}

@Test
fun `Deferred waiting for failure converted to ListenableFuture`() {
val exception = RuntimeException("message")
val latch = CountDownLatch(1)
val deferred = async(CommonPool) {
latch.await()
throw exception
}

val listenableFuture = deferred.asListenableFuture()

listenableFuture.assertThat(cancelled = false, done = false)

latch.countDown()

listenableFuture.assertThat(exception = exception, cancelled = false, done = true)
}

@Test
fun `awaiting successful ListenableFuture`() = runBlocking {
val string = "OK"
val listenableFuture = SettableListenableFuture<String>().apply {
set(string)
}

assertEquals(string, listenableFuture.await())
}

@Test
fun `awaiting failing ListenableFuture`() = runBlocking {
val exception = RuntimeException("message")
val listenableFuture = SettableListenableFuture<String>().apply {
setException(exception)
}

try {
listenableFuture.await()
fail("Expected Exception")
}
catch (e: Exception) {
assertEquals(exception, e)
}
}

@Test
fun `awaiting cancelled ListenableFuture`() = runBlocking {
val listenableFuture = SettableListenableFuture<String>().apply {
cancel(true)
}

try {
listenableFuture.await()
fail("Expected Exception")
}
catch (e: Exception) {
assertTrue(e is CancellationException)
}
}

private fun <T> ListenableFuture<T>.assertThat(cancelled: Boolean, done: Boolean,
value: T? = null, exception: Exception? = null, exceptionAssert: ((Exception) -> Unit)? = null) {

if (value != null) {
assertEquals(value, get())
}
if (exception != null || exceptionAssert != null) {
try {
get()
fail("Expected ExecutionException")
}
catch (e: Exception) {
if (exceptionAssert != null) {
exceptionAssert(e)
}
if (e is ExecutionException) {
assertEquals(exception, e.cause)
}
}
}

assertEquals(cancelled, isCancelled)
assertEquals(done, isDone)
}
}