Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Improve HotDataFlow behavior (EXPOSUREAPP-3777) #1612

Merged
merged 3 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.plus
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import timber.log.Timber
import kotlin.coroutines.CoroutineContext

Expand All @@ -36,19 +38,24 @@ class HotDataFlow<T : Any>(
extraBufferCapacity = Int.MAX_VALUE,
onBufferOverflow = BufferOverflow.SUSPEND
)
private val valueGuard = Mutex()

private val internalFlow = channelFlow {
var currentValue = startValueProvider().also {
Timber.tag(tag).v("startValue=%s", it)
send(it)
var currentValue = valueGuard.withLock {
startValueProvider().also { send(it) }
}
Timber.tag(tag).v("startValue=%s", currentValue)

updateActions.collect { updateAction ->
currentValue = updateAction(currentValue).also {
currentValue = it
send(it)
updateActions
.onCompletion {
Timber.tag(tag).v("updateActions resetReplayCache().")
updateActions.resetReplayCache()
Copy link
Member Author

@d4rken d4rken Nov 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we unsubscribe, and the HotDataFlow goes cold, and then resubscribe, we would replay all previous update actions on the new value.

I don't think this affected the current issue, but it's a bug nonetheless. We should reset the cache here to prevent the actions from being replayed.

}
.collect { updateAction ->
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm suspicious that our access of currentValue in .collect { } was not thread-safe, which is why I added the mutex.

currentValue = valueGuard.withLock {
updateAction(currentValue).also { send(it) }
}
}
}
}

val data: Flow<T> = internalFlow
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package de.rki.coronawarnapp.util.flow

import de.rki.coronawarnapp.util.mutate
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.instanceOf
import io.mockk.coEvery
Expand Down Expand Up @@ -122,6 +123,48 @@ class HotDataFlowTest : BaseTest() {
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}

data class TestData(
val number: Long = 1
)

@Test
fun `value updates 2`() {
d4rken marked this conversation as resolved.
Show resolved Hide resolved
val testScope = TestCoroutineScope()
val valueProvider = mockk<suspend CoroutineScope.() -> Map<String, TestData>>()
coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData())

val hotData = HotDataFlow(
loggingTag = "tag",
scope = testScope,
startValueProvider = valueProvider,
sharingBehavior = SharingStarted.Lazily
)

val testCollector = hotData.data.test(startOnScope = testScope)
testCollector.silent = true

(1..10).forEach { _ ->
thread {
(1..400).forEach { _ ->
hotData.updateSafely {
mutate {
this["data"] = getValue("data").copy(
number = getValue("data").number + 1
)
}
}
}
}
}

runBlocking {
testCollector.await { list, l -> list.size == 4001 }
testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList()
}

coVerify(exactly = 1) { valueProvider.invoke(any()) }
}

@Test
fun `only emit new values if they actually changed updates`() {
val testScope = TestCoroutineScope()
Expand Down Expand Up @@ -181,4 +224,48 @@ class HotDataFlowTest : BaseTest() {
}
coVerify(exactly = 1) { valueProvider.invoke(any()) }
}

@Test
fun `update queue is wiped on completion`() {
val testScope = TestCoroutineScope()
val valueProvider = mockk<suspend CoroutineScope.() -> Long>()
coEvery { valueProvider.invoke(any()) } returns 1

val hotData = HotDataFlow(
loggingTag = "tag",
scope = testScope,
startValueProvider = valueProvider,
sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0)
)

val testCollector1 = hotData.data.test(startOnScope = testScope)
testCollector1.silent = false

(1..10).forEach { _ ->
hotData.updateSafely {
this + 1L
}
}

testScope.advanceUntilIdle()

runBlocking {
testCollector1.await { list, l -> list.size == 11 }
testCollector1.latestValues shouldBe (1L..11L).toList()
}

testCollector1.cancel()

val testCollector2 = hotData.data.test(startOnScope = testScope)
testCollector2.silent = false

testScope.advanceUntilIdle()

runBlocking {
testCollector2.await { list, l -> list.size == 1 }
testCollector2.latestValues shouldBe listOf(1L)
}

coVerify(exactly = 2) { valueProvider.invoke(any()) }
}
}
22 changes: 19 additions & 3 deletions Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import org.joda.time.Duration
import timber.log.Timber

fun <T> Flow<T>.test(
Expand Down Expand Up @@ -74,13 +76,21 @@ class TestCollector<T>(
val latestValues: List<T>
get() = collectedValues

fun await(condition: (List<T>, T) -> Boolean): T = runBlocking {
emissions().first {
condition(collectedValues, it)
fun await(
timeout: Duration = Duration.standardSeconds(10),
condition: (List<T>, T) -> Boolean
): T = runBlocking {
requireStillAlive()

withTimeout(timeMillis = timeout.millis) {
emissions().first {
condition(collectedValues, it)
}
}
}

suspend fun awaitFinal() = apply {
requireStillAlive()
try {
job.join()
} catch (e: Exception) {
Expand All @@ -94,8 +104,14 @@ class TestCollector<T>(
}

fun cancel() {
requireStillAlive()

runBlocking {
job.cancelAndJoin()
}
}

private fun requireStillAlive() {
if (job.isCompleted) throw IllegalStateException("Flow is already canceled.")
}
}