Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing some Behavior/Publish Relay usage in core artifacts with coroutines #544

Merged
merged 9 commits into from
Mar 30, 2023

Conversation

tyvsmith
Copy link
Member

@tyvsmith tyvsmith commented Mar 25, 2023

Beginning internal addressing of Rx usages in core artifacts.

  • Currently replacing heavy use of Behavior/Publish Relays on internal code to track lifecycle state.

  • Public APIs are still wrapped to observables.

  • Verify against internal test suite

  • Verify against app core flow functional expectations

#542

@tyvsmith tyvsmith changed the title Ty/explore flow internals Replacing some Behavior/Publish Relay usage in core artifacts with coroutiens Mar 25, 2023
@tyvsmith tyvsmith requested review from jbarr21 and psteiger March 25, 2023 00:09
val workerLifecycle = merge(mappedLifecycle.asFlow(), unbindFlow)
.transformWhile { emit(it) ; it != WorkerEvent.STOP }
.asObservable()

bindToWorkerLifecycle(workerLifecycle, worker)
Copy link
Member Author

@tyvsmith tyvsmith Mar 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prototyped a copy of bindToWorkerLifecycle that has to launch to match events so it doesn't need to convert to observable and can be native coroutine, and we end up in race condtion since a subscribe on main thread is guaranteed to register before the testing code calls it. Need to look into if we should block for registering the lambda but not on the terminal event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came up with the following implementation that passes tests from WorkerBinderTest. However, some things are still unclear to me:

1. The scope of the coroutine / Rx subscription
it's still unclear to me what should be the scope of the coroutine -- I wonder the same about the current implementation by the way -- it feels leaky because the Disposable returned by the subscription in bindToWorkerLifecycle is never used. In the example I just used MainScope(), could also work with GlobalScope (assuming bind is called from the main thread).

2. Multithreading/concurrency
I feel like all of this binding/unbinding should be done on the main thread. But what if the WorkerUnbinder.unbind() is called from a worker thread? In the example below, worker.onStop() would run a non-main thread, and I think it's erroneous. If WorkerUnbinder.unbind() is called from a non-main-thread, it would require asynchronicity to run worker.onStop() on main thread (we'll need to dispatch the operation). The same doubt remains on current Rx implementation. I feel like binding and unbinding from a non-main thread should just be forbidden (throw exception) for the most predictable behavior: always synchronous (by the time workerUnbinder.unbind() finishes, worker.onStop() has also finished).

  open fun bind(mappedLifecycle: Observable<WorkerEvent>, worker: Worker): WorkerUnbinder {
    val job = MainScope().launch(Dispatchers.Unconfined) {
      try {
        mappedLifecycle.asFlow()
          .takeWhile { it == WorkerEvent.START }
          .collect { worker.onStart(WorkerScopeProvider(mappedLifecycle.hide())) }
      } finally {
        // we should ensure here that WorkerUnbinder.unbind() is called from a main thread,
        // or else worker.onStop() will be called on whatever thread called workerUnbinder.unbind().
        worker.onStop()
      }
    }
    return WorkerUnbinder { 
      job.cancel()
      // because we use Dispatchers.Unconfined, by the time this line is reached the finally block
      // above has already run.
    }
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some experimentation:

    withContext(newSingleThreadContext("MainThread")) {
        val job = launch(Dispatchers.Unconfined) {
            try {
                println("Running some work on ${Thread.currentThread().name}")
                awaitCancellation()
            } finally {
                println("Finally block on ${Thread.currentThread().name}")
                Thread.sleep(1000) // some long-running operation
            }
        }
        println("cancelling from thread ${Thread.currentThread().name}")
        job.cancel()
        println("cancelled")
    }

Output:

Running some work on MainThread
cancelling from thread MainThread
Finally block on MainThread
cancelled

Note the synchronous behavior here: job starts immediately and runs until collector installed. Also, finally blocks runs immediately on cancelation, and it only returns after the finally block finishes. I think we want the same behavior with WorkerUnbinder.unbind() and Worker.onStop()`.

Copy link
Contributor

@psteiger psteiger Mar 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More experimentation: Notice that with current Rx implementation, the issue of calling unbinder.unbind() on a worker thread is the same: we'll end up calling worker.onStop on the caller worker thread:

    val unbinder = PublishSubject.create<Int>()
    Observable.merge(Observable.just(1), unbinder).subscribe {
        // call worker.onStart or onStop
        println("$it ${Thread.currentThread().name}")
    }
    Dispatchers.Default {
        delay(100)
        // simulate unbinder.unbind on worker thread
        unbinder.onNext(2)
    }

Output:

1 main
2 DefaultDispatcher-worker-1

If we use observeOn before subscribe, this fixes it, but introduces unwanted asynchronicity, hence my proposal to forbid unbinding from a worker thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding 1, that's what I observed as well, and was thinking we'd need to use create a coroutinecontext on the spot. We can take one in as optional in the params for testing, but the current usage does "feel" leaky since it subscribes directly, but it looks like it ends up getting coupled to the parents lifecycle through the internal logic inside it.

Regarding 2, @FranAguilera has done some research on workers binding on main thread vs background and has a proposal to make an explicit API change. WIth his proposed changes, we'd need both options.

@tyvsmith tyvsmith changed the title Replacing some Behavior/Publish Relay usage in core artifacts with coroutiens Replacing some Behavior/Publish Relay usage in core artifacts with coroutines Mar 25, 2023
val workerLifecycle = merge(mappedLifecycle.asFlow(), unbindFlow)
.transformWhile { emit(it) ; it != WorkerEvent.STOP }
.asObservable()

bindToWorkerLifecycle(workerLifecycle, worker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came up with the following implementation that passes tests from WorkerBinderTest. However, some things are still unclear to me:

1. The scope of the coroutine / Rx subscription
it's still unclear to me what should be the scope of the coroutine -- I wonder the same about the current implementation by the way -- it feels leaky because the Disposable returned by the subscription in bindToWorkerLifecycle is never used. In the example I just used MainScope(), could also work with GlobalScope (assuming bind is called from the main thread).

2. Multithreading/concurrency
I feel like all of this binding/unbinding should be done on the main thread. But what if the WorkerUnbinder.unbind() is called from a worker thread? In the example below, worker.onStop() would run a non-main thread, and I think it's erroneous. If WorkerUnbinder.unbind() is called from a non-main-thread, it would require asynchronicity to run worker.onStop() on main thread (we'll need to dispatch the operation). The same doubt remains on current Rx implementation. I feel like binding and unbinding from a non-main thread should just be forbidden (throw exception) for the most predictable behavior: always synchronous (by the time workerUnbinder.unbind() finishes, worker.onStop() has also finished).

  open fun bind(mappedLifecycle: Observable<WorkerEvent>, worker: Worker): WorkerUnbinder {
    val job = MainScope().launch(Dispatchers.Unconfined) {
      try {
        mappedLifecycle.asFlow()
          .takeWhile { it == WorkerEvent.START }
          .collect { worker.onStart(WorkerScopeProvider(mappedLifecycle.hide())) }
      } finally {
        // we should ensure here that WorkerUnbinder.unbind() is called from a main thread,
        // or else worker.onStop() will be called on whatever thread called workerUnbinder.unbind().
        worker.onStop()
      }
    }
    return WorkerUnbinder { 
      job.cancel()
      // because we use Dispatchers.Unconfined, by the time this line is reached the finally block
      // above has already run.
    }
  }

val workerLifecycle = merge(mappedLifecycle.asFlow(), unbindFlow)
.transformWhile { emit(it) ; it != WorkerEvent.STOP }
.asObservable()

bindToWorkerLifecycle(workerLifecycle, worker)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some experimentation:

    withContext(newSingleThreadContext("MainThread")) {
        val job = launch(Dispatchers.Unconfined) {
            try {
                println("Running some work on ${Thread.currentThread().name}")
                awaitCancellation()
            } finally {
                println("Finally block on ${Thread.currentThread().name}")
                Thread.sleep(1000) // some long-running operation
            }
        }
        println("cancelling from thread ${Thread.currentThread().name}")
        job.cancel()
        println("cancelled")
    }

Output:

Running some work on MainThread
cancelling from thread MainThread
Finally block on MainThread
cancelled

Note the synchronous behavior here: job starts immediately and runs until collector installed. Also, finally blocks runs immediately on cancelation, and it only returns after the finally block finishes. I think we want the same behavior with WorkerUnbinder.unbind() and Worker.onStop()`.

@jbarr21 jbarr21 self-requested a review March 27, 2023 18:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants