Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
Have KeepAliveGatt state backed by StateFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
twyatt committed May 21, 2020
1 parent 874cbd3 commit ee29e2d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 142 deletions.
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ext.versions = [
ext.deps = [
kotlin: [
stdlib: "org.jetbrains.kotlin:kotlin-stdlib",
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5",
coroutines: "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7",
junit: "org.jetbrains.kotlin:kotlin-test-junit",
],

Expand Down
73 changes: 28 additions & 45 deletions keep-alive/src/main/java/KeepAliveGatt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@ import com.juul.able.gatt.OnDescriptorWrite
import com.juul.able.gatt.OnMtuChanged
import com.juul.able.gatt.OnReadRemoteRssi
import com.juul.able.gatt.WriteType
import com.juul.able.keepalive.KeepAliveGatt.Configuration
import com.juul.able.keepalive.State.Connected
import com.juul.able.keepalive.State.Connecting
import com.juul.able.keepalive.State.Disconnected
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletionHandler
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
Expand All @@ -43,18 +39,20 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consume
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

typealias ConnectAction = suspend GattIo.() -> Unit

Expand All @@ -70,58 +68,43 @@ enum class State {
fun CoroutineScope.keepAliveGatt(
androidContext: Context,
bluetoothDevice: BluetoothDevice,
configuration: Configuration
disconnectTimeoutMillis: Long,
onConnectAction: ConnectAction? = null
) = KeepAliveGatt(
parentCoroutineContext = coroutineContext,
androidContext = androidContext,
bluetoothDevice = bluetoothDevice,
configuration = configuration
disconnectTimeoutMillis = disconnectTimeoutMillis,
onConnectAction = onConnectAction
)

class KeepAliveGatt(
parentCoroutineContext: CoroutineContext = EmptyCoroutineContext,
androidContext: Context,
private val bluetoothDevice: BluetoothDevice,
configuration: Configuration
private val disconnectTimeoutMillis: Long,
private val onConnectAction: ConnectAction? = null
) : GattIo {

data class Configuration(
val disconnectTimeoutMillis: Long,
val exceptionHandler: CoroutineExceptionHandler? = null,
val onConnectAction: ConnectAction? = null,
internal val stateCapacity: Int = CONFLATED
)

private val applicationContext = androidContext.applicationContext

private val job = SupervisorJob(parentCoroutineContext[Job])
private val scope = CoroutineScope(
parentCoroutineContext + job + (configuration.exceptionHandler ?: EmptyCoroutineContext)
)
private val scope = CoroutineScope(parentCoroutineContext + job)

private val isRunning = AtomicBoolean()

private val disconnectTimeoutMillis = configuration.disconnectTimeoutMillis
private val onConnectAction = configuration.onConnectAction

@Volatile
private var _gatt: GattIo? = null
private val gatt: GattIo
inline get() = _gatt ?: throw NotReady(toString())

private val _state = BroadcastChannel<State>(configuration.stateCapacity)
private val _state = MutableStateFlow(Disconnected)

@FlowPreview
val state: Flow<State> = _state.asFlow()
val state: Flow<State> = _state

private val _onCharacteristicChanged = BroadcastChannel<OnCharacteristicChanged>(BUFFERED)

// todo: Replace with `trySend` when Kotlin/kotlinx.coroutines#974 is fixed.
// https://github.com/Kotlin/kotlinx.coroutines/issues/974
private fun setState(state: State) {
_state.offerCatching(state)
}

fun connect(): Boolean {
check(!job.isCancelled) { "Cannot connect, $this is closed" }
isRunning.compareAndSet(false, true) || return false
Expand All @@ -131,7 +114,7 @@ class KeepAliveGatt(
try {
spawnConnection()
} finally {
setState(Disconnected)
_state.value = Disconnected
}
}
}
Expand All @@ -146,7 +129,6 @@ class KeepAliveGatt(
private fun cancel(cause: CancellationException?) {
job.cancel(cause)
_onCharacteristicChanged.cancel()
_state.cancel()
}

fun cancel() {
Expand All @@ -156,7 +138,6 @@ class KeepAliveGatt(
suspend fun cancelAndJoin() {
job.cancelAndJoin()
_onCharacteristicChanged.cancel()
_state.cancel()
}

// todo: Fix `@see` documentation link when https://github.com/Kotlin/dokka/issues/80 is fixed.
Expand All @@ -166,7 +147,7 @@ class KeepAliveGatt(
): DisposableHandle = job.invokeOnCompletion(handler)

private suspend fun spawnConnection() {
setState(Connecting)
_state.value = Connecting

val gatt = when (val result = bluetoothDevice.connectGatt(applicationContext)) {
is Success -> result.gatt
Expand All @@ -184,14 +165,14 @@ class KeepAliveGatt(
coroutineScope {
gatt.onCharacteristicChanged
.onEach(_onCharacteristicChanged::send)
.launchIn(this)
.launchIn(this, start = UNDISPATCHED)
onConnectAction?.invoke(gatt)
_gatt = gatt
setState(Connected)
_state.value = Connected
}
} finally {
_gatt = null
setState(State.Disconnecting)
_state.value = State.Disconnecting
withContext(NonCancellable) {
withTimeoutOrNull(disconnectTimeoutMillis) {
gatt.disconnect()
Expand Down Expand Up @@ -236,10 +217,12 @@ class KeepAliveGatt(
override suspend fun readRemoteRssi(): OnReadRemoteRssi = gatt.readRemoteRssi()

override fun toString() =
"KeepAliveGatt(device=$bluetoothDevice, gatt=$_gatt, state=${_state.consume { poll() }})"
"KeepAliveGatt(device=$bluetoothDevice, gatt=$_gatt, state=${_state.value})"
}

// https://github.com/Kotlin/kotlinx.coroutines/issues/974
private fun <E> SendChannel<E>.offerCatching(element: E): Boolean {
return runCatching { offer(element) }.getOrDefault(false)
private fun <T> Flow<T>.launchIn(
scope: CoroutineScope,
start: CoroutineStart = CoroutineStart.DEFAULT
): Job = scope.launch(start = start) {
collect()
}
Loading

0 comments on commit ee29e2d

Please sign in to comment.