Skip to content
This repository has been archived by the owner on Nov 12, 2024. It is now read-only.

Lots of improvements around Coroutines usage #648

Merged
merged 2 commits into from
Jun 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions base-android/src/main/java/app/tivi/util/BroadcastReceiverFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,20 @@ import android.content.BroadcastReceiver
import android.content.Context
import android.content.Intent
import android.content.IntentFilter
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart

fun Context.flowBroadcasts(intentFilter: IntentFilter): Flow<Intent> {
val resultChannel = ConflatedBroadcastChannel<Intent>()
val resultChannel = MutableStateFlow(Intent())

val receiver = object : BroadcastReceiver() {
override fun onReceive(context: Context, intent: Intent) {
resultChannel.offer(intent)
resultChannel.value = intent
}
}

resultChannel.invokeOnClose {
unregisterReceiver(receiver)
}

registerReceiver(receiver, intentFilter)
return resultChannel.asFlow()
return resultChannel.onStart { registerReceiver(receiver, intentFilter) }
.onCompletion { unregisterReceiver(receiver) }
}
5 changes: 0 additions & 5 deletions base/src/main/java/app/tivi/inject/Annotations.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ annotation class Trakt
@MustBeDocumented
annotation class Tmdb

@Retention(AnnotationRetention.RUNTIME)
@Qualifier
@MustBeDocumented
annotation class ForStore

@Retention(AnnotationRetention.RUNTIME)
@Qualifier
@MustBeDocumented
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/src/main/java/app/tivi/buildsrc/dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object Libs {
}

object Coroutines {
private const val version = "1.3.5"
private const val version = "1.3.7"
const val core = "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version"
const val android = "org.jetbrains.kotlinx:kotlinx-coroutines-android:$version"
const val test = "org.jetbrains.kotlinx:kotlinx-coroutines-test:$version"
Expand Down
23 changes: 11 additions & 12 deletions common-entrygrid/src/main/java/app/tivi/util/EntryViewModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ import app.tivi.data.entities.TiviShow
import app.tivi.data.resultentities.EntryWithShow
import app.tivi.domain.PagingInteractor
import app.tivi.domain.interactors.ChangeShowFollowStatus
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
Expand All @@ -51,8 +50,8 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
protected abstract val logger: Logger
protected abstract val changeShowFollowStatus: ChangeShowFollowStatus

private val messages = ConflatedBroadcastChannel<UiStatus>(UiIdle)
private val loaded = ConflatedBroadcastChannel(false)
private val messages = MutableStateFlow<UiStatus>(UiIdle)
private val loaded = MutableStateFlow(false)

val pagedList: Flow<PagedList<LI>>
get() = pagingInteractor.observe()
Expand All @@ -70,23 +69,23 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
override fun onItemAtEndLoaded(itemAtEnd: LI) = onListScrolledToEnd()

override fun onItemAtFrontLoaded(itemAtFront: LI) {
loaded.offer(true)
loaded.value = true
}

override fun onZeroItemsLoaded() {
loaded.offer(true)
loaded.value = true
}
}

protected fun launchObserves() {
viewModelScope.launch {
messages.asFlow().execute {
messages.execute {
copy(status = it() ?: UiSuccess)
}
}

viewModelScope.launch {
loaded.asFlow().execute {
loaded.execute {
copy(isLoaded = it() ?: false)
}
}
Expand All @@ -107,7 +106,7 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
fun onListScrolledToEnd() {
viewModelScope.launch {
callLoadMore()
.catch { messages.send(UiError(it)) }
.catch { messages.value = UiError(it) }
.map {
when (it) {
InvokeSuccess -> UiSuccess
Expand All @@ -116,7 +115,7 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
else -> UiIdle
}
}
.collect { messages.send(it) }
.collect { messages.value = it }
}
}

Expand Down Expand Up @@ -150,15 +149,15 @@ abstract class EntryViewModel<LI : EntryWithShow<out Entry>, PI : PagingInteract
protected fun refresh(fromUser: Boolean) {
viewModelScope.launch {
callRefresh(fromUser)
.catch { messages.send(UiError(it)) }
.catch { messages.value = UiError(it) }
.map {
when (it) {
InvokeSuccess -> UiSuccess
InvokeStarted -> UiLoading(true)
else -> UiIdle
}
}
.collect { messages.send(it) }
.collect { messages.value = it }
}
}

Expand Down
2 changes: 2 additions & 0 deletions common-ui-view/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ dependencies {
api Libs.AndroidX.Lifecycle.livedata
implementation Libs.AndroidX.Lifecycle.viewmodel

implementation Libs.Coroutines.core

api Libs.AndroidX.appcompat
implementation Libs.AndroidX.recyclerview
implementation Libs.AndroidX.constraintlayout
Expand Down
65 changes: 20 additions & 45 deletions common-ui-view/src/main/java/app/tivi/ReduxViewModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import androidx.lifecycle.ViewModel
import androidx.lifecycle.asLiveData
import androidx.lifecycle.viewModelScope
import app.tivi.common.ui.BuildConfig
import app.tivi.extensions.observable
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
Expand All @@ -38,40 +36,25 @@ import kotlin.reflect.KProperty1
abstract class ReduxViewModel<S>(
initialState: S
) : ViewModel() {
private val stateChannel = ConflatedBroadcastChannel(initialState)

private val state = MutableStateFlow(initialState)
private val stateMutex = Mutex()
private var state: S by observable(initialState) {
stateChannel.offer(state)
}

/**
* Returns a snapshot of the current state.
*/
fun currentState(): S = state
fun currentState(): S = state.value

val liveData: LiveData<S>
get() = stateChannel.asFlow().asLiveData()

protected suspend fun <T> Flow<T>.execute(
reducer: S.(Async<T>) -> S
) = execute({ it }, reducer)
get() = state.asLiveData()

protected suspend fun <T, V> Flow<T>.execute(
mapper: (T) -> V,
reducer: S.(Async<V>) -> S
) {
protected suspend fun <T> Flow<T>.execute(reducer: S.(Async<T>) -> S) {
setState { reducer(Loading()) }

@Suppress("USELESS_CAST")
return map { Success(mapper(it)) as Async<V> }
return map { Success(it) as Async<T> }
.catch { e ->
if (BuildConfig.DEBUG) {
Log.e(
this@ReduxViewModel::class.java.simpleName,
"Exception during observe",
e
)
Log.e(this@ReduxViewModel::class.java.simpleName, "Exception during execute", e)
}
emit(Fail(e))
}
Expand All @@ -82,13 +65,9 @@ abstract class ReduxViewModel<S>(
return selectSubscribe(prop1).asLiveData()
}

protected fun subscribe(): Flow<S> {
return stateChannel.asFlow().distinctUntilChanged()
}

protected fun subscribe(block: (S) -> Unit) {
viewModelScope.launch {
subscribe().collect { block(it) }
state.collect { block(it) }
}
}

Expand All @@ -99,30 +78,26 @@ abstract class ReduxViewModel<S>(
}

private fun <A> selectSubscribe(prop1: KProperty1<S, A>): Flow<A> {
return stateChannel.asFlow()
.map { prop1.get(it) }
.distinctUntilChanged()
return state.map { prop1.get(it) }
}

protected suspend fun setStateMutexed(reducer: S.() -> S) {
protected suspend fun setState(reducer: S.() -> S) {
stateMutex.withLock {
state = reducer(state)
state.value = reducer(state.value)
}
}

protected fun setState(reducer: S.() -> S) {
viewModelScope.launch { setStateMutexed(reducer) }
protected fun CoroutineScope.setState(reducer: S.() -> S) {
launch { this@ReduxViewModel.setState(reducer) }
}

protected suspend fun withStateMutexed(block: (S) -> Unit) {
stateMutex.withLock { block(state) }
}

protected fun withState(block: (S) -> Unit) {
viewModelScope.launch { withStateMutexed(block) }
protected suspend fun withState(block: (S) -> Unit) {
stateMutex.withLock {
block(state.value)
}
}

override fun onCleared() {
stateChannel.close()
protected fun CoroutineScope.withState(block: (S) -> Unit) {
launch { this@ReduxViewModel.withState(block) }
}
}
51 changes: 31 additions & 20 deletions common-ui-view/src/main/java/app/tivi/ui/SnackbarManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,54 @@ package app.tivi.ui

import app.tivi.api.UiError
import app.tivi.extensions.delayFlow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import org.threeten.bp.Duration
import javax.inject.Inject

class SnackbarManager {
class SnackbarManager @Inject constructor() {
// We want a maximum of 3 errors queued
private val pendingErrors = Channel<UiError>(3)
private val removeErrorSignal = Channel<Unit>(1)

suspend fun launch(onErrorVisibilityChanged: (UiError, Boolean) -> Unit) {
for (error in pendingErrors) {
// Set the error
onErrorVisibilityChanged(error, true)
fun launchInScope(
scope: CoroutineScope,
onErrorVisibilityChanged: (UiError, Boolean) -> Unit
) {
scope.launch {
pendingErrors.consumeAsFlow().collect { error ->
// Set the error
onErrorVisibilityChanged(error, true)

merge(
delayFlow(Duration.ofSeconds(6).toMillis(), Unit),
removeErrorSignal.receiveAsFlow()
).firstOrNull()
merge(
delayFlow(Duration.ofSeconds(6).toMillis(), Unit),
removeErrorSignal.receiveAsFlow()
).firstOrNull()

// Now remove the error
onErrorVisibilityChanged(error, false)
// Delay to allow the current error to disappear
delay(200)
// Now remove the error
onErrorVisibilityChanged(error, false)
// Delay to allow the current error to disappear
delay(200)
}
}
}

fun sendError(error: UiError) = pendingErrors.offer(error)

fun removeCurrentError() {
removeErrorSignal.offer(Unit)
fun sendError(error: UiError) {
if (!pendingErrors.isClosedForSend) {
pendingErrors.offer(error)
}
}

fun close() {
removeErrorSignal.close()
pendingErrors.close()
fun removeCurrentError() {
if (!removeErrorSignal.isClosedForSend) {
removeErrorSignal.offer(Unit)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,26 @@ import app.tivi.base.InvokeError
import app.tivi.base.InvokeStarted
import app.tivi.base.InvokeStatus
import app.tivi.base.InvokeSuccess
import kotlinx.coroutines.channels.ConflatedBroadcastChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import java.util.concurrent.atomic.AtomicInteger

class ObservableLoadingCounter {
private val count = AtomicInteger()
private val loadingState = ConflatedBroadcastChannel(count.get())
private val loadingState = MutableStateFlow(count.get())

val observable: Flow<Boolean>
get() = loadingState.asFlow()
.map { it > 0 }
.distinctUntilChanged()
get() = loadingState.map { it > 0 }.distinctUntilChanged()

fun addLoader() {
loadingState.offer(count.incrementAndGet())
loadingState.value = count.incrementAndGet()
}

fun removeLoader() {
loadingState.offer(count.decrementAndGet())
loadingState.value = count.decrementAndGet()
}
}

Expand Down
Loading