Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Remove Messenger actor and use withContext instead
Browse files Browse the repository at this point in the history
Inspired by [comment] by elizarov (on Jun 15) in
Kotlin/kotlinx.coroutines#87:

> when you ask and actor and want a result back the proper design would
> be to have a `suspend fun` with a normal (non-deferred) `Result`.
> However, please note that this whole ask & wait pattern is an
> anti-pattern in actor-based systems, since it limits scalability.

[comment]: Kotlin/kotlinx.coroutines#87 (comment)
  • Loading branch information
twyatt committed Mar 3, 2019
1 parent 3fc4f79 commit edef027
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 323 deletions.
1 change: 0 additions & 1 deletion core/src/main/java/ConnectionStateMonitor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package com.juul.able.experimental

import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothProfile
import com.juul.able.experimental.messenger.OnConnectionStateChange
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.sync.Mutex
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/java/CoroutinesDevice.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import android.os.RemoteException
import com.juul.able.experimental.ConnectGattResult.Canceled
import com.juul.able.experimental.ConnectGattResult.Failure
import com.juul.able.experimental.ConnectGattResult.Success
import com.juul.able.experimental.messenger.GattCallback
import com.juul.able.experimental.messenger.GattCallbackConfig
import com.juul.able.experimental.messenger.Messenger
import kotlinx.coroutines.CancellationException

class CoroutinesDevice(
Expand All @@ -31,8 +28,7 @@ class CoroutinesDevice(
private fun requestConnectGatt(context: Context, autoConnect: Boolean): CoroutinesGatt? {
val callback = GattCallback(callbackConfig)
val bluetoothGatt = device.connectGatt(context, autoConnect, callback) ?: return null
val messenger = Messenger(bluetoothGatt, callback)
return CoroutinesGatt(bluetoothGatt, messenger)
return CoroutinesGatt(bluetoothGatt, callback)
}

/**
Expand Down
206 changes: 67 additions & 139 deletions core/src/main/java/CoroutinesGatt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,16 @@ import android.bluetooth.BluetoothProfile.STATE_CONNECTED
import android.bluetooth.BluetoothProfile.STATE_DISCONNECTED
import android.bluetooth.BluetoothProfile.STATE_DISCONNECTING
import android.os.RemoteException
import com.juul.able.experimental.messenger.Message.DiscoverServices
import com.juul.able.experimental.messenger.Message.ReadCharacteristic
import com.juul.able.experimental.messenger.Message.RequestMtu
import com.juul.able.experimental.messenger.Message.WriteCharacteristic
import com.juul.able.experimental.messenger.Message.WriteDescriptor
import com.juul.able.experimental.messenger.Messenger
import com.juul.able.experimental.messenger.OnCharacteristicChanged
import com.juul.able.experimental.messenger.OnCharacteristicRead
import com.juul.able.experimental.messenger.OnCharacteristicWrite
import com.juul.able.experimental.messenger.OnConnectionStateChange
import com.juul.able.experimental.messenger.OnDescriptorWrite
import com.juul.able.experimental.messenger.OnMtuChanged
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.filter
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import java.util.UUID
import kotlin.coroutines.CoroutineContext

Expand All @@ -41,7 +32,8 @@ class GattConnectionLost : Exception()

class CoroutinesGatt(
private val bluetoothGatt: BluetoothGatt,
private val messenger: Messenger
private val callback: GattCallback,
private val dispatcher: CoroutineDispatcher = newSingleThreadContext("Gatt")
) : Gatt {

private val job = Job()
Expand All @@ -51,10 +43,10 @@ class CoroutinesGatt(
private val connectionStateMonitor by lazy { ConnectionStateMonitor(this) }

override val onConnectionStateChange: BroadcastChannel<OnConnectionStateChange>
get() = messenger.callback.onConnectionStateChange
get() = callback.onConnectionStateChange

override val onCharacteristicChanged: BroadcastChannel<OnCharacteristicChanged>
get() = messenger.callback.onCharacteristicChanged
get() = callback.onCharacteristicChanged

override fun requestConnect(): Boolean = bluetoothGatt.connect()
override fun requestDisconnect(): Unit = bluetoothGatt.disconnect()
Expand All @@ -77,7 +69,18 @@ class CoroutinesGatt(
Able.verbose { "close → Begin" }
job.cancel()
connectionStateMonitor.close()
messenger.close()
callback.close()

if (dispatcher is ExecutorCoroutineDispatcher) {
/**
* Explicitly close context (this is needed until #261 is fixed).
*
* [Kotlin Coroutines Issue #261](https://github.com/Kotlin/kotlinx.coroutines/issues/261)
* [Coroutines actor test Gist](https://gist.github.com/twyatt/c51f81d763a6ee39657233fa725f5435)
*/
dispatcher.close()
}

bluetoothGatt.close()
Able.verbose { "close → End" }
}
Expand All @@ -89,62 +92,22 @@ class CoroutinesGatt(
* @throws [RemoteException] if underlying [BluetoothGatt.discoverServices] returns `false`.
* @throws [GattClosed] if [Gatt] is closed while method is executing.
*/
override suspend fun discoverServices(): GattStatus {
Able.debug { "discoverServices → send(DiscoverServices)" }

val response = CompletableDeferred<Boolean>()
messenger.send(DiscoverServices(response))

val call = "BluetoothGatt.discoverServices()"
Able.verbose { "discoverServices → Waiting for $call" }
if (!response.await()) {
throw RemoteException("$call returned false.")
override suspend fun discoverServices(): GattStatus =
performBluetoothAction("discoverServices", callback.onServicesDiscovered) {
bluetoothGatt.discoverServices()
}

Able.verbose { "discoverServices → Waiting for BluetoothGattCallback" }
return try {
messenger.callback.onServicesDiscovered.receiveRequiringConnection().also { status ->
Able.info { "discoverServices, status=${status.asGattStatusString()}" }
}
} catch (e: ClosedReceiveChannelException) {
throw GattClosed("Gatt closed during discoverServices", e)
}
}

/**
* @throws [RemoteException] if underlying [BluetoothGatt.readCharacteristic] returns `false`.
* @throws [GattClosed] if [Gatt] is closed while method is executing.
*/
override suspend fun readCharacteristic(
characteristic: BluetoothGattCharacteristic
): OnCharacteristicRead {
val uuid = characteristic.uuid
Able.debug { "readCharacteristic → send(ReadCharacteristic[uuid=$uuid])" }

val response = CompletableDeferred<Boolean>()
messenger.send(ReadCharacteristic(characteristic, response))

val call = "BluetoothGatt.readCharacteristic(BluetoothGattCharacteristic[uuid=$uuid])"
Able.verbose { "readCharacteristic → Waiting for $call" }
if (!response.await()) {
throw RemoteException("Failed to read characteristic with UUID $uuid.")
): OnCharacteristicRead =
performBluetoothAction("readCharacteristic", callback.onCharacteristicRead) {
bluetoothGatt.readCharacteristic(characteristic)
}

Able.verbose { "readCharacteristic → Waiting for BluetoothGattCallback" }
return try {
messenger.callback.onCharacteristicRead.receiveRequiringConnection()
.also { (_, value, status) ->
Able.info {
val bytesString = value.size.bytesString
val statusString = status.asGattStatusString()
"← readCharacteristic $uuid ($bytesString), status=$statusString"
}
}
} catch (e: ClosedReceiveChannelException) {
throw GattClosed("Gatt closed during readCharacteristic[uuid=$uuid]", e)
}
}

/**
* @param value applied to [characteristic] when characteristic is written.
* @param writeType applied to [characteristic] when characteristic is written.
Expand All @@ -155,103 +118,70 @@ class CoroutinesGatt(
characteristic: BluetoothGattCharacteristic,
value: ByteArray,
writeType: WriteType
): OnCharacteristicWrite {
val uuid = characteristic.uuid
Able.debug { "writeCharacteristic → send(WriteCharacteristic[uuid=$uuid])" }

val response = CompletableDeferred<Boolean>()
messenger.send(WriteCharacteristic(characteristic, value, writeType, response))

val call = "BluetoothGatt.writeCharacteristic(BluetoothGattCharacteristic[uuid=$uuid])"
Able.verbose { "writeCharacteristic → Waiting for $call" }
if (!response.await()) {
throw RemoteException("$call returned false.")
): OnCharacteristicWrite =
performBluetoothAction("writeCharacteristic", callback.onCharacteristicWrite) {
characteristic.value = value
characteristic.writeType = writeType
bluetoothGatt.writeCharacteristic(characteristic)
}

Able.verbose { "writeCharacteristic → Waiting for BluetoothGattCallback" }
return try {
messenger.callback.onCharacteristicWrite.receiveRequiringConnection()
.also { (_, status) ->
Able.info {
val bytesString = value.size.bytesString
val typeString = writeType.asWriteTypeString()
val statusString = status.asGattStatusString()
"→ writeCharacteristic $uuid ($bytesString), type=$typeString, status=$statusString"
}
}
} catch (e: ClosedReceiveChannelException) {
throw GattClosed("Gatt closed during writeCharacteristic[uuid=$uuid]", e)
}
}

/**
* @param value applied to [descriptor] when descriptor is written.
* @throws [RemoteException] if underlying [BluetoothGatt.writeDescriptor] returns `false`.
* @throws [GattClosed] if [Gatt] is closed while method is executing.
*/
override suspend fun writeDescriptor(
descriptor: BluetoothGattDescriptor, value: ByteArray
): OnDescriptorWrite {
val uuid = descriptor.uuid
Able.debug { "writeDescriptor → send(WriteDescriptor[uuid=$uuid])" }

val response = CompletableDeferred<Boolean>()
messenger.send(WriteDescriptor(descriptor, value, response))

val call = "BluetoothGatt.writeDescriptor(BluetoothGattDescriptor[uuid=$uuid])"
Able.verbose { "writeDescriptor → Waiting for $call" }
if (!response.await()) {
throw RemoteException("$call returned false.")
}

Able.verbose { "writeDescriptor → Waiting for BluetoothGattCallback" }
return try {
messenger.callback.onDescriptorWrite.receiveRequiringConnection().also { (_, status) ->
Able.info {
val bytesString = value.size.bytesString
val statusString = status.asGattStatusString()
"→ writeDescriptor $uuid ($bytesString), status=$statusString"
}
}
} catch (e: ClosedReceiveChannelException) {
throw GattClosed("Gatt closed during writeDescriptor[uuid=$uuid]", e)
descriptor: BluetoothGattDescriptor,
value: ByteArray
): OnDescriptorWrite =
performBluetoothAction("writeDescriptor", callback.onDescriptorWrite) {
descriptor.value = value
bluetoothGatt.writeDescriptor(descriptor)
}
}

/**
* @throws [RemoteException] if underlying [BluetoothGatt.requestMtu] returns `false`.
* @throws [GattClosed] if [Gatt] is closed while method is executing.
*/
override suspend fun requestMtu(mtu: Int): OnMtuChanged {
Able.debug { "requestMtu → send(RequestMtu[mtu=$mtu])" }

val response = CompletableDeferred<Boolean>()
messenger.send(RequestMtu(mtu, response))

val call = "BluetoothGatt.requestMtu($mtu)"
Able.verbose { "requestMtu → Waiting for $call" }
if (!response.await()) {
throw RemoteException("$call returned false.")
}

Able.verbose { "requestMtu → Waiting for BluetoothGattCallback" }
return try {
messenger.callback.onMtuChanged.receiveRequiringConnection().also { (mtu, status) ->
Able.info { "requestMtu $mtu, status=${status.asGattStatusString()}" }
}
} catch (e: ClosedReceiveChannelException) {
throw GattClosed("Gatt closed during requestMtu[mtu=$mtu]", e)
override suspend fun requestMtu(mtu: Int): OnMtuChanged =
performBluetoothAction("requestMtu", callback.onMtuChanged) {
bluetoothGatt.requestMtu(mtu)
}
}

override fun setCharacteristicNotification(
characteristic: BluetoothGattCharacteristic,
enable: Boolean
): Boolean {
Able.info { "setCharacteristicNotification ${characteristic.uuid} enable=$enable" }
Able.info { "setCharacteristicNotification → uuid=${characteristic.uuid}, enable=$enable" }
return bluetoothGatt.setCharacteristicNotification(characteristic, enable)
}

private suspend fun <T> performBluetoothAction(
methodName: String,
responseChannel: ReceiveChannel<T>,
action: () -> Boolean
): T {
Able.debug { "$methodName → Acquiring Gatt lock" }
callback.waitForGattReady()

Able.verbose { "$methodName → withContext" }
withContext(dispatcher) {
if (!action.invoke()) {
throw RemoteException("BluetoothGatt.$methodName returned false.")
}
}

Able.verbose { "$methodName ← Waiting for BluetoothGattCallback" }
val response = try {
responseChannel.receiveRequiringConnection()
} catch (e: ClosedReceiveChannelException) {
throw GattClosed("Gatt closed during $methodName", e)
}

Able.info { "$methodName$response" }
return response
}

private suspend fun <T> ReceiveChannel<T>.receiveRequiringConnection(): T = select {
onReceive { it }

Expand All @@ -263,5 +193,3 @@ class CoroutinesGatt(
.onReceive { throw GattConnectionLost() }
}
}

private val Int.bytesString get() = if (this == 1) "$this byte" else "$this bytes"
9 changes: 2 additions & 7 deletions core/src/main/java/Gatt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ import android.bluetooth.BluetoothGattCharacteristic.WRITE_TYPE_DEFAULT
import android.bluetooth.BluetoothGattDescriptor
import android.bluetooth.BluetoothGattService
import android.bluetooth.BluetoothProfile
import com.juul.able.experimental.messenger.OnCharacteristicChanged
import com.juul.able.experimental.messenger.OnCharacteristicRead
import com.juul.able.experimental.messenger.OnCharacteristicWrite
import com.juul.able.experimental.messenger.OnConnectionStateChange
import com.juul.able.experimental.messenger.OnDescriptorWrite
import com.juul.able.experimental.messenger.OnMtuChanged
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BroadcastChannel
import java.io.Closeable
Expand Down Expand Up @@ -97,5 +91,6 @@ interface Gatt : Closeable, CoroutineScope {
}

suspend fun Gatt.writeCharacteristic(
characteristic: BluetoothGattCharacteristic, value: ByteArray
characteristic: BluetoothGattCharacteristic,
value: ByteArray
): OnCharacteristicWrite = writeCharacteristic(characteristic, value, WRITE_TYPE_DEFAULT)
Loading

0 comments on commit edef027

Please sign in to comment.