-
Notifications
You must be signed in to change notification settings - Fork 15
Fix constant cancellation & cancellation race condition acquiring a resource #232
Conversation
return Either.catch(fr).flatMap { resource -> | ||
scope.acquired { ex: ExitCase -> release(resource, ex) }.map { registered -> | ||
state.modify { | ||
if (conn.isCancelled() && registered) Pair(it, suspend { release(resource, ExitCase.Cancelled) }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to check for cancellation while registering the resource into the scope, if while registering the resource a cancellation signal occurs than we need to release the resource immediately with ExitCase.Cancelled
.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
val conn = coroutineContext.connection() | ||
return state.modify { s -> | ||
when { | ||
conn.isCancelled() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to check for cancellation while registering the resource into the scope, if while registering the resource a cancellation signal occurs than we need to release the resource immediately with ExitCase.Cancelled
.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isCancelled
can actually be called multiple times here, that's why the connection should be cached.
All operations on atomic
and Atomic
that take a function are prone to be called multiple times until it was able to succesfully update using compareAndSet
since set(f(get()))
is not an atomic safe operation.
That's also why Atomic
(and Ref
) don't take suspend
(or F
) signatures. It's advised against to call effectful code inside atomic updates, since slow updates would have a very low chance of a succesful atomic update.
Therefore we also use modify
here and use a suspend
lambda to defer the operation to happen outside the atomic update.
@@ -118,7 +119,7 @@ internal fun <O> interruptBoundary( | |||
} | |||
|
|||
internal suspend inline fun interruptGuard(scope: Scope): Result<Any?>? = | |||
when (val isInterrupted = scope.isInterrupted()) { | |||
when (val isInterrupted = scope.isInterrupted().also { cancelBoundary() }) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check for cancellation in interruptGuard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some nits (no blockers), looking good! Nice catches 👌
val conn = coroutineContext.connection() | ||
return state.modify { s -> | ||
when { | ||
conn.isCancelled() -> { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
return Either.catch(fr).flatMap { resource -> | ||
scope.acquired { ex: ExitCase -> release(resource, ex) }.map { registered -> | ||
state.modify { | ||
if (conn.isCancelled() && registered) Pair(it, suspend { release(resource, ExitCase.Cancelled) }) |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
} | ||
|
||
@JvmName("assertStreamCancellable") | ||
suspend fun <A> assertCancellable(fa: (latch: Promise<Unit>) -> Stream<A>): Unit { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we would have this functions here as utility functions I think they should be generic enough to use them in more places and exclude the assertions from here, otherwise we're hiding the actual testing from the test classes.
Alternatively you could move them to the CancellationTest
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would you make it more generic? You can still add your own assertions arround, as done here.
We can definitely move them to CancellationTest
. We haven't discussed how we want to organise the test suite, now it's still mixed between a bunch of stuff in StreamTest
, ParJoinTest
, BracketTest
, etc
I personally like grouping tests per combinator (or the Concurrent
onces) so it can serve as a specification of it's behavior, like a law test suite.
If we'd do that I think we should add the cancellation
/interruption
tests there as well for it to be a full spec of it's behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This 2 are currently quite specific, so for now I'd say to have them in CancellationTest
. Later we can think on organize them all better :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR adds a test suite for cancellation.
It exposed two bugs: