Skip to content

Commit

Permalink
Kotlin 1.3 - Step 1: Upgrade deps, fix coroutine imports, fix tests.
Browse files Browse the repository at this point in the history
Some error handling behavior changed which required changes to how
the library handles exceptions and cancellation, as well as a few
unit test changes.

Closes #35.
  • Loading branch information
zach-klippenstein committed Mar 18, 2019
1 parent f8c9eb9 commit 8d79702
Show file tree
Hide file tree
Showing 40 changed files with 277 additions and 253 deletions.
4 changes: 2 additions & 2 deletions kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ buildscript {
'hamcrest': '1.3',
'intellijAnnotations': '13.0',
'junit': '4.12',
'kotlin': '1.2.71',
'kotlinCoroutines': '0.26.1',
'kotlin': '1.3.21',
'kotlinCoroutines': '1.1.1',
'ktlintPlugin': '5.1.0',
'mavenPublishPlugin': '0.6.0',
'mockito': '2.7.5',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
*/
package com.squareup.workflow.legacy

import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.BroadcastChannel
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.experimental.channels.ClosedSendChannelException
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* Starts a coroutine in the given [context] that runs [block] to produce states and the result
Expand All @@ -36,7 +36,7 @@ import kotlin.coroutines.experimental.EmptyCoroutineContext
* it can use to receive [events][E] from the workflow's [WorkflowInput]. The block's return value
* is used as the workflow's result.
* The state channel is a
* [ConflatedBroadcastChannel][kotlinx.coroutines.experimental.channels.ConflatedBroadcastChannel],
* [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel],
* which means that sends will never suspend or fail, and if observers of the workflow aren't
* receiving fast enough, they will miss intermediate states.
* The event channel has [unlimited capacity][UNLIMITED].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
*/
package com.squareup.workflow.legacy

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineName
import kotlinx.coroutines.experimental.Dispatchers.Unconfined
import kotlinx.coroutines.experimental.GlobalScope
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

/**
* A factory for [Workflow]s implemented as state machines that:
Expand Down Expand Up @@ -69,7 +69,7 @@ import kotlin.coroutines.experimental.EmptyCoroutineContext
*
* [onReact] is not limited to using the given [ReceiveChannel] to calculate its
* next state. For example, a service call might be handled this way, mapping
* a [Deferred][kotlinx.coroutines.experimental.Deferred] generated by Retrofit to the
* a [Deferred][kotlinx.coroutines.Deferred] generated by Retrofit to the
* appropriate [Reaction].
*
* override suspend fun onReact(
Expand All @@ -87,7 +87,7 @@ import kotlin.coroutines.experimental.EmptyCoroutineContext
* }
*
* If you need to mix such command-like `Deferred`s with workflow events, use
* [select][kotlinx.coroutines.experimental.selects.select]:
* [select][kotlinx.coroutines.selects.select]:
*
* override suspend fun onReact(
* state: MyState,
Expand Down Expand Up @@ -223,10 +223,10 @@ interface Reactor<S : Any, E : Any, out O : Any> : WorkflowPool.Launcher<S, E, O
* ## Dispatchers
*
* If [context] contains a
* [CoroutineDispatcher][kotlinx.coroutines.experimental.CoroutineDispatcher], it is not used.
* [CoroutineDispatcher][kotlinx.coroutines.CoroutineDispatcher], it is not used.
* The [onReact][Reactor.onReact] method is always invoked from the [Unconfined] dispatcher. If your
* `onReact` actually requires a particular dispatcher, it should use
* [withContext][kotlinx.coroutines.experimental.withContext].
* [withContext][kotlinx.coroutines.withContext].
*/
fun <S : Any, E : Any, O : Any> Reactor<S, E, O>.doLaunch(
initialState: S,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.squareup.workflow.legacy

import com.squareup.workflow.legacy.WorkflowPool.Type
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.Deferred

/**
* Defines a discrete task that a [Workflow] can execute asynchronously via [WorkflowPool],
Expand Down Expand Up @@ -76,9 +76,9 @@ import kotlinx.coroutines.experimental.Deferred
* ## Note on Dispatchers
*
* Workers are always invoked with the
* [Unconfined][kotlinx.coroutines.experimental.Dispatchers.Unconfined] dispatcher. If a worker
* [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] dispatcher. If a worker
* needs a specific dispatcher (e.g. to do IO), it should use
* [withContext][kotlinx.coroutines.experimental.withContext].
* [withContext][kotlinx.coroutines.withContext].
*/
interface Worker<in I : Any, out O : Any> {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package com.squareup.workflow.legacy

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.ReceiveChannel

/**
* Models a process in the app as a stream of [states][openSubscriptionToState] of type [S],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,26 @@
*/
package com.squareup.workflow.legacy

import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Dispatchers.Unconfined
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.channels.toChannel
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toChannel
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
* Defines the [CoroutineContext] used by [Workflow] operators below.
*
* See this module's README for an explanation of why Unconfined is used.
*/
private val operatorScope = CoroutineScope(Unconfined)
private val operatorContext = Unconfined

/**
* [Transforms][https://stackoverflow.com/questions/15457015/explain-contramap]
Expand All @@ -56,7 +57,7 @@ fun <S1 : Any, S2 : Any, E : Any, O : Any> Workflow<S1, E, O>.mapState(
Deferred<O> by this,
WorkflowInput<E> by this {
override fun openSubscriptionToState(): ReceiveChannel<S2> =
operatorScope.produce {
GlobalScope.produce(operatorContext) {
val source = this@mapState.openSubscriptionToState()
source.consumeEach {
send(transform(it))
Expand All @@ -76,7 +77,7 @@ fun <S1 : Any, S2 : Any, E : Any, O : Any> Workflow<S1, E, O>.switchMapState(
Deferred<O> by this,
WorkflowInput<E> by this {
override fun openSubscriptionToState(): ReceiveChannel<S2> =
operatorScope.produce(capacity = CONFLATED) {
GlobalScope.produce(operatorContext, capacity = CONFLATED) {
val upstreamChannel = this@switchMapState.openSubscriptionToState()
val downstreamChannel = channel
var transformerJob: Job? = null
Expand Down Expand Up @@ -105,7 +106,7 @@ fun <S : Any, E : Any, O1 : Any, O2 : Any> Workflow<S, E, O1>.mapResult(
// We can't just make the downstream a child of the upstream workflow to propagate cancellation,
// since the downstream's call to `await` would never return (parent waits for all its children
// to complete).
val transformedResult = operatorScope.async {
val transformedResult = GlobalScope.async(operatorContext) {
transform(this@mapResult.await())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package com.squareup.workflow.legacy
import com.squareup.workflow.legacy.WorkflowPool.Handle
import com.squareup.workflow.legacy.WorkflowPool.Launcher
import com.squareup.workflow.legacy.WorkflowPool.Type
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Dispatchers.Unconfined
import kotlinx.coroutines.experimental.GlobalScope
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.consume
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.consume
import org.jetbrains.annotations.TestOnly
import kotlin.reflect.KClass

Expand Down Expand Up @@ -154,7 +154,7 @@ class WorkflowPool {
* running, call [awaitWorkflowUpdate] again with the [Running.handle] returned from the
* previous call.
*
* @throws kotlinx.coroutines.experimental.CancellationException If the nested workflow is
* @throws kotlinx.coroutines.CancellationException If the nested workflow is
* [abandoned][abandonWorkflow].
*
* @see workflowUpdate
Expand Down Expand Up @@ -183,7 +183,7 @@ class WorkflowPool {
* starts it with the given [input]. The caller must either consume the result, or
* else call [abandonWorker].
*
* @throws kotlinx.coroutines.experimental.CancellationException If the worker is
* @throws kotlinx.coroutines.CancellationException If the worker is
* [abandoned][abandonWorker].
*
* @see workerResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package com.squareup.workflow.legacy

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineName
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Dispatchers.Unconfined
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consume
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
import kotlinx.coroutines.experimental.test.TestCoroutineContext
import kotlin.coroutines.experimental.suspendCoroutine
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.test.TestCoroutineContext
import kotlin.coroutines.suspendCoroutine
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ package com.squareup.workflow.legacy

import com.squareup.workflow.legacy.TestState.FirstState
import com.squareup.workflow.legacy.TestState.SecondState
import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CompletableDeferred
import kotlinx.coroutines.experimental.CoroutineExceptionHandler
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Dispatchers.Unconfined
import kotlinx.coroutines.experimental.GlobalScope
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.suspendCancellableCoroutine
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import org.junit.Test
import kotlin.coroutines.experimental.suspendCoroutine
import kotlin.coroutines.suspendCoroutine
import kotlin.coroutines.resumeWithException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
Expand Down Expand Up @@ -116,7 +117,7 @@ class ReactorAsWorkflowIntegrationTest {
val workflow = reactor.doLaunch(SecondState("hello"), WorkflowPool())
subscribeToResult(workflow)
workflow.cancel()
assertTrue(resultSub.isCompletedExceptionally)
assertTrue(resultSub.isCancelled && resultSub.isCompleted)
}

@Test fun `single state change and then finish`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import com.squareup.workflow.legacy.ReactorIntegrationTest.OuterState.RunningEch
import com.squareup.workflow.legacy.ReactorIntegrationTest.OuterState.RunningImmediateJob
import com.squareup.workflow.legacy.ReactorIntegrationTest.StringEchoer
import com.squareup.workflow.legacy.WorkflowPool.Handle
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.selects.select
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.selects.select
import org.junit.Test
import kotlin.test.assertEquals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package com.squareup.workflow.legacy

import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CompletableDeferred
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import org.junit.Test
import java.io.IOException
import kotlin.test.assertEquals
Expand Down
Loading

0 comments on commit 8d79702

Please sign in to comment.