Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing some Behavior/Publish Relay usage in core artifacts with coroutines #544

Merged
merged 9 commits into from
Mar 30, 2023
3 changes: 3 additions & 0 deletions android/libraries/rib-android/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ dependencies {
implementation deps.androidx.annotations
implementation deps.androidx.appcompat
implementation deps.external.guavaAndroid
implementation deps.uber.autodisposeCoroutines
implementation deps.kotlin.coroutinesAndroid
implementation deps.kotlin.coroutinesRx2
testImplementation deps.external.roboelectricBase
testImplementation deps.androidx.appcompat
testImplementation deps.test.mockitoKotlin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import android.content.res.Configuration
import android.os.Build
import android.view.ViewGroup
import androidx.annotation.CallSuper
import com.jakewharton.rxrelay2.BehaviorRelay
import com.jakewharton.rxrelay2.PublishRelay
import com.jakewharton.rxrelay2.Relay
import com.uber.autodispose.lifecycle.CorrespondingEventsFunction
import com.uber.autodispose.lifecycle.LifecycleEndedException
import com.uber.autodispose.lifecycle.LifecycleScopeProvider
Expand All @@ -41,30 +38,32 @@ import com.uber.rib.core.lifecycle.ActivityLifecycleEvent.Companion.create
import com.uber.rib.core.lifecycle.ActivityLifecycleEvent.Companion.createOnCreateEvent
import io.reactivex.CompletableSource
import io.reactivex.Observable
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.rx2.asObservable

/** Base implementation for all VIP [android.app.Activity]s. */
abstract class RibActivity : CoreAppCompatActivity(), ActivityStarter, LifecycleScopeProvider<ActivityLifecycleEvent>, RxActivityEvents {
private var router: ViewRouter<*, *>? = null
private val lifecycleBehaviorRelay = BehaviorRelay.create<ActivityLifecycleEvent>()
private val lifecycleRelay: Relay<ActivityLifecycleEvent> = lifecycleBehaviorRelay.toSerialized()
private val callbacksRelay = PublishRelay.create<ActivityCallbackEvent>().toSerialized()
private val lifecycleFlow = MutableSharedFlow<ActivityLifecycleEvent>(1, 0, BufferOverflow.DROP_OLDEST)
jbarr21 marked this conversation as resolved.
Show resolved Hide resolved
private val callbacksFlow = MutableSharedFlow<ActivityCallbackEvent>(0, 1, BufferOverflow.DROP_OLDEST)

/** @return an observable of this activity's lifecycle events. */
override fun lifecycle(): Observable<ActivityLifecycleEvent> {
return lifecycleRelay.hide()
return lifecycleFlow.asObservable()
tyvsmith marked this conversation as resolved.
Show resolved Hide resolved
}

/** @return an observable of this activity's lifecycle events. */
override fun callbacks(): Observable<ActivityCallbackEvent> {
return callbacksRelay.hide()
return callbacksFlow.asObservable()
}

override fun correspondingEvents(): CorrespondingEventsFunction<ActivityLifecycleEvent> {
return ACTIVITY_LIFECYCLE
}

override fun peekLifecycle(): ActivityLifecycleEvent {
return lifecycleBehaviorRelay.value!!
return lifecycleFlow.replayCache.last()
}

override fun requestScope(): CompletableSource {
Expand All @@ -76,7 +75,7 @@ abstract class RibActivity : CoreAppCompatActivity(), ActivityStarter, Lifecycle
override fun onCreate(savedInstanceState: android.os.Bundle?) {
super.onCreate(savedInstanceState)
val rootViewGroup = findViewById<ViewGroup>(R.id.content)
lifecycleRelay.accept(createOnCreateEvent(savedInstanceState))
lifecycleFlow.tryEmit(createOnCreateEvent(savedInstanceState))
val wrappedBundle: Bundle? = if (savedInstanceState != null) Bundle(savedInstanceState) else null
router = createRouter(rootViewGroup)
router?.let {
Expand All @@ -89,49 +88,49 @@ abstract class RibActivity : CoreAppCompatActivity(), ActivityStarter, Lifecycle
@CallSuper
override fun onSaveInstanceState(outState: android.os.Bundle) {
super.onSaveInstanceState(outState)
callbacksRelay.accept(createOnSaveInstanceStateEvent(outState))
callbacksFlow.tryEmit(createOnSaveInstanceStateEvent(outState))
router?.saveInstanceStateInternal(Bundle(outState)) ?: throw NullPointerException("Router should not be null")
}

@CallSuper
override fun onStart() {
super.onStart()
lifecycleRelay.accept(create(ActivityLifecycleEvent.Type.START))
lifecycleFlow.tryEmit(create(ActivityLifecycleEvent.Type.START))
}

@CallSuper
override fun onResume() {
super.onResume()
lifecycleRelay.accept(create(ActivityLifecycleEvent.Type.RESUME))
lifecycleFlow.tryEmit(create(ActivityLifecycleEvent.Type.RESUME))
}

@CallSuper
override fun onNewIntent(intent: Intent) {
super.onNewIntent(intent)
callbacksRelay.accept(createNewIntent(intent))
callbacksFlow.tryEmit(createNewIntent(intent))
}

@CallSuper
override fun onActivityResult(requestCode: Int, resultCode: Int, data: Intent?) {
super.onActivityResult(requestCode, resultCode, data)
callbacksRelay.accept(createOnActivityResultEvent(requestCode, resultCode, data))
callbacksFlow.tryEmit(createOnActivityResultEvent(requestCode, resultCode, data))
}

@CallSuper
override fun onPause() {
lifecycleRelay.accept(create(ActivityLifecycleEvent.Type.PAUSE))
lifecycleFlow.tryEmit(create(ActivityLifecycleEvent.Type.PAUSE))
super.onPause()
}

@CallSuper
override fun onStop() {
lifecycleRelay.accept(create(ActivityLifecycleEvent.Type.STOP))
lifecycleFlow.tryEmit(create(ActivityLifecycleEvent.Type.STOP))
super.onStop()
}

@CallSuper
override fun onDestroy() {
lifecycleRelay.accept(create(ActivityLifecycleEvent.Type.DESTROY))
lifecycleFlow.tryEmit(create(ActivityLifecycleEvent.Type.DESTROY))
router?.let {
it.dispatchDetach()
RibEvents.getInstance().emitEvent(RibEventType.DETACHED, it, null)
Expand All @@ -143,20 +142,20 @@ abstract class RibActivity : CoreAppCompatActivity(), ActivityStarter, Lifecycle
@CallSuper
override fun onLowMemory() {
super.onLowMemory()
callbacksRelay.accept(create(ActivityCallbackEvent.Type.LOW_MEMORY))
callbacksFlow.tryEmit(create(ActivityCallbackEvent.Type.LOW_MEMORY))
}

@CallSuper
override fun onTrimMemory(level: Int) {
super.onTrimMemory(level)
callbacksRelay.accept(createTrimMemoryEvent(level))
callbacksFlow.tryEmit(createTrimMemoryEvent(level))
}

override fun onPictureInPictureModeChanged(
isInPictureInPictureMode: Boolean,
newConfig: Configuration
) {
callbacksRelay.accept(
callbacksFlow.tryEmit(
createPictureInPictureMode(isInPictureInPictureMode)
)
}
Expand All @@ -177,13 +176,13 @@ abstract class RibActivity : CoreAppCompatActivity(), ActivityStarter, Lifecycle
}

override fun onUserLeaveHint() {
lifecycleRelay.accept(create(ActivityLifecycleEvent.Type.USER_LEAVING))
lifecycleFlow.tryEmit(create(ActivityLifecycleEvent.Type.USER_LEAVING))
super.onUserLeaveHint()
}

override fun onWindowFocusChanged(hasFocus: Boolean) {
super.onWindowFocusChanged(hasFocus)
callbacksRelay.accept(createWindowFocusEvent(hasFocus))
callbacksFlow.tryEmit(createWindowFocusEvent(hasFocus))
}

/**
Expand Down
4 changes: 4 additions & 0 deletions android/libraries/rib-base/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ dependencies {
api deps.uber.autodisposeLifecycle
implementation deps.apt.javaxInject

implementation deps.uber.autodisposeCoroutines
implementation deps.kotlin.coroutinesAndroid
implementation deps.kotlin.coroutinesRx2

compileOnly deps.apt.daggerCompiler
compileOnly deps.androidx.annotations
compileOnly deps.apt.androidApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ package com.uber.rib.core

import androidx.annotation.CallSuper
import androidx.annotation.VisibleForTesting
import com.jakewharton.rxrelay2.BehaviorRelay
import com.uber.autodispose.lifecycle.CorrespondingEventsFunction
import com.uber.autodispose.lifecycle.LifecycleEndedException
import com.uber.autodispose.lifecycle.LifecycleScopeProvider
import com.uber.autodispose.lifecycle.LifecycleScopes
import com.uber.rib.core.lifecycle.InteractorEvent
import io.reactivex.CompletableSource
import io.reactivex.Observable
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.rx2.asObservable
import javax.inject.Inject
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
Expand All @@ -39,8 +41,7 @@ abstract class Interactor<P : Any, R : Router<*>> : LifecycleScopeProvider<Inter
@Inject
lateinit var injectedPresenter: P
internal var actualPresenter: P? = null
private val behaviorRelay = BehaviorRelay.create<InteractorEvent>()
private val lifecycleRelay = behaviorRelay.toSerialized()
private val lifecycleFlow = MutableSharedFlow<InteractorEvent>(1, 0, BufferOverflow.DROP_OLDEST)

private val routerDelegate = InitOnceProperty<R>()
/** @return the router for this interactor. */
Expand All @@ -55,11 +56,11 @@ abstract class Interactor<P : Any, R : Router<*>> : LifecycleScopeProvider<Inter

/** @return an observable of this controller's lifecycle events. */
override fun lifecycle(): Observable<InteractorEvent> {
return lifecycleRelay.hide()
return lifecycleFlow.asObservable()
}

/** @return true if the controller is attached, false if not. */
override fun isAttached() = behaviorRelay.value === InteractorEvent.ACTIVE
override fun isAttached() = lifecycleFlow.replayCache.last() == InteractorEvent.ACTIVE

/**
* Called when attached. The presenter will automatically be added when this happens.
Expand Down Expand Up @@ -97,15 +98,15 @@ abstract class Interactor<P : Any, R : Router<*>> : LifecycleScopeProvider<Inter
protected open fun onSaveInstanceState(outState: Bundle) {}

public open fun dispatchAttach(savedInstanceState: Bundle?) {
lifecycleRelay.accept(InteractorEvent.ACTIVE)
lifecycleFlow.tryEmit(InteractorEvent.ACTIVE)
(getPresenter() as? Presenter)?.dispatchLoad()
didBecomeActive(savedInstanceState)
}

public open fun dispatchDetach(): P {
(getPresenter() as? Presenter)?.dispatchUnload()
willResignActive()
lifecycleRelay.accept(InteractorEvent.INACTIVE)
lifecycleFlow.tryEmit(InteractorEvent.INACTIVE)
return getPresenter()
}

Expand Down Expand Up @@ -140,7 +141,7 @@ abstract class Interactor<P : Any, R : Router<*>> : LifecycleScopeProvider<Inter
}

override fun peekLifecycle(): InteractorEvent? {
return behaviorRelay.value
return lifecycleFlow.replayCache.last()
tyvsmith marked this conversation as resolved.
Show resolved Hide resolved
}

final override fun requestScope(): CompletableSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.uber.rib.core

import androidx.annotation.CallSuper
import com.jakewharton.rxrelay2.BehaviorRelay
import com.uber.autodispose.ScopeProvider
import com.uber.rib.core.lifecycle.PresenterEvent
import io.reactivex.CompletableSource
import io.reactivex.Observable
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.rx2.asObservable
import org.checkerframework.checker.guieffect.qual.UIEffect

/**
Expand All @@ -31,23 +33,22 @@ import org.checkerframework.checker.guieffect.qual.UIEffect
* it becomes unclear where you should write your bussiness logic.
*/
abstract class Presenter : ScopeProvider {
private val behaviorRelay = BehaviorRelay.create<PresenterEvent>()
private val lifecycleRelay = behaviorRelay.toSerialized()
private val lifecycleFlow = MutableSharedFlow<PresenterEvent>()
tyvsmith marked this conversation as resolved.
Show resolved Hide resolved
tyvsmith marked this conversation as resolved.
Show resolved Hide resolved

/** @return `true` if the presenter is loaded, `false` if not. */
protected var isLoaded = false
private set

public open fun dispatchLoad() {
isLoaded = true
lifecycleRelay.accept(PresenterEvent.LOADED)
lifecycleFlow.tryEmit(PresenterEvent.LOADED)
didLoad()
}

public open fun dispatchUnload() {
isLoaded = false
willUnload()
lifecycleRelay.accept(PresenterEvent.UNLOADED)
lifecycleFlow.tryEmit(PresenterEvent.UNLOADED)
}

/** Tells the presenter that it has finished loading. */
Expand All @@ -66,10 +67,10 @@ abstract class Presenter : ScopeProvider {

/** @return an observable of this controller's lifecycle events. */
open fun lifecycle(): Observable<PresenterEvent> {
return lifecycleRelay.hide()
return lifecycleFlow.asObservable()
}

override fun requestScope(): CompletableSource {
return lifecycleRelay.skip(1).firstElement().ignoreElement()
return lifecycleFlow.drop(1).asObservable().firstElement().ignoreElement()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
*/
package com.uber.rib.core

import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Observable
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.rx2.asObservable

class RibEvents private constructor() {

private val eventRelay: PublishRelay<RibEvent> = PublishRelay.create()
private val eventFlow = MutableSharedFlow<RibEvent>(0, 1, BufferOverflow.DROP_OLDEST)

open val events: Observable<RibEvent> = eventRelay.hide()
open val events: Observable<RibEvent> = eventFlow.asObservable()

/**
* @param eventType [RibEventType]
Expand All @@ -31,7 +33,7 @@ class RibEvents private constructor() {
* RibActivity/Fragment
*/
open fun emitEvent(eventType: RibEventType, child: Router<*>, parent: Router<*>?) {
eventRelay.accept(RibEvent(eventType, child, parent))
eventFlow.tryEmit(RibEvent(eventType, child, parent))
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
package com.uber.rib.core

import androidx.annotation.VisibleForTesting
import com.jakewharton.rxrelay2.PublishRelay
import com.uber.autodispose.lifecycle.LifecycleScopeProvider
import com.uber.rib.core.lifecycle.InteractorEvent
import com.uber.rib.core.lifecycle.PresenterEvent
import com.uber.rib.core.lifecycle.WorkerEvent
import io.reactivex.Observable
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.rx2.asFlow
import kotlinx.coroutines.rx2.asObservable

/** Helper class to bind to an interactor's lifecycle to translate it to a [Worker] lifecycle. */
object WorkerBinder {
Expand Down Expand Up @@ -85,12 +90,14 @@ object WorkerBinder {
@JvmStatic
@VisibleForTesting
open fun bind(mappedLifecycle: Observable<WorkerEvent>, worker: Worker): WorkerUnbinder {
val unbindSubject = PublishRelay.create<WorkerEvent>()
val workerLifecycle = mappedLifecycle
.mergeWith(unbindSubject)
.takeUntil { workerEvent: WorkerEvent -> workerEvent === WorkerEvent.STOP }
val unbindFlow = MutableSharedFlow<WorkerEvent>(0, 1, BufferOverflow.DROP_OLDEST)

val workerLifecycle = merge(mappedLifecycle.asFlow(), unbindFlow)
.transformWhile { emit(it) ; it != WorkerEvent.STOP }
.asObservable()

bindToWorkerLifecycle(workerLifecycle, worker)
Copy link
Member Author

@tyvsmith tyvsmith Mar 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prototyped a copy of bindToWorkerLifecycle that has to launch to match events so it doesn't need to convert to observable and can be native coroutine, and we end up in race condtion since a subscribe on main thread is guaranteed to register before the testing code calls it. Need to look into if we should block for registering the lambda but not on the terminal event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came up with the following implementation that passes tests from WorkerBinderTest. However, some things are still unclear to me:

1. The scope of the coroutine / Rx subscription
it's still unclear to me what should be the scope of the coroutine -- I wonder the same about the current implementation by the way -- it feels leaky because the Disposable returned by the subscription in bindToWorkerLifecycle is never used. In the example I just used MainScope(), could also work with GlobalScope (assuming bind is called from the main thread).

2. Multithreading/concurrency
I feel like all of this binding/unbinding should be done on the main thread. But what if the WorkerUnbinder.unbind() is called from a worker thread? In the example below, worker.onStop() would run a non-main thread, and I think it's erroneous. If WorkerUnbinder.unbind() is called from a non-main-thread, it would require asynchronicity to run worker.onStop() on main thread (we'll need to dispatch the operation). The same doubt remains on current Rx implementation. I feel like binding and unbinding from a non-main thread should just be forbidden (throw exception) for the most predictable behavior: always synchronous (by the time workerUnbinder.unbind() finishes, worker.onStop() has also finished).

  open fun bind(mappedLifecycle: Observable<WorkerEvent>, worker: Worker): WorkerUnbinder {
    val job = MainScope().launch(Dispatchers.Unconfined) {
      try {
        mappedLifecycle.asFlow()
          .takeWhile { it == WorkerEvent.START }
          .collect { worker.onStart(WorkerScopeProvider(mappedLifecycle.hide())) }
      } finally {
        // we should ensure here that WorkerUnbinder.unbind() is called from a main thread,
        // or else worker.onStop() will be called on whatever thread called workerUnbinder.unbind().
        worker.onStop()
      }
    }
    return WorkerUnbinder { 
      job.cancel()
      // because we use Dispatchers.Unconfined, by the time this line is reached the finally block
      // above has already run.
    }
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some experimentation:

    withContext(newSingleThreadContext("MainThread")) {
        val job = launch(Dispatchers.Unconfined) {
            try {
                println("Running some work on ${Thread.currentThread().name}")
                awaitCancellation()
            } finally {
                println("Finally block on ${Thread.currentThread().name}")
                Thread.sleep(1000) // some long-running operation
            }
        }
        println("cancelling from thread ${Thread.currentThread().name}")
        job.cancel()
        println("cancelled")
    }

Output:

Running some work on MainThread
cancelling from thread MainThread
Finally block on MainThread
cancelled

Note the synchronous behavior here: job starts immediately and runs until collector installed. Also, finally blocks runs immediately on cancelation, and it only returns after the finally block finishes. I think we want the same behavior with WorkerUnbinder.unbind() and Worker.onStop()`.

Copy link
Contributor

@psteiger psteiger Mar 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More experimentation: Notice that with current Rx implementation, the issue of calling unbinder.unbind() on a worker thread is the same: we'll end up calling worker.onStop on the caller worker thread:

    val unbinder = PublishSubject.create<Int>()
    Observable.merge(Observable.just(1), unbinder).subscribe {
        // call worker.onStart or onStop
        println("$it ${Thread.currentThread().name}")
    }
    Dispatchers.Default {
        delay(100)
        // simulate unbinder.unbind on worker thread
        unbinder.onNext(2)
    }

Output:

1 main
2 DefaultDispatcher-worker-1

If we use observeOn before subscribe, this fixes it, but introduces unwanted asynchronicity, hence my proposal to forbid unbinding from a worker thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding 1, that's what I observed as well, and was thinking we'd need to use create a coroutinecontext on the spot. We can take one in as optional in the params for testing, but the current usage does "feel" leaky since it subscribes directly, but it looks like it ends up getting coupled to the parents lifecycle through the internal logic inside it.

Regarding 2, @FranAguilera has done some research on workers binding on main thread vs background and has a proposal to make an explicit API change. WIth his proposed changes, we'd need both options.

return WorkerUnbinder { unbindSubject.accept(WorkerEvent.STOP) }
return WorkerUnbinder { unbindFlow.tryEmit(WorkerEvent.STOP) }
}

@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ import com.uber.autodispose.ScopeProvider
import com.uber.rib.core.lifecycle.WorkerEvent
import io.reactivex.CompletableSource
import io.reactivex.Observable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.rx2.asFlow
import kotlinx.coroutines.rx2.asObservable

/** [ScopeProvider] for [Worker] instances. */
open class WorkerScopeProvider internal constructor(
private val workerLifecycleObservable: Observable<WorkerEvent>
private val workerLifecycle: Flow<WorkerEvent>
) : ScopeProvider {
internal constructor(workerLifecycleObservable: Observable<WorkerEvent>) : this(workerLifecycleObservable.asFlow())

override fun requestScope(): CompletableSource {
return workerLifecycleObservable.skip(1).firstElement().ignoreElement()
return workerLifecycle.drop(1).asObservable().firstElement().ignoreElement()
tyvsmith marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading