Skip to content

Commit

Permalink
Add basic threading strategy support
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed Dec 18, 2023
1 parent 5c2e544 commit e6fd150
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 55 deletions.
29 changes: 29 additions & 0 deletions core/api/core.api
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ public final class com/juul/kable/ObservationExceptionPeripheral {
public abstract interface annotation class com/juul/kable/ObsoleteKableApi : java/lang/annotation/Annotation {
}

public final class com/juul/kable/OnDemandThreadingStrategy : com/juul/kable/ThreadingStrategy {
public static final field INSTANCE Lcom/juul/kable/OnDemandThreadingStrategy;
public fun acquire ()Lcom/juul/kable/Threading;
public fun release (Lcom/juul/kable/Threading;)V
}

public final class com/juul/kable/OutOfOrderGattCallbackException : java/lang/IllegalStateException {
}

Expand All @@ -289,11 +295,13 @@ public final class com/juul/kable/Peripheral$DefaultImpls {
public final class com/juul/kable/PeripheralBuilder {
public final fun autoConnectIf (Lkotlin/jvm/functions/Function0;)V
public final fun getPhy ()Lcom/juul/kable/Phy;
public final fun getThreadingStrategy ()Lcom/juul/kable/ThreadingStrategy;
public final fun getTransport ()Lcom/juul/kable/Transport;
public final fun logging (Lkotlin/jvm/functions/Function1;)V
public final fun observationExceptionHandler (Lkotlin/jvm/functions/Function3;)V
public final fun onServicesDiscovered (Lkotlin/jvm/functions/Function2;)V
public final fun setPhy (Lcom/juul/kable/Phy;)V
public final fun setThreadingStrategy (Lcom/juul/kable/ThreadingStrategy;)V
public final fun setTransport (Lcom/juul/kable/Transport;)V
}

Expand All @@ -316,6 +324,13 @@ public final class com/juul/kable/Phy : java/lang/Enum {
public static fun values ()[Lcom/juul/kable/Phy;
}

public final class com/juul/kable/PooledThreadingStrategy : com/juul/kable/ThreadingStrategy {
public synthetic fun <init> (Lkotlinx/coroutines/CoroutineScope;JILkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> (Lkotlinx/coroutines/CoroutineScope;JLkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun acquire ()Lcom/juul/kable/Threading;
public fun release (Lcom/juul/kable/Threading;)V
}

public final class com/juul/kable/ProfileKt {
public static final fun characteristicOf (Ljava/lang/String;Ljava/lang/String;)Lcom/juul/kable/Characteristic;
public static final fun descriptorOf (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Lcom/juul/kable/Descriptor;
Expand Down Expand Up @@ -469,6 +484,20 @@ public final class com/juul/kable/State$Disconnecting : com/juul/kable/State {
public static final field INSTANCE Lcom/juul/kable/State$Disconnecting;
}

public abstract class com/juul/kable/Threading {
}

public final class com/juul/kable/ThreadingKt {
public static final fun Threading (Ljava/lang/String;)Lcom/juul/kable/Threading;
public static final fun getName (Lcom/juul/kable/Threading;)Ljava/lang/String;
public static final fun shutdown (Lcom/juul/kable/Threading;)V
}

public abstract interface class com/juul/kable/ThreadingStrategy {
public abstract fun acquire ()Lcom/juul/kable/Threading;
public abstract fun release (Lcom/juul/kable/Threading;)V
}

public final class com/juul/kable/Transport : java/lang/Enum {
public static final field Auto Lcom/juul/kable/Transport;
public static final field BrEdr Lcom/juul/kable/Transport;
Expand Down
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ kotlin {
api(libs.kotlinx.coroutines.core)
api(libs.uuid)
implementation(libs.tuulbox.collections)
implementation(libs.datetime)

}

Expand Down
52 changes: 3 additions & 49 deletions core/src/androidMain/kotlin/BluetoothDevice.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,11 @@ import android.bluetooth.BluetoothGatt
import android.bluetooth.BluetoothGattCallback
import android.content.Context
import android.os.Build
import android.os.Handler
import android.os.HandlerThread
import com.juul.kable.gatt.Callback
import com.juul.kable.logs.Logging
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.android.asCoroutineDispatcher
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.newSingleThreadContext

internal sealed class Threading {

abstract val dispatcher: CoroutineDispatcher

/** Available on Android O (API 26) and above. */
data class Handler(
val thread: HandlerThread,
val handler: android.os.Handler,
override val dispatcher: CoroutineDispatcher,
) : Threading()

/** Used on Android versions **lower** than Android O (API 26). */
data class SingleThreadContext(
override val dispatcher: ExecutorCoroutineDispatcher,
) : Threading()
}

internal fun Threading.close() {
when (this) {
is Threading.Handler -> thread.quit()
is Threading.SingleThreadContext -> dispatcher.close()
}
}

/**
* Creates the [Threading] that will be used for Bluetooth communication. The returned [Threading] is returned in a
* started state and must be shutdown when no longer needed.
*/
internal fun BluetoothDevice.threading(): Threading =
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val thread = HandlerThread(threadName).apply { start() }
val handler = Handler(thread.looper)
val dispatcher = handler.asCoroutineDispatcher()
Threading.Handler(thread, handler, dispatcher)
} else {
Threading.SingleThreadContext(newSingleThreadContext(threadName))
}

/**
* @param transport is only used on API level >= 23.
Expand All @@ -76,9 +32,10 @@ internal fun BluetoothDevice.connect(
mtu: MutableStateFlow<Int?>,
onCharacteristicChanged: MutableSharedFlow<ObservationEvent<ByteArray>>,
logging: Logging,
threading: Threading,
threadingStrategy: ThreadingStrategy,
): Connection? {
val callback = Callback(state, mtu, onCharacteristicChanged, logging, address)
val threading = threadingStrategy.acquire()

val bluetoothGatt = when {
Build.VERSION.SDK_INT >= Build.VERSION_CODES.O -> {
Expand All @@ -93,7 +50,7 @@ internal fun BluetoothDevice.connect(
else -> connectGattCompat(context, autoConnect, callback, transport.intValue)
} ?: return null

return Connection(scope, bluetoothGatt, threading.dispatcher, callback, logging)
return Connection(scope, bluetoothGatt, threading, callback, logging)
}

private fun BluetoothDevice.connectGattCompat(
Expand Down Expand Up @@ -121,6 +78,3 @@ private val Phy.intValue: Int
Phy.Le2M -> PHY_LE_2M_MASK
Phy.LeCoded -> PHY_LE_CODED_MASK
}

private val BluetoothDevice.threadName: String
get() = "Gatt@$this"
15 changes: 10 additions & 5 deletions core/src/androidMain/kotlin/BluetoothDeviceAndroidPeripheral.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ internal class BluetoothDeviceAndroidPeripheral(
private val autoConnectPredicate: () -> Boolean,
private val transport: Transport,
private val phy: Phy,
private val threadingStrategy: ThreadingStrategy,
observationExceptionHandler: ObservationExceptionHandler,
private val onServicesDiscovered: ServicesDiscoveredAction,
private val logging: Logging,
Expand All @@ -82,9 +83,6 @@ internal class BluetoothDeviceAndroidPeripheral(

override val identifier: String = bluetoothDevice.address

// todo: Spin up/down w/ connection, rather than matching lifecycle of peripheral.
private val threading = bluetoothDevice.threading()

private val _mtu = MutableStateFlow<Int?>(null)
override val mtu: StateFlow<Int?> = _mtu.asStateFlow()

Expand Down Expand Up @@ -126,7 +124,7 @@ internal class BluetoothDeviceAndroidPeripheral(
_mtu,
observers.characteristicChanges,
logging,
threading,
threadingStrategy,
) ?: throw ConnectionRejectedException()

suspendUntilOrThrow<State.Connecting.Services>()
Expand Down Expand Up @@ -192,17 +190,24 @@ internal class BluetoothDeviceAndroidPeripheral(
connectAction.cancelAndJoin(CancellationException(NotConnectedException()))
}
suspendUntil<Disconnected>()
releaseThread()
logger.info { message = "Disconnected" }
}

private fun releaseThread() {
_connection?.threading?.let {
threadingStrategy.release(it)
}
}

private fun dispose(cause: Throwable?) {
closeConnection()
threading.close()
logger.info(cause) { message = "Disposed" }
}

private fun closeConnection() {
_connection?.bluetoothGatt?.close()
releaseThread()
setDisconnected()
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/androidMain/kotlin/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private val GattSuccess = GattStatus(GATT_SUCCESS)
internal class Connection(
private val scope: CoroutineScope,
internal val bluetoothGatt: BluetoothGatt,
internal val dispatcher: CoroutineDispatcher,
internal val threading: Threading,
private val callback: Callback,
logging: Logging,
) {
Expand All @@ -34,6 +34,8 @@ internal class Connection(
private val lock = Mutex()
private var deferredResponse: Deferred<Response>? = null

internal val dispatcher = threading.dispatcher

/**
* Executes specified [BluetoothGatt] [action].
*
Expand Down
1 change: 1 addition & 0 deletions core/src/androidMain/kotlin/Peripheral.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public fun CoroutineScope.peripheral(
builder.autoConnectPredicate,
builder.transport,
builder.phy,
builder.threadingStrategy,
builder.observationExceptionHandler,
builder.onServicesDiscovered,
builder.logging,
Expand Down
2 changes: 2 additions & 0 deletions core/src/androidMain/kotlin/PeripheralBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,6 @@ public actual class PeripheralBuilder internal actual constructor() {

/** Preferred PHY for connections to remote LE device. */
public var phy: Phy = Phy.Le1M

public var threadingStrategy: ThreadingStrategy = OnDemandThreadingStrategy
}
57 changes: 57 additions & 0 deletions core/src/androidMain/kotlin/Threading.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.juul.kable

import android.os.Build
import android.os.Handler
import android.os.HandlerThread
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExecutorCoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.android.asCoroutineDispatcher
import kotlinx.coroutines.newSingleThreadContext

public sealed class Threading {

internal abstract val dispatcher: CoroutineDispatcher

/** Used on Android O (API 26) and above. */
internal data class Handler(
val thread: HandlerThread,
val handler: android.os.Handler,
override val dispatcher: CoroutineDispatcher,
) : Threading()

/** Used on Android versions **lower** than Android O (API 26). */
internal data class SingleThreadContext(
val name: String,
override val dispatcher: ExecutorCoroutineDispatcher,
) : Threading()
}

public val Threading.name: String
get() = when (this) {
is Threading.Handler -> thread.name
is Threading.SingleThreadContext -> name
}

public fun Threading.shutdown() {
when (this) {
is Threading.Handler -> thread.quit()
is Threading.SingleThreadContext -> dispatcher.close()
}
}

/**
* Creates [Threading] that can be used for Bluetooth communication. The returned [Threading] is
* returned in a started state and must be [shutdown] when no longer needed.
*/
public fun Threading(name: String): Threading =
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val thread = HandlerThread(name).apply { start() }
val handler = Handler(thread.looper)
val dispatcher = handler.asCoroutineDispatcher()
Threading.Handler(thread, handler, dispatcher)
} else { // Build.VERSION.SDK_INT < Build.VERSION_CODES.O
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
Threading.SingleThreadContext(name, newSingleThreadContext(name))
}
62 changes: 62 additions & 0 deletions core/src/androidMain/kotlin/ThreadingStrategy.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.juul.kable

import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.locks.reentrantLock
import kotlinx.atomicfu.locks.withLock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.TimeMark
import kotlin.time.TimeSource

private val threadNumber = atomic(0)
private fun generateThreadName() = "Kable#${threadNumber.incrementAndGet()}"

public interface ThreadingStrategy {
public fun acquire(): Threading
public fun release(threading: Threading)
}

public object OnDemandThreadingStrategy : ThreadingStrategy {

override fun acquire(): Threading = Threading(generateThreadName())

override fun release(threading: Threading) {
threading.shutdown()
}
}

public class PooledThreadingStrategy(
scope: CoroutineScope = GlobalScope,
private val evictAfter: Duration = 1.minutes,
) : ThreadingStrategy {

private val pool = mutableListOf<Pair<TimeMark, Threading>>()
private val guard = reentrantLock()

private val evictionJob = scope.launch(start = LAZY) {
while (true) {
guard.withLock {
pool.removeAll { (timeMark) -> timeMark.hasPassedNow() }
}
delay(evictAfter / 2)
}
}

internal fun start(): Boolean = evictionJob.start()

override fun acquire(): Threading = guard.withLock {
pool.removeFirstOrNull()?.second
} ?: Threading(generateThreadName())

override fun release(threading: Threading) {
guard.withLock {
val evictAt = TimeSource.Monotonic.markNow() + evictAfter
pool.add(evictAt to threading)
}
}
}
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ tuulbox = "7.0.0"
androidx-core = { module = "androidx.core:core-ktx", version = "1.12.0" }
androidx-startup = { module = "androidx.startup:startup-runtime", version = "1.1.1" }
atomicfu = { module = "org.jetbrains.kotlinx:atomicfu", version.ref = "atomicfu" }
datetime = { module = "org.jetbrains.kotlinx:kotlinx-datetime", version = "0.5.0" }
kotlinx-coroutines-android = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-android", version.ref = "coroutines" }
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "coroutines" }
Expand Down
5 changes: 5 additions & 0 deletions kotlin-js-store/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
"@jridgewell/resolve-uri" "^3.0.3"
"@jridgewell/sourcemap-codec" "^1.4.10"

"@js-joda/core@3.2.0":
version "3.2.0"
resolved "https://registry.yarnpkg.com/@js-joda/core/-/core-3.2.0.tgz#3e61e21b7b2b8a6be746df1335cf91d70db2a273"
integrity sha512-PMqgJ0sw5B7FKb2d5bWYIoxjri+QlW/Pys7+Rw82jSH0QN3rB05jZ/VrrsUdh1w4+i2kw9JOejXGq/KhDOX7Kg==

"@types/component-emitter@^1.2.10":
version "1.2.11"
resolved "https://registry.yarnpkg.com/@types/component-emitter/-/component-emitter-1.2.11.tgz#50d47d42b347253817a39709fef03ce66a108506"
Expand Down

0 comments on commit e6fd150

Please sign in to comment.