Skip to content

Commit

Permalink
Handle when peripheral services change
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed Sep 19, 2024
1 parent dc0d94f commit 8159a2f
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import kotlinx.coroutines.flow.onEach
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration

// Number of service discovery attempts to make if no services are discovered.
// https://github.com/JuulLabs/kable/issues/295
private const val DISCOVER_SERVICES_RETRIES = 5

internal class BluetoothDeviceAndroidPeripheral(
private val bluetoothDevice: BluetoothDevice,
private val autoConnectPredicate: () -> Boolean,
Expand Down Expand Up @@ -163,7 +167,7 @@ internal class BluetoothDeviceAndroidPeripheral(
}.rssi

private suspend fun discoverServices() {
connectionOrThrow().discoverServices()
connectionOrThrow().discoverServices(retries = DISCOVER_SERVICES_RETRIES)
unwrapCancellationExceptions {
onServicesDiscovered(ServicesDiscoveredPeripheral(this))
}
Expand Down
24 changes: 14 additions & 10 deletions kable-core/src/androidMain/kotlin/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
Expand All @@ -39,10 +42,6 @@ import kotlin.reflect.KClass
import kotlin.time.Duration
import kotlin.time.Duration.Companion.ZERO

// Number of service discovery attempts to make if no services are discovered.
// https://github.com/JuulLabs/kable/issues/295
private const val DISCOVER_SERVICES_RETRIES = 5

private val GattSuccess = GattStatus(GATT_SUCCESS)

/**
Expand Down Expand Up @@ -87,6 +86,7 @@ internal class Connection(
require(disconnectTimeout > ZERO) { "Disconnect timeout must be >0, was $disconnectTimeout" }

onDispose(::disconnect)
onServiceChanged(::discoverServices)

on<Disconnected> {
val state = it.toString()
Expand All @@ -96,26 +96,23 @@ internal class Connection(
}
dispose(NotConnectedException("Disconnect detected"))
}

// todo: Monitor onServicesChanged event to re-`discoverServices`.
// https://github.com/JuulLabs/kable/issues/662
}

private val dispatcher = connectionScope.coroutineContext + threading.dispatcher
private val guard = Mutex()

suspend fun discoverServices() {
suspend fun discoverServices(retries: Int = 1) {
logger.verbose { message = "Discovering services" }

repeat(DISCOVER_SERVICES_RETRIES) { attempt ->
repeat(retries) { attempt ->
val discoveredServices = execute<OnServicesDiscovered> {
discoverServicesOrThrow()
}.services.map(::DiscoveredService)

if (discoveredServices.isEmpty()) {
logger.warn {
message = "Empty services"
detail("attempt", "${attempt + 1} of $DISCOVER_SERVICES_RETRIES")
detail("attempt", "${attempt + 1} of $retries")
}
} else {
logger.verbose { message = "Discovered ${discoveredServices.count()} services" }
Expand Down Expand Up @@ -253,6 +250,13 @@ internal class Connection(
}
}

private fun onServiceChanged(action: suspend () -> Unit) {
callback.onServiceChanged
.receiveAsFlow()
.onEach { action() }
.launchIn(taskScope)
}

private fun onDispose(action: suspend () -> Unit) {
@Suppress("OPT_IN_USAGE")
connectionScope.launch(start = ATOMIC) {
Expand Down
3 changes: 2 additions & 1 deletion kable-core/src/androidMain/kotlin/gatt/Callback.kt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ internal class Callback(

val onResponse = Channel<Response>(CONFLATED)
val onMtuChanged = Channel<OnMtuChanged>(CONFLATED)
val onServiceChanged = Channel<OnServiceChanged>(CONFLATED)

override fun onPhyUpdate(
gatt: BluetoothGatt,
Expand Down Expand Up @@ -275,7 +276,7 @@ internal class Callback(

override fun onServiceChanged(gatt: BluetoothGatt) {
logger.debug { message = "onServiceChanged" }
// todo: https://github.com/JuulLabs/kable/issues/662
onServiceChanged.trySendOrLog(OnServiceChanged)
}

private fun <E> SendChannel<E>.trySendOrLog(element: E) {
Expand Down
2 changes: 2 additions & 0 deletions kable-core/src/androidMain/kotlin/gatt/Response.kt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ internal data class OnMtuChanged(
override val status: GattStatus,
) : Response()

internal object OnServiceChanged

/**
* Represents the possible GATT statuses as defined in [BluetoothGatt]:
*
Expand Down
22 changes: 15 additions & 7 deletions kable-core/src/appleMain/kotlin/Connection.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.juul.kable

import com.benasher44.uuid.Uuid
import com.juul.kable.PeripheralDelegate.Response
import com.juul.kable.PeripheralDelegate.Response.DidDiscoverCharacteristicsForService
import com.juul.kable.PeripheralDelegate.Response.DidDiscoverDescriptorsForCharacteristic
Expand All @@ -22,6 +21,9 @@ import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand Down Expand Up @@ -66,6 +68,9 @@ internal class Connection(

init {
onDispose(::disconnect)
onServiceChanged { invalidatedServices ->
discoverServices(invalidatedServices.map { cbService -> cbService.UUID })
}

on<Disconnected> {
val state = it.toString()
Expand All @@ -80,15 +85,11 @@ internal class Connection(
private val dispatcher = connectionScope.coroutineContext + central.dispatcher
internal val guard = Mutex()

suspend fun discoverServices(): Unit = discoverServices(serviceUuids = null)

/** @param serviceUuids to discover (list of service UUIDs), or `null` for all. */
suspend fun discoverServices(serviceUuids: List<Uuid>?) {
suspend fun discoverServices(serviceUuids: List<CBUUID>? = null) {
logger.verbose { message = "discoverServices" }
val cbUuids = serviceUuids?.map { uuid -> CBUUID.UUIDWithNSUUID(uuid.toNSUUID()) }

execute<DidDiscoverServices> {
peripheral.discoverServices(cbUuids)
peripheral.discoverServices(serviceUuids)
}

// Cast should be safe since `CBPeripheral.services` type is `[CBService]?`, according to:
Expand Down Expand Up @@ -211,6 +212,13 @@ internal class Connection(
}
}

private fun onServiceChanged(action: suspend (List<CBService>) -> Unit) {
delegate.onServiceChanged
.receiveAsFlow()
.onEach { (invalidatedServices) -> action(invalidatedServices) }
.launchIn(taskScope)
}

private fun onDispose(action: suspend () -> Unit) {
@Suppress("OPT_IN_USAGE")
connectionScope.launch(start = ATOMIC) {
Expand Down
15 changes: 14 additions & 1 deletion kable-core/src/appleMain/kotlin/PeripheralDelegate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import com.juul.kable.logs.detail
import kotlinx.cinterop.ObjCSignatureOverride
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
Expand Down Expand Up @@ -84,9 +85,15 @@ internal class PeripheralDelegate(
) : Response()
}

data class OnServiceChanged(
val cbServices: List<CBService>,
)

private val _response = Channel<Response>(BUFFERED)
val response: ReceiveChannel<Response> = _response

val onServiceChanged = Channel<OnServiceChanged>(CONFLATED)

private val logger = Logger(logging, tag = "Kable/Delegate", identifier = identifier)

/* Discovering Services */
Expand Down Expand Up @@ -288,10 +295,16 @@ internal class PeripheralDelegate(
peripheral: CBPeripheral,
didModifyServices: List<*>,
) {
// Cast should be safe since `didModifyServices` type is `[CBService]`, according to:
// https://developer.apple.com/documentation/corebluetooth/cbperipheraldelegate/peripheral(_:didmodifyservices:)
@Suppress("UNCHECKED_CAST")
val invalidatedServices = didModifyServices as List<CBService>

logger.debug {
message = "didModifyServices"
detail("invalidatedServices", invalidatedServices.toString())
}
// todo
onServiceChanged.sendBlocking(OnServiceChanged(invalidatedServices))
}

/* Monitoring L2CAP Channels */
Expand Down
6 changes: 6 additions & 0 deletions kable-core/src/commonMain/kotlin/PeripheralBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ internal val defaultDisconnectTimeout = 5.seconds

public expect class PeripheralBuilder internal constructor() {
public fun logging(init: LoggingBuilder)

/**
* Registers a [ServicesDiscoveredAction] for the [Peripheral] that is invoked after initial
* service discover (upon establishing a connection). Is **not** invoked upon subsequent service
* re-discoveries (due to peripheral service database changing while connected).
*/
public fun onServicesDiscovered(action: ServicesDiscoveredAction)

/**
Expand Down

0 comments on commit 8159a2f

Please sign in to comment.