From 50e18894f570d7d7aad0f27f2921f5d82d0cb236 Mon Sep 17 00:00:00 2001 From: Travis Wyatt Date: Fri, 22 May 2020 19:39:05 -0700 Subject: [PATCH] Fix `connectAction` Exception handling --- keep-alive/README.md | 10 +- keep-alive/src/main/java/KeepAliveGatt.kt | 88 ++++++------ keep-alive/src/test/java/KeepAliveGattTest.kt | 126 ++++++++++++++++-- 3 files changed, 165 insertions(+), 59 deletions(-) diff --git a/keep-alive/README.md b/keep-alive/README.md index f998aa8..d31e98d 100644 --- a/keep-alive/README.md +++ b/keep-alive/README.md @@ -62,7 +62,7 @@ class GattClosed : Exception() suspend fun KeepAliveGatt.readCharacteristicWithRetry( characteristic: BluetoothGattCharacteristic, - retryCount: Int = Integer.MAX_VALUE + retryCount: Int = Int.MAX_VALUE ): OnCharacteristicRead { repeat(retryCount) { suspendUntilConnected() @@ -154,9 +154,11 @@ val gatt = scope.keepAliveGatt(...) When a failure occurs during the connection sequence, `KeepAliveGatt` will disconnect/close the in-flight connection and reconnect. If a connection attempt results in `GattConnectResult.Rejected`, -then the failure is considered unrecoverable and `KeepAliveGatt` will disconnect/close the in-flight -connection and finish in a `Closed` `State`. Once a `KeepAliveGatt` is `Closed` it cannot be -reconnected (calls to `connect` will throw `IllegalStateException`). +then the failure is considered unrecoverable and `KeepAliveGatt` will finish in a `Closed` `State`. +Additionally, a `ConnectionRejected` Exception is propagated to the parent [`CoroutineContext`]. + +Once a `KeepAliveGatt` is `Closed` it **cannot** be reconnected (calls to `connect` will throw +`IllegalStateException`); a new `KeepAliveGatt` must be created. # Setup diff --git a/keep-alive/src/main/java/KeepAliveGatt.kt b/keep-alive/src/main/java/KeepAliveGatt.kt index 8b196f3..fff1837 100644 --- a/keep-alive/src/main/java/KeepAliveGatt.kt +++ b/keep-alive/src/main/java/KeepAliveGatt.kt @@ -30,7 +30,6 @@ import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart @@ -38,7 +37,6 @@ import kotlinx.coroutines.CoroutineStart.UNDISPATCHED import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.NonCancellable -import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.Channel.Factory.BUFFERED @@ -50,12 +48,14 @@ import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull typealias ConnectAction = suspend GattIo.() -> Unit class NotReady(message: String) : IllegalStateException(message) +class ConnectionRejected(cause: Throwable) : IllegalStateException(cause) sealed class State { object Connecting : State() @@ -90,10 +90,8 @@ class KeepAliveGatt( private val applicationContext = androidContext.applicationContext - private val job = SupervisorJob(parentCoroutineContext[Job]).apply { - invokeOnCompletion { - _state.value = if (it is CancellationException) Closed(it.cause) else Closed(it) - } + private val job = Job(parentCoroutineContext[Job]).apply { + invokeOnCompletion { cause -> _state.value = Closed(cause) } } private val scope = CoroutineScope(parentCoroutineContext + job) @@ -135,11 +133,7 @@ class KeepAliveGatt( scope.launch(CoroutineName("KeepAliveGatt@$bluetoothDevice")) { while (isActive) { - try { - spawnConnection() - } finally { - _state.value = Disconnected - } + spawnConnection() } } return true @@ -150,13 +144,9 @@ class KeepAliveGatt( isRunning.set(false) } - private fun cancel(cause: CancellationException?) { - job.cancel(cause) - _onCharacteristicChanged.cancel() - } - fun cancel() { - cancel(null) + job.cancel() + _onCharacteristicChanged.cancel() } suspend fun cancelAndJoin() { @@ -165,39 +155,45 @@ class KeepAliveGatt( } private suspend fun spawnConnection() { - _state.value = Connecting - - val gatt = when (val result = bluetoothDevice.connectGatt(applicationContext)) { - is Success -> result.gatt - is Failure.Rejected -> { - cancel(CancellationException("Connection request was rejected", result.cause)) - return - } - is Failure.Connection -> { - Able.error { "Failed to connect to device $bluetoothDevice due to ${result.cause}" } - return - } - } - try { - coroutineScope { - gatt.onCharacteristicChanged - .onEach(_onCharacteristicChanged::send) - .launchIn(this, start = UNDISPATCHED) - onConnectAction?.invoke(gatt) - _gatt = gatt - _state.value = Connected + _state.value = Connecting + + val gatt = when (val result = bluetoothDevice.connectGatt(applicationContext)) { + is Success -> result.gatt + is Failure.Rejected -> throw ConnectionRejected(result.cause) + is Failure.Connection -> { + Able.error { "Failed to connect to device $bluetoothDevice due to ${result.cause}" } + return + } } - } finally { - _gatt = null - _state.value = State.Disconnecting - withContext(NonCancellable) { - withTimeoutOrNull(disconnectTimeoutMillis) { - gatt.disconnect() - } ?: Able.warn { - "Timed out waiting ${disconnectTimeoutMillis}ms for disconnect" + + supervisorScope { + launch { + try { + coroutineScope { + gatt.onCharacteristicChanged + .onEach(_onCharacteristicChanged::send) + .launchIn(this, start = UNDISPATCHED) + onConnectAction?.invoke(gatt) + _gatt = gatt + _state.value = Connected + } + } finally { + _gatt = null + _state.value = State.Disconnecting + + withContext(NonCancellable) { + withTimeoutOrNull(disconnectTimeoutMillis) { + gatt.disconnect() + } ?: Able.warn { + "Timed out waiting ${disconnectTimeoutMillis}ms for disconnect" + } + } + } } } + } finally { + _state.value = Disconnected } } diff --git a/keep-alive/src/test/java/KeepAliveGattTest.kt b/keep-alive/src/test/java/KeepAliveGattTest.kt index 8808bb1..d7c4743 100644 --- a/keep-alive/src/test/java/KeepAliveGattTest.kt +++ b/keep-alive/src/test/java/KeepAliveGattTest.kt @@ -19,6 +19,7 @@ import com.juul.able.gatt.ConnectionLost import com.juul.able.gatt.Gatt import com.juul.able.gatt.OnCharacteristicChanged import com.juul.able.gatt.OnReadRemoteRssi +import com.juul.able.keepalive.ConnectionRejected import com.juul.able.keepalive.KeepAliveGatt import com.juul.able.keepalive.NotReady import com.juul.able.keepalive.State.Closed @@ -43,9 +44,11 @@ import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart.UNDISPATCHED import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.channels.BroadcastChannel +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel.Factory.BUFFERED import kotlinx.coroutines.delay import kotlinx.coroutines.flow.asFlow @@ -224,11 +227,8 @@ class KeepAliveGattTest { } @Test - fun `Does not retry connection when connection is rejected`() = runBlocking { - val bluetoothDevice = mockk { - every { this@mockk.toString() } returns MAC_ADDRESS - every { connectGatt(any(), false, any()) } returns null - } + fun `KeepAliveGatt is Closed when connection is rejected`() = runBlocking { + val bluetoothDevice = mockBluetoothDevice() val scope = CoroutineScope(Job()) val keepAlive = scope.keepAliveGatt( @@ -240,13 +240,62 @@ class KeepAliveGattTest { assertTrue(keepAlive.connect()) val closed = keepAlive.state.first { it is Closed } as Closed - assertEquals?>( - expected = RemoteException::class.java, - actual = closed.cause?.javaClass + assertThrowable(closed.cause) + assertThrowable(closed.cause?.cause) + + assertFalse(scope.isActive, "Failure should propagate to parent") + coVerify(exactly = 1) { bluetoothDevice.connectGatt(any(), false, any()) } + } + + @Test + fun `KeepAliveGatt with SupervisorJob parent does not fail siblings on connection rejection`() { + val bluetoothDevice = mockBluetoothDevice() + val done = Channel() + + runBlocking { + launch(SupervisorJob()) { + val keepAlive = keepAliveGatt( + androidContext = mockk(relaxed = true), + bluetoothDevice = bluetoothDevice, + disconnectTimeoutMillis = DISCONNECT_TIMEOUT + ) + + assertTrue(keepAlive.connect()) + keepAlive.state.first { it is Closed } + }.apply { + invokeOnCompletion { done.offer(Unit) } + } + + val job = launch { delay(Long.MAX_VALUE) } + + done.receive() // Wait for first job to complete (fail). + coVerify(exactly = 1) { bluetoothDevice.connectGatt(any(), false, any()) } + + // Verify sibling `launch` is still active. + assertTrue(job.isActive, "Launch with sibling SupervisorJob did not remain active") + + job.cancelAndJoin() + } + } + + @Test + fun `Cancelling KeepAliveGatt does not cancel parent`() = runBlocking { + val bluetoothDevice = mockBluetoothDevice() + val scope = CoroutineScope(Job()) + + val keepAlive = scope.keepAliveGatt( + androidContext = mockk(relaxed = true), + bluetoothDevice = bluetoothDevice, + disconnectTimeoutMillis = DISCONNECT_TIMEOUT ) + keepAlive.cancelAndJoin() + + assertFailsWith { + keepAlive.connect() + } assertTrue(scope.isActive, "KeepAlive cancellation should not cancel parent") - coVerify(exactly = 1) { bluetoothDevice.connectGatt(any(), false, any()) } + coVerify(exactly = 0) { bluetoothDevice.connectGatt(any(), false, any()) } } @Test @@ -414,6 +463,58 @@ class KeepAliveGattTest { actual = keepAlive.toString() ) } + + private class ExceptionFromConnectAction : Exception() + + @Test + fun `An Exception thrown from 'connectAction' causes reconnect`() { + val bluetoothDevice = mockBluetoothDevice() + val gatt1 = mockk { + every { onCharacteristicChanged } returns flow { delay(Long.MAX_VALUE) } + coEvery { disconnect() } returns Unit + } + val gatt2 = mockk { + every { onCharacteristicChanged } returns flow { delay(Long.MAX_VALUE) } + coEvery { disconnect() } returns Unit + } + var thrown: Throwable? = null + val exceptionHandler = CoroutineExceptionHandler { _, throwable -> + thrown = throwable + } + + mockkStatic(BLUETOOTH_DEVICE_CLASS) { + coEvery { + bluetoothDevice.connectGatt(any()) + } returnsMany listOf(gatt1, gatt2).map { ConnectGattResult.Success(it) } + + runBlocking(exceptionHandler) { + var shouldThrow = true + val keepAlive = keepAliveGatt( + androidContext = mockk(relaxed = true), + bluetoothDevice = bluetoothDevice, + disconnectTimeoutMillis = DISCONNECT_TIMEOUT + ) { + if (shouldThrow) { + shouldThrow = false // Only throw on the first connection attempt. + throw ExceptionFromConnectAction() + } + } + assertEquals( + expected = Disconnected, + actual = keepAlive.state.first() + ) + + keepAlive.connect() + keepAlive.state.first { it == Connected } // Wait until connected. + keepAlive.cancelAndJoin() + } + + assertThrowable(thrown) + coVerify(exactly = 2) { bluetoothDevice.connectGatt(any()) } + coVerify(exactly = 1) { gatt1.disconnect() } + coVerify(exactly = 1) { gatt2.disconnect() } + } + } } private fun mockBluetoothDevice(): BluetoothDevice = mockk { @@ -424,3 +525,10 @@ private fun mockBluetoothDevice(): BluetoothDevice = mockk { // function, so this mocked method usually isn't used. every { connectGatt(any(), any(), any()) } returns null } + +private inline fun assertThrowable(throwable: Throwable?) { + assertEquals?>( + expected = T::class.java, + actual = throwable?.javaClass + ) +}