Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Fix connectAction Exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed May 23, 2020
1 parent d2e1a33 commit 04d625d
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 66 deletions.
10 changes: 6 additions & 4 deletions keep-alive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class GattClosed : Exception()

suspend fun KeepAliveGatt.readCharacteristicWithRetry(
characteristic: BluetoothGattCharacteristic,
retryCount: Int = Integer.MAX_VALUE
retryCount: Int = Int.MAX_VALUE
): OnCharacteristicRead {
repeat(retryCount) {
suspendUntilConnected()
Expand Down Expand Up @@ -154,9 +154,11 @@ val gatt = scope.keepAliveGatt(...)

When a failure occurs during the connection sequence, `KeepAliveGatt` will disconnect/close the
in-flight connection and reconnect. If a connection attempt results in `GattConnectResult.Rejected`,
then the failure is considered unrecoverable and `KeepAliveGatt` will disconnect/close the in-flight
connection and finish in a `Closed` `State`. Once a `KeepAliveGatt` is `Closed` it cannot be
reconnected (calls to `connect` will throw `IllegalStateException`).
then the failure is considered unrecoverable and `KeepAliveGatt` will finish in a `Closed` `State`.
Additionally, a `ConnectionRejected` Exception is propagated to the parent [`CoroutineContext`].

Once a `KeepAliveGatt` is `Closed` it **cannot** be reconnected (calls to `connect` will throw
`IllegalStateException`); a new `KeepAliveGatt` must be created.

# Setup

Expand Down
88 changes: 42 additions & 46 deletions keep-alive/src/main/java/KeepAliveGatt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
Expand All @@ -50,12 +48,14 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull

typealias ConnectAction = suspend GattIo.() -> Unit

class NotReady(message: String) : IllegalStateException(message)
class ConnectionRejected(cause: Throwable) : IllegalStateException(cause)

sealed class State {
object Connecting : State()
Expand Down Expand Up @@ -90,10 +90,8 @@ class KeepAliveGatt(

private val applicationContext = androidContext.applicationContext

private val job = SupervisorJob(parentCoroutineContext[Job]).apply {
invokeOnCompletion {
_state.value = if (it is CancellationException) Closed(it.cause) else Closed(it)
}
private val job = Job(parentCoroutineContext[Job]).apply {
invokeOnCompletion { cause -> _state.value = Closed(cause) }
}
private val scope = CoroutineScope(parentCoroutineContext + job)

Expand Down Expand Up @@ -135,11 +133,7 @@ class KeepAliveGatt(

scope.launch(CoroutineName("KeepAliveGatt@$bluetoothDevice")) {
while (isActive) {
try {
spawnConnection()
} finally {
_state.value = Disconnected
}
spawnConnection()
}
}
return true
Expand All @@ -150,13 +144,9 @@ class KeepAliveGatt(
isRunning.set(false)
}

private fun cancel(cause: CancellationException?) {
job.cancel(cause)
_onCharacteristicChanged.cancel()
}

fun cancel() {
cancel(null)
job.cancel()
_onCharacteristicChanged.cancel()
}

suspend fun cancelAndJoin() {
Expand All @@ -165,39 +155,45 @@ class KeepAliveGatt(
}

private suspend fun spawnConnection() {
_state.value = Connecting

val gatt = when (val result = bluetoothDevice.connectGatt(applicationContext)) {
is Success -> result.gatt
is Failure.Rejected -> {
cancel(CancellationException("Connection request was rejected", result.cause))
return
}
is Failure.Connection -> {
Able.error { "Failed to connect to device $bluetoothDevice due to ${result.cause}" }
return
}
}

try {
coroutineScope {
gatt.onCharacteristicChanged
.onEach(_onCharacteristicChanged::send)
.launchIn(this, start = UNDISPATCHED)
onConnectAction?.invoke(gatt)
_gatt = gatt
_state.value = Connected
_state.value = Connecting

val gatt = when (val result = bluetoothDevice.connectGatt(applicationContext)) {
is Success -> result.gatt
is Failure.Rejected -> throw ConnectionRejected(result.cause)
is Failure.Connection -> {
Able.error { "Failed to connect to device $bluetoothDevice due to ${result.cause}" }
return
}
}
} finally {
_gatt = null
_state.value = State.Disconnecting
withContext(NonCancellable) {
withTimeoutOrNull(disconnectTimeoutMillis) {
gatt.disconnect()
} ?: Able.warn {
"Timed out waiting ${disconnectTimeoutMillis}ms for disconnect"

supervisorScope {
launch {
try {
coroutineScope {
gatt.onCharacteristicChanged
.onEach(_onCharacteristicChanged::send)
.launchIn(this, start = UNDISPATCHED)
onConnectAction?.invoke(gatt)
_gatt = gatt
_state.value = Connected
}
} finally {
_gatt = null
_state.value = State.Disconnecting

withContext(NonCancellable) {
withTimeoutOrNull(disconnectTimeoutMillis) {
gatt.disconnect()
} ?: Able.warn {
"Timed out waiting ${disconnectTimeoutMillis}ms for disconnect"
}
}
}
}
}
} finally {
_state.value = Disconnected
}
}

Expand Down
140 changes: 124 additions & 16 deletions keep-alive/src/test/java/KeepAliveGattTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.juul.able.gatt.ConnectionLost
import com.juul.able.gatt.Gatt
import com.juul.able.gatt.OnCharacteristicChanged
import com.juul.able.gatt.OnReadRemoteRssi
import com.juul.able.keepalive.ConnectionRejected
import com.juul.able.keepalive.KeepAliveGatt
import com.juul.able.keepalive.NotReady
import com.juul.able.keepalive.State.Closed
Expand All @@ -32,20 +33,15 @@ import io.mockk.coVerify
import io.mockk.every
import io.mockk.mockk
import io.mockk.mockkStatic
import java.util.UUID
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow
Expand All @@ -55,6 +51,13 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import java.util.UUID
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue

private const val BLUETOOTH_DEVICE_CLASS = "com.juul.able.android.BluetoothDeviceKt"
private const val MAC_ADDRESS = "00:11:22:33:FF:EE"
Expand Down Expand Up @@ -224,11 +227,8 @@ class KeepAliveGattTest {
}

@Test
fun `Does not retry connection when connection is rejected`() = runBlocking {
val bluetoothDevice = mockk<BluetoothDevice> {
every { this@mockk.toString() } returns MAC_ADDRESS
every { connectGatt(any(), false, any()) } returns null
}
fun `KeepAliveGatt is Closed when connection is rejected`() = runBlocking {
val bluetoothDevice = mockBluetoothDevice()

val scope = CoroutineScope(Job())
val keepAlive = scope.keepAliveGatt(
Expand All @@ -240,13 +240,62 @@ class KeepAliveGattTest {
assertTrue(keepAlive.connect())
val closed = keepAlive.state.first { it is Closed } as Closed

assertEquals<Class<out Throwable>?>(
expected = RemoteException::class.java,
actual = closed.cause?.javaClass
assertThrowable<ConnectionRejected>(closed.cause)
assertThrowable<RemoteException>(closed.cause?.cause)

assertFalse(scope.isActive, "Failure should propagate to parent")
coVerify(exactly = 1) { bluetoothDevice.connectGatt(any(), false, any()) }
}

@Test
fun `KeepAliveGatt with SupervisorJob parent does not fail siblings on connection rejection`() {
val bluetoothDevice = mockBluetoothDevice()
val done = Channel<Unit>()

runBlocking {
launch(SupervisorJob()) {
val keepAlive = keepAliveGatt(
androidContext = mockk(relaxed = true),
bluetoothDevice = bluetoothDevice,
disconnectTimeoutMillis = DISCONNECT_TIMEOUT
)

assertTrue(keepAlive.connect())
keepAlive.state.first { it is Closed }
}.apply {
invokeOnCompletion { done.offer(Unit) }
}

val job = launch { delay(Long.MAX_VALUE) }

done.receive() // Wait for first job to complete (fail).
coVerify(exactly = 1) { bluetoothDevice.connectGatt(any(), false, any()) }

// Verify sibling `launch` is still active.
assertTrue(job.isActive, "Launch with sibling SupervisorJob did not remain active")

job.cancelAndJoin()
}
}

@Test
fun `Cancelling KeepAliveGatt does not cancel parent`() = runBlocking {
val bluetoothDevice = mockBluetoothDevice()
val scope = CoroutineScope(Job())

val keepAlive = scope.keepAliveGatt(
androidContext = mockk(relaxed = true),
bluetoothDevice = bluetoothDevice,
disconnectTimeoutMillis = DISCONNECT_TIMEOUT
)
keepAlive.cancelAndJoin()

assertFailsWith<IllegalStateException> {
keepAlive.connect()
}

assertTrue(scope.isActive, "KeepAlive cancellation should not cancel parent")
coVerify(exactly = 1) { bluetoothDevice.connectGatt(any(), false, any()) }
coVerify(exactly = 0) { bluetoothDevice.connectGatt(any(), false, any()) }
}

@Test
Expand Down Expand Up @@ -414,6 +463,58 @@ class KeepAliveGattTest {
actual = keepAlive.toString()
)
}

private class ExceptionFromConnectAction : Exception()

@Test
fun `An Exception thrown from 'connectAction' causes reconnect`() {
val bluetoothDevice = mockBluetoothDevice()
val gatt1 = mockk<Gatt> {
every { onCharacteristicChanged } returns flow { delay(Long.MAX_VALUE) }
coEvery { disconnect() } returns Unit
}
val gatt2 = mockk<Gatt> {
every { onCharacteristicChanged } returns flow { delay(Long.MAX_VALUE) }
coEvery { disconnect() } returns Unit
}
var thrown: Throwable? = null
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
thrown = throwable
}

mockkStatic(BLUETOOTH_DEVICE_CLASS) {
coEvery {
bluetoothDevice.connectGatt(any())
} returnsMany listOf(gatt1, gatt2).map { ConnectGattResult.Success(it) }

runBlocking(exceptionHandler) {
var shouldThrow = true
val keepAlive = keepAliveGatt(
androidContext = mockk(relaxed = true),
bluetoothDevice = bluetoothDevice,
disconnectTimeoutMillis = DISCONNECT_TIMEOUT
) {
if (shouldThrow) {
shouldThrow = false // Only throw on the first connection attempt.
throw ExceptionFromConnectAction()
}
}
assertEquals(
expected = Disconnected,
actual = keepAlive.state.first()
)

keepAlive.connect()
keepAlive.state.first { it == Connected } // Wait until connected.
keepAlive.cancelAndJoin()
}

assertThrowable<ExceptionFromConnectAction>(thrown)
coVerify(exactly = 2) { bluetoothDevice.connectGatt(any()) }
coVerify(exactly = 1) { gatt1.disconnect() }
coVerify(exactly = 1) { gatt2.disconnect() }
}
}
}

private fun mockBluetoothDevice(): BluetoothDevice = mockk {
Expand All @@ -424,3 +525,10 @@ private fun mockBluetoothDevice(): BluetoothDevice = mockk {
// function, so this mocked method usually isn't used.
every { connectGatt(any(), any(), any()) } returns null
}

private inline fun <reified T : Throwable> assertThrowable(throwable: Throwable?) {
assertEquals<Class<out Throwable>?>(
expected = T::class.java,
actual = throwable?.javaClass
)
}

0 comments on commit 04d625d

Please sign in to comment.