Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Have KeepAliveGatt state backed by StateFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed May 21, 2020
1 parent 874cbd3 commit a07a298
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 131 deletions.
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ext.versions = [
ext.deps = [
kotlin: [
stdlib: "org.jetbrains.kotlin:kotlin-stdlib",
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5",
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7",
junit: "org.jetbrains.kotlin:kotlin-test-junit",
],

Expand Down
65 changes: 24 additions & 41 deletions keep-alive/src/main/java/KeepAliveGatt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.juul.able.gatt.OnDescriptorWrite
import com.juul.able.gatt.OnMtuChanged
import com.juul.able.gatt.OnReadRemoteRssi
import com.juul.able.gatt.WriteType
import com.juul.able.keepalive.KeepAliveGatt.Configuration
import com.juul.able.keepalive.State.Connected
import com.juul.able.keepalive.State.Connecting
import com.juul.able.keepalive.State.Disconnected
Expand All @@ -32,9 +31,10 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletionHandler
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
Expand All @@ -43,13 +43,11 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
Expand All @@ -70,58 +68,43 @@ enum class State {
fun CoroutineScope.keepAliveGatt(
androidContext: Context,
bluetoothDevice: BluetoothDevice,
configuration: Configuration
disconnectTimeoutMillis: Long,
onConnectAction: ConnectAction? = null
) = KeepAliveGatt(
parentCoroutineContext = coroutineContext,
androidContext = androidContext,
bluetoothDevice = bluetoothDevice,
configuration = configuration
disconnectTimeoutMillis = disconnectTimeoutMillis,
onConnectAction = onConnectAction
)

class KeepAliveGatt(
parentCoroutineContext: CoroutineContext = EmptyCoroutineContext,
androidContext: Context,
private val bluetoothDevice: BluetoothDevice,
configuration: Configuration
private val disconnectTimeoutMillis: Long,
private val onConnectAction: ConnectAction? = null
) : GattIo {

data class Configuration(
val disconnectTimeoutMillis: Long,
val exceptionHandler: CoroutineExceptionHandler? = null,
val onConnectAction: ConnectAction? = null,
internal val stateCapacity: Int = CONFLATED
)

private val applicationContext = androidContext.applicationContext

private val job = SupervisorJob(parentCoroutineContext[Job])
private val scope = CoroutineScope(
parentCoroutineContext + job + (configuration.exceptionHandler ?: EmptyCoroutineContext)
)
private val scope = CoroutineScope(parentCoroutineContext + job)

private val isRunning = AtomicBoolean()

private val disconnectTimeoutMillis = configuration.disconnectTimeoutMillis
private val onConnectAction = configuration.onConnectAction

@Volatile
private var _gatt: GattIo? = null
private val gatt: GattIo
inline get() = _gatt ?: throw NotReady(toString())

private val _state = BroadcastChannel<State>(configuration.stateCapacity)
private val _state = MutableStateFlow(Disconnected)

@FlowPreview
val state: Flow<State> = _state.asFlow()
val state: Flow<State> = _state

private val _onCharacteristicChanged = BroadcastChannel<OnCharacteristicChanged>(BUFFERED)

// todo: Replace with `trySend` when Kotlin/kotlinx.coroutines#974 is fixed.
// https://github.com/Kotlin/kotlinx.coroutines/issues/974
private fun setState(state: State) {
_state.offerCatching(state)
}

fun connect(): Boolean {
check(!job.isCancelled) { "Cannot connect, $this is closed" }
isRunning.compareAndSet(false, true) || return false
Expand All @@ -131,7 +114,7 @@ class KeepAliveGatt(
try {
spawnConnection()
} finally {
setState(Disconnected)
_state.value = Disconnected
}
}
}
Expand All @@ -146,7 +129,6 @@ class KeepAliveGatt(
private fun cancel(cause: CancellationException?) {
job.cancel(cause)
_onCharacteristicChanged.cancel()
_state.cancel()
}

fun cancel() {
Expand All @@ -156,7 +138,6 @@ class KeepAliveGatt(
suspend fun cancelAndJoin() {
job.cancelAndJoin()
_onCharacteristicChanged.cancel()
_state.cancel()
}

// todo: Fix `@see` documentation link when https://github.com/Kotlin/dokka/issues/80 is fixed.
Expand All @@ -166,7 +147,7 @@ class KeepAliveGatt(
): DisposableHandle = job.invokeOnCompletion(handler)

private suspend fun spawnConnection() {
setState(Connecting)
_state.value = Connecting

val gatt = when (val result = bluetoothDevice.connectGatt(applicationContext)) {
is Success -> result.gatt
Expand All @@ -184,14 +165,14 @@ class KeepAliveGatt(
coroutineScope {
gatt.onCharacteristicChanged
.onEach(_onCharacteristicChanged::send)
.launchIn(this)
.launchIn(this, start = UNDISPATCHED)
onConnectAction?.invoke(gatt)
_gatt = gatt
setState(Connected)
_state.value = Connected
}
} finally {
_gatt = null
setState(State.Disconnecting)
_state.value = State.Disconnecting
withContext(NonCancellable) {
withTimeoutOrNull(disconnectTimeoutMillis) {
gatt.disconnect()
Expand Down Expand Up @@ -236,10 +217,12 @@ class KeepAliveGatt(
override suspend fun readRemoteRssi(): OnReadRemoteRssi = gatt.readRemoteRssi()

override fun toString() =
"KeepAliveGatt(device=$bluetoothDevice, gatt=$_gatt, state=${_state.consume { poll() }})"
"KeepAliveGatt(device=$bluetoothDevice, gatt=$_gatt, state=${_state.value})"
}

// https://github.com/Kotlin/kotlinx.coroutines/issues/974
private fun <E> SendChannel<E>.offerCatching(element: E): Boolean {
return runCatching { offer(element) }.getOrDefault(false)
private fun <T> Flow<T>.launchIn(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.DEFAULT
): Job = scope.launch(start = start) {
collect()
}
Loading

0 comments on commit a07a298

Please sign in to comment.