Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Fix issue SuspendConnection #191

Merged
merged 3 commits into from
Jun 22, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package arrow.fx.coroutines
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

/**
* Inline marker to mark a [CancelToken],
Expand Down Expand Up @@ -83,7 +86,8 @@ internal sealed class SuspendConnection : AbstractCoroutineContextElement(Suspen

override tailrec fun push(token: CancelToken): Unit = when (val list = state.value) {
// If connection is already cancelled cancel token immediately.
null -> Platform.unsafeRunSync { token.invoke() }
null -> token.cancel
.startCoroutine(Continuation(EmptyCoroutineContext) { })
else ->
if (state.compareAndSet(list, listOf(token) + list)) Unit
else push(token)
Expand Down