Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic threading strategy support #612

Merged
merged 16 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ tuulbox = "8.0.0"
androidx-core = { module = "androidx.core:core-ktx", version = "1.13.1" }
androidx-startup = { module = "androidx.startup:startup-runtime", version = "1.1.1" }
atomicfu = { module = "org.jetbrains.kotlinx:atomicfu", version.ref = "atomicfu" }
datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version = "0.6.1" }
khronicle = { module = "com.juul.khronicle:khronicle-core", version = "0.3.0" }
kotlinx-coroutines-android = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-android", version.ref = "coroutines" }
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }
Expand Down
30 changes: 30 additions & 0 deletions kable-core/api/android/kable-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ public final class com/juul/kable/ObservationExceptionPeripheral {
public abstract interface annotation class com/juul/kable/ObsoleteKableApi : java/lang/annotation/Annotation {
}

public final class com/juul/kable/OnDemandThreadingStrategy : com/juul/kable/ThreadingStrategy {
public static final field INSTANCE Lcom/juul/kable/OnDemandThreadingStrategy;
public fun acquire ()Lcom/juul/kable/Threading;
public fun release (Lcom/juul/kable/Threading;)V
}

public final class com/juul/kable/OutOfOrderGattCallbackException : java/lang/IllegalStateException {
}

Expand All @@ -310,11 +316,13 @@ public final class com/juul/kable/Peripheral$DefaultImpls {
public final class com/juul/kable/PeripheralBuilder {
public final fun autoConnectIf (Lkotlin/jvm/functions/Function0;)V
public final fun getPhy ()Lcom/juul/kable/Phy;
public final fun getThreadingStrategy ()Lcom/juul/kable/ThreadingStrategy;
public final fun getTransport ()Lcom/juul/kable/Transport;
public final fun logging (Lkotlin/jvm/functions/Function1;)V
public final fun observationExceptionHandler (Lkotlin/jvm/functions/Function3;)V
public final fun onServicesDiscovered (Lkotlin/jvm/functions/Function2;)V
public final fun setPhy (Lcom/juul/kable/Phy;)V
public final fun setThreadingStrategy (Lcom/juul/kable/ThreadingStrategy;)V
public final fun setTransport (Lcom/juul/kable/Transport;)V
}

Expand Down Expand Up @@ -352,6 +360,14 @@ public final class com/juul/kable/PlatformAdvertisement$BondState : java/lang/En
public static fun values ()[Lcom/juul/kable/PlatformAdvertisement$BondState;
}

public final class com/juul/kable/PooledThreadingStrategy : com/juul/kable/ThreadingStrategy {
public synthetic fun <init> (Lkotlinx/coroutines/CoroutineScope;JILkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> (Lkotlinx/coroutines/CoroutineScope;JLkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun acquire ()Lcom/juul/kable/Threading;
public final fun cancel ()V
public fun release (Lcom/juul/kable/Threading;)V
}

public final class com/juul/kable/ProfileKt {
public static final fun characteristicOf (Ljava/lang/String;Ljava/lang/String;)Lcom/juul/kable/Characteristic;
public static final fun descriptorOf (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Lcom/juul/kable/Descriptor;
Expand Down Expand Up @@ -513,6 +529,20 @@ public final class com/juul/kable/State$Disconnecting : com/juul/kable/State {
public static final field INSTANCE Lcom/juul/kable/State$Disconnecting;
}

public abstract class com/juul/kable/Threading {
}

public final class com/juul/kable/ThreadingKt {
public static final fun Threading (Lcom/juul/kable/ThreadingStrategy;Ljava/lang/String;)Lcom/juul/kable/Threading;
public static final fun getName (Lcom/juul/kable/Threading;)Ljava/lang/String;
public static final fun shutdown (Lcom/juul/kable/Threading;)V
}

public abstract interface class com/juul/kable/ThreadingStrategy {
public abstract fun acquire ()Lcom/juul/kable/Threading;
public abstract fun release (Lcom/juul/kable/Threading;)V
}

public final class com/juul/kable/Transport : java/lang/Enum {
public static final field Auto Lcom/juul/kable/Transport;
public static final field BrEdr Lcom/juul/kable/Transport;
Expand Down
1 change: 1 addition & 0 deletions kable-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ kotlin {
api(libs.kotlinx.coroutines.core)
api(libs.uuid)
api(project(":kable-exceptions"))
implementation(libs.datetime)
implementation(libs.tuulbox.collections)
}

Expand Down
82 changes: 23 additions & 59 deletions kable-core/src/androidMain/kotlin/BluetoothDevice.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,11 @@ import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.content.Context
import android.os.Build
import android.os.Handler
import android.os.HandlerThread
import com.juul.kable.gatt.Callback
import com.juul.kable.logs.Logging
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.android.asCoroutineDispatcher
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.newSingleThreadContext

internal sealed class Threading {

abstract val dispatcher: CoroutineDispatcher

/** Available on Android O (API 26) and above. */
data class Handler(
val thread: HandlerThread,
val handler: android.os.Handler,
override val dispatcher: CoroutineDispatcher,
) : Threading()

/** Used on Android versions **lower** than Android O (API 26). */
data class SingleThreadContext(
override val dispatcher: ExecutorCoroutineDispatcher,
) : Threading()
}

internal fun Threading.close() {
when (this) {
is Threading.Handler -> thread.quit()
is Threading.SingleThreadContext -> dispatcher.close()
}
}

/**
* Creates the [Threading] that will be used for Bluetooth communication. The returned [Threading] is returned in a
* started state and must be shutdown when no longer needed.
*/
internal fun BluetoothDevice.threading(): Threading =
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val thread = HandlerThread(threadName).apply { start() }
val handler = Handler(thread.looper)
val dispatcher = handler.asCoroutineDispatcher()
Threading.Handler(thread, handler, dispatcher)
} else {
Threading.SingleThreadContext(newSingleThreadContext(threadName))
}

/**
* @param transport is only used on API level >= 23.
Expand All @@ -76,24 +32,35 @@ internal fun BluetoothDevice.connect(
mtu: MutableStateFlow<Int?>,
onCharacteristicChanged: MutableSharedFlow<ObservationEvent<ByteArray>>,
logging: Logging,
threading: Threading,
threadingStrategy: ThreadingStrategy,
): Connection? {
val callback = Callback(state, mtu, onCharacteristicChanged, logging, address)
val threading = threadingStrategy.acquire()

val bluetoothGatt = when {
Build.VERSION.SDK_INT >= Build.VERSION_CODES.O -> {
val handler = (threading as Threading.Handler).handler
connectGatt(context, autoConnect, callback, transport.intValue, phy.intValue, handler)
}
val bluetoothGatt = try {
when {
Build.VERSION.SDK_INT >= Build.VERSION_CODES.O -> {
val handler = (threading as Threading.Handler).handler
connectGatt(context, autoConnect, callback, transport.intValue, phy.intValue, handler)
}

Build.VERSION.SDK_INT <= Build.VERSION_CODES.M && autoConnect ->
connectGattWithReflection(context, true, callback, transport.intValue)
?: connectGattCompat(context, true, callback, transport.intValue)
Build.VERSION.SDK_INT <= Build.VERSION_CODES.M && autoConnect ->
connectGattWithReflection(context, true, callback, transport.intValue)
?: connectGattCompat(context, true, callback, transport.intValue)

else -> connectGattCompat(context, autoConnect, callback, transport.intValue)
} ?: return null
else -> connectGattCompat(context, autoConnect, callback, transport.intValue)
}
} catch (t: Throwable) {
threading.release()
throw t
}

if (bluetoothGatt == null) {
threading.release()
return null
}

return Connection(scope, bluetoothGatt, threading.dispatcher, callback, logging)
return Connection(scope, bluetoothGatt, threading, callback, logging)
}

private fun BluetoothDevice.connectGattCompat(
Expand Down Expand Up @@ -121,6 +88,3 @@ private val Phy.intValue: Int
Phy.Le2M -> PHY_LE_2M_MASK
Phy.LeCoded -> PHY_LE_CODED_MASK
}

private val BluetoothDevice.threadName: String
get() = "Gatt@$this"
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ internal class BluetoothDeviceAndroidPeripheral(
private val autoConnectPredicate: () -> Boolean,
private val transport: Transport,
private val phy: Phy,
private val threadingStrategy: ThreadingStrategy,
observationExceptionHandler: ObservationExceptionHandler,
private val onServicesDiscovered: ServicesDiscoveredAction,
private val logging: Logging,
Expand All @@ -76,9 +77,6 @@ internal class BluetoothDeviceAndroidPeripheral(

override val identifier: String = bluetoothDevice.address

// todo: Spin up/down w/ connection, rather than matching lifecycle of peripheral.
private val threading = bluetoothDevice.threading()

private val _mtu = MutableStateFlow<Int?>(null)
override val mtu: StateFlow<Int?> = _mtu.asStateFlow()

Expand Down Expand Up @@ -131,7 +129,7 @@ internal class BluetoothDeviceAndroidPeripheral(
_mtu,
observers.characteristicChanges,
logging,
threading,
threadingStrategy,
) ?: throw ConnectionRejectedException()

suspendUntilOrThrow<State.Connecting.Services>()
Expand Down Expand Up @@ -197,17 +195,22 @@ internal class BluetoothDeviceAndroidPeripheral(
connectAction.cancelAndJoin(CancellationException(NotConnectedException()))
}
suspendUntil<Disconnected>()
releaseThread()
logger.info { message = "Disconnected" }
}

private fun releaseThread() {
_connection?.threading?.release()
}

private fun dispose(cause: Throwable?) {
closeConnection()
threading.close()
logger.info(cause) { message = "Disposed" }
}

private fun closeConnection() {
_connection?.bluetoothGatt?.close()
releaseThread()
setDisconnected()
}

Expand Down
4 changes: 3 additions & 1 deletion kable-core/src/androidMain/kotlin/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private val GattSuccess = GattStatus(GATT_SUCCESS)
internal class Connection(
private val scope: CoroutineScope,
internal val bluetoothGatt: BluetoothGatt,
internal val dispatcher: CoroutineDispatcher,
internal val threading: Threading,
private val callback: Callback,
logging: Logging,
) {
Expand All @@ -34,6 +34,8 @@ internal class Connection(
private val lock = Mutex()
private var deferredResponse: Deferred<Response>? = null

internal val dispatcher = threading.dispatcher

/**
* Executes specified [BluetoothGatt] [action].
*
Expand Down
1 change: 1 addition & 0 deletions kable-core/src/androidMain/kotlin/Peripheral.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public fun CoroutineScope.peripheral(
builder.autoConnectPredicate,
builder.transport,
builder.phy,
builder.threadingStrategy,
builder.observationExceptionHandler,
builder.onServicesDiscovered,
builder.logging,
Expand Down
2 changes: 2 additions & 0 deletions kable-core/src/androidMain/kotlin/PeripheralBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,6 @@ public actual class PeripheralBuilder internal actual constructor() {

/** Preferred PHY for connections to remote LE device. */
public var phy: Phy = Phy.Le1M

public var threadingStrategy: ThreadingStrategy = OnDemandThreadingStrategy
}
64 changes: 64 additions & 0 deletions kable-core/src/androidMain/kotlin/Threading.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.juul.kable

import android.os.Build
import android.os.Handler
import android.os.HandlerThread
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.android.asCoroutineDispatcher
import kotlinx.coroutines.newSingleThreadContext

public sealed class Threading {

internal abstract val dispatcher: CoroutineDispatcher
internal abstract val strategy: ThreadingStrategy

/** Used on Android O (API 26) and above. */
internal data class Handler(
val thread: HandlerThread,
val handler: android.os.Handler,
override val dispatcher: CoroutineDispatcher,
override val strategy: ThreadingStrategy,
) : Threading()

/** Used on Android versions **lower** than Android O (API 26). */
internal data class SingleThreadContext(
val name: String,
override val dispatcher: ExecutorCoroutineDispatcher,
override val strategy: ThreadingStrategy,
) : Threading()
}

internal fun Threading.release() {
strategy.release(this)
}

public val Threading.name: String
get() = when (this) {
is Threading.Handler -> thread.name
is Threading.SingleThreadContext -> name
}

public fun Threading.shutdown() {
when (this) {
is Threading.Handler -> thread.quit()
is Threading.SingleThreadContext -> dispatcher.close()
}
}

/**
* Creates [Threading] that can be used for Bluetooth communication. The returned [Threading] is
* returned in a started state and must be [shutdown] when no longer needed.
*/
public fun ThreadingStrategy.Threading(name: String): Threading =
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val thread = HandlerThread(name).apply { start() }
val handler = Handler(thread.looper)
val dispatcher = handler.asCoroutineDispatcher()
Threading.Handler(thread, handler, dispatcher, this)
} else { // Build.VERSION.SDK_INT < Build.VERSION_CODES.O
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
Threading.SingleThreadContext(name, newSingleThreadContext(name), this)
}
Loading