diff --git a/build.gradle b/build.gradle index a4df6f128716..ac5a2a4b040f 100644 --- a/build.gradle +++ b/build.gradle @@ -72,6 +72,7 @@ configure(allprojects) { project -> ext.junitVersion = "4.12" ext.junitJupiterVersion = '5.0.0-M4' ext.junitPlatformVersion = '1.0.0-M4' + ext.kotlinCoroutinesVersion= "0.14.1" ext.kotlinVersion = "1.1.1" // also change kotlin-gradle-plugin version when upgrading ext.log4jVersion = '2.8.1' ext.nettyVersion = "4.1.9.Final" @@ -367,6 +368,7 @@ project("spring-core") { compile("commons-logging:commons-logging:1.2") optional("net.sf.jopt-simple:jopt-simple:5.0.3") optional("org.aspectj:aspectjweaver:${aspectjVersion}") + optional("org.jetbrains.kotlinx:kotlinx-coroutines-core:${kotlinCoroutinesVersion}") optional("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}") optional("org.jetbrains.kotlin:kotlin-stdlib-jre8:${kotlinVersion}") optional("org.reactivestreams:reactive-streams") diff --git a/spring-core/src/main/kotlin/org/springframework/util/experimental/ListenableFutureExtensions.kt b/spring-core/src/main/kotlin/org/springframework/util/experimental/ListenableFutureExtensions.kt new file mode 100644 index 000000000000..f1fa9d7520bf --- /dev/null +++ b/spring-core/src/main/kotlin/org/springframework/util/experimental/ListenableFutureExtensions.kt @@ -0,0 +1,137 @@ +/* + * Copyright 2002-2017 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.experimental + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutionException + +import kotlin.coroutines.experimental.Continuation +import kotlin.coroutines.experimental.CoroutineContext +import kotlin.coroutines.experimental.startCoroutine +import kotlinx.coroutines.experimental.CancellableContinuation +import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.CommonPool +import kotlinx.coroutines.experimental.CoroutineDispatcher +import kotlinx.coroutines.experimental.Deferred +import kotlinx.coroutines.experimental.Job +import kotlinx.coroutines.experimental.cancelFutureOnCompletion +import kotlinx.coroutines.experimental.newCoroutineContext +import kotlinx.coroutines.experimental.suspendCancellableCoroutine + +import org.springframework.util.concurrent.ListenableFuture +import org.springframework.util.concurrent.ListenableFutureCallback +import org.springframework.util.concurrent.SettableListenableFuture + +/** + * Starts new coroutine and returns its results an an implementation of [ListenableFuture]. + * This coroutine builder uses [CommonPool] context by default and is conceptually + * similar to [CompletableFuture.supplyAsync]. + * + * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed. + * If the [context] for the new coroutine is explicitly specified, then it must include [CoroutineDispatcher] element. + * See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`. + * The specified context is added to the context of the parent running coroutine (if any) + * inside which this function is invoked. The [Job] of the resulting coroutine is + * a child of the job of the parent coroutine (if any). + * + * See [newCoroutineContext] for a description of debugging facilities that are + * available for newly created coroutine. + * + * @author Konrad Kamiński + * @author Roman Elizarov + * @since 5.0 + */ +fun listenableFuture(context: CoroutineContext = CommonPool, block: suspend () -> T): ListenableFuture { + val newContext = newCoroutineContext(CommonPool + context) + val job = Job(newContext[Job]) + + return ListenableFutureCoroutine(newContext + job).apply { + job.cancelFutureOnCompletion(this) + addCallback(job.asJobCancellingCallback()) + block.startCoroutine(this) + } +} + +/** + * Converts this deferred value to the instance of [ListenableFuture]. + * The deferred value is cancelled when the resulting future is cancelled or otherwise completed. + * + * @author Konrad Kamiński + * @author Roman Elizarov + * @since 5.0 + */ +fun Deferred.asListenableFuture(): ListenableFuture = + SettableListenableFuture().apply { + addCallback(this@asListenableFuture.asJobCancellingCallback()) + + invokeOnCompletion { + try { + set(getCompleted()) + } + catch (exception: Exception) { + setException(exception) + } + } + } + +/** + * Awaits for completion of the future without blocking a thread. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is completed while this suspending function is waiting, + * this function immediately resumes with [CancellationException] . + * + * @author Konrad Kamiński + * @author Roman Elizarov + * @since 5.0 + */ +suspend fun ListenableFuture.await(): T = + if (isDone) { + try { + get() + } + catch (e: ExecutionException) { + throw e.cause ?: e + } + } else { + suspendCancellableCoroutine { cont: CancellableContinuation -> + addCallback(object: ListenableFutureCallback { + override fun onFailure(exception: Throwable) = cont.resumeWithException(exception) + override fun onSuccess(result: T) = cont.resume(result) + }) + cont.cancelFutureOnCompletion(this) + } + } + +private class ListenableFutureCoroutine( + override val context: CoroutineContext +): SettableListenableFuture(), Continuation { + + override fun resume(value: T) { + set(value) + } + + override fun resumeWithException(exception: Throwable) { + setException(exception) + } +} + +private fun Job.asJobCancellingCallback(): ListenableFutureCallback = + object: ListenableFutureCallback { + override fun onFailure(exception: Throwable) { cancel(exception) } + override fun onSuccess(result: T) {} + } \ No newline at end of file diff --git a/spring-core/src/test/kotlin/org/springframework/util/experimental/ListenableFutureExtensionsTests.kt b/spring-core/src/test/kotlin/org/springframework/util/experimental/ListenableFutureExtensionsTests.kt new file mode 100644 index 000000000000..f1b3ba26a52c --- /dev/null +++ b/spring-core/src/test/kotlin/org/springframework/util/experimental/ListenableFutureExtensionsTests.kt @@ -0,0 +1,222 @@ +/* + * Copyright 2002-2017 the original author or authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.experimental + +import kotlinx.coroutines.experimental.CancellationException +import kotlinx.coroutines.experimental.CommonPool +import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.experimental.runBlocking +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue +import org.junit.Assert.fail +import org.junit.Test +import org.springframework.util.concurrent.ListenableFuture +import org.springframework.util.concurrent.SettableListenableFuture +import java.util.concurrent.CountDownLatch +import java.util.concurrent.ExecutionException + +/** + * Tests for coroutine support for [ListenableFuture]. + * + * @author Konrad Kamiński + */ +class ListenableFutureExtensionsTests { + @Test + fun `listenableFuture from successful suspending lambda`() { + val string = "OK" + val listenableFuture = listenableFuture { string } + + listenableFuture.assertThat(value = string, cancelled = false, done = true) + } + + @Test + fun `listenableFuture from failing suspending lambda`() { + val exception = RuntimeException("message") + val listenableFuture = listenableFuture { throw exception } + + listenableFuture.assertThat(exception = exception, cancelled = false, done = true) + } + + @Test + fun `listenableFuture waiting for successful completion`() { + val string = "OK" + val latch = CountDownLatch(1) + + val listenableFuture = listenableFuture { + latch.await() + string + } + + listenableFuture.assertThat(cancelled = false, done = false) + + latch.countDown() + listenableFuture.assertThat(value = string, cancelled = false, done = true) + } + + @Test + fun `listenableFuture waiting for failing completion`() { + val exception = RuntimeException("message") + val latch = CountDownLatch(1) + + val listenableFuture = listenableFuture { + latch.await() + throw exception + } + + listenableFuture.assertThat(cancelled = false, done = false) + + latch.countDown() + listenableFuture.assertThat(exception = exception, cancelled = false, done = true) + } + + @Test + fun `listenableFuture waiting for cancellation`() { + val latch = CountDownLatch(1) + val listenableFuture = listenableFuture { + latch.await() + } + + listenableFuture.cancel(true) + + listenableFuture.assertThat(exceptionAssert = { it is CancellationException}, cancelled = true, done = true) + latch.countDown() + } + + @Test + fun `Successful Deferred converted to ListenableFuture`() { + val string = "OK" + val deferred = async(CommonPool) { + string + } + + val listenableFuture = deferred.asListenableFuture() + + listenableFuture.assertThat(value = string, cancelled = false, done = true) + } + + @Test + fun `Failing Deferred converted to ListenableFuture`() { + val exception = RuntimeException("message") + val deferred = async(CommonPool) { + throw exception + } + + val listenableFuture = deferred.asListenableFuture() + + listenableFuture.assertThat(exception = exception, cancelled = false, done = true) + } + + @Test + fun `Deferred waiting for success converted to ListenableFuture`() { + val string = "OK" + val latch = CountDownLatch(1) + val deferred = async(CommonPool) { + latch.await() + string + } + + val listenableFuture = deferred.asListenableFuture() + listenableFuture.assertThat(cancelled = false, done = false) + + latch.countDown() + listenableFuture.assertThat(value = string, cancelled = false, done = true) + } + + @Test + fun `Deferred waiting for failure converted to ListenableFuture`() { + val exception = RuntimeException("message") + val latch = CountDownLatch(1) + val deferred = async(CommonPool) { + latch.await() + throw exception + } + + val listenableFuture = deferred.asListenableFuture() + + listenableFuture.assertThat(cancelled = false, done = false) + + latch.countDown() + + listenableFuture.assertThat(exception = exception, cancelled = false, done = true) + } + + @Test + fun `awaiting successful ListenableFuture`() = runBlocking { + val string = "OK" + val listenableFuture = SettableListenableFuture().apply { + set(string) + } + + assertEquals(string, listenableFuture.await()) + } + + @Test + fun `awaiting failing ListenableFuture`() = runBlocking { + val exception = RuntimeException("message") + val listenableFuture = SettableListenableFuture().apply { + setException(exception) + } + + try { + listenableFuture.await() + fail("Expected Exception") + } + catch (e: Exception) { + assertEquals(exception, e) + } + } + + @Test + fun `awaiting cancelled ListenableFuture`() = runBlocking { + val listenableFuture = SettableListenableFuture().apply { + cancel(true) + } + + try { + listenableFuture.await() + fail("Expected Exception") + } + catch (e: Exception) { + assertTrue(e is CancellationException) + } + } + + private fun ListenableFuture.assertThat(cancelled: Boolean, done: Boolean, + value: T? = null, exception: Exception? = null, exceptionAssert: ((Exception) -> Unit)? = null) { + + if (value != null) { + assertEquals(value, get()) + } + if (exception != null || exceptionAssert != null) { + try { + get() + fail("Expected ExecutionException") + } + catch (e: Exception) { + if (exceptionAssert != null) { + exceptionAssert(e) + } + if (e is ExecutionException) { + assertEquals(exception, e.cause) + } + } + } + + assertEquals(cancelled, isCancelled) + assertEquals(done, isDone) + } +} \ No newline at end of file