diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml index 13dca3db33d..8cbe448f6ff 100644 --- a/.github/workflows/gradle-wrapper-validation.yml +++ b/.github/workflows/gradle-wrapper-validation.yml @@ -3,10 +3,10 @@ name: "Validate Gradle Wrapper" on: push: branches: - - master + - main pull_request: branches: - - master + - main jobs: validation: diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt index 1aaf4d63b9f..1d907fcc6c7 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorage.kt @@ -2,6 +2,8 @@ package de.rki.coronawarnapp.nearby.modules.calculationtracker import android.content.Context import com.google.gson.Gson +import de.rki.coronawarnapp.exception.ExceptionCategory +import de.rki.coronawarnapp.exception.reporting.report import de.rki.coronawarnapp.util.di.AppContext import de.rki.coronawarnapp.util.gson.fromJson import de.rki.coronawarnapp.util.gson.toJson @@ -36,11 +38,13 @@ class CalculationTrackerStorage @Inject constructor( if (!storageFile.exists()) return@withLock emptyMap() gson.fromJson>(storageFile).also { + require(it.size >= 0) Timber.v("Loaded calculation data: %s", it) lastCalcuationData = it } } catch (e: Exception) { Timber.e(e, "Failed to load tracked calculations.") + if (storageFile.delete()) Timber.w("Storage file was deleted.") emptyMap() } } @@ -55,6 +59,7 @@ class CalculationTrackerStorage @Inject constructor( gson.toJson(data, storageFile) } catch (e: Exception) { Timber.e(e, "Failed to save tracked calculations.") + e.report(ExceptionCategory.INTERNAL) } } } diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt index 359c2e948ed..a52a476a6c9 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/flow/HotDataFlow.kt @@ -6,7 +6,6 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.distinctUntilChanged @@ -15,6 +14,8 @@ import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.plus +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import timber.log.Timber import kotlin.coroutines.CoroutineContext @@ -36,29 +37,35 @@ class HotDataFlow( extraBufferCapacity = Int.MAX_VALUE, onBufferOverflow = BufferOverflow.SUSPEND ) + private val valueGuard = Mutex() private val internalFlow = channelFlow { - var currentValue = startValueProvider().also { - Timber.tag(tag).v("startValue=%s", it) - send(it) + var currentValue = valueGuard.withLock { + startValueProvider().also { send(it) } } + Timber.tag(tag).v("startValue=%s", currentValue) - updateActions.collect { updateAction -> - currentValue = updateAction(currentValue).also { - currentValue = it - send(it) + updateActions + .onCompletion { + Timber.tag(tag).v("updateActions onCompletion -> resetReplayCache()") + updateActions.resetReplayCache() } - } + .collect { updateAction -> + currentValue = valueGuard.withLock { + updateAction(currentValue).also { send(it) } + } + } + + Timber.tag(tag).v("internal channelFlow finished.") } val data: Flow = internalFlow .distinctUntilChanged() .onStart { Timber.tag(tag).v("internal onStart") } - .catch { - Timber.tag(tag).e(it, "internal Error") - throw it + .onCompletion { err -> + if (err != null) Timber.tag(tag).w(err, "internal onCompletion due to error") + else Timber.tag(tag).v("internal onCompletion") } - .onCompletion { Timber.tag(tag).v("internal onCompletion") } .shareIn( scope = scope + coroutineContext, replay = 1, diff --git a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt index b79e725c5d9..79f055e93fc 100644 --- a/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt +++ b/Corona-Warn-App/src/main/java/de/rki/coronawarnapp/util/gson/GsonExtensions.kt @@ -9,10 +9,11 @@ inline fun Gson.fromJson(json: String): T = fromJson( object : TypeToken() {}.type ) -inline fun Gson.fromJson(file: File): T = file.reader().use { +inline fun Gson.fromJson(file: File): T = file.bufferedReader().use { fromJson(it, object : TypeToken() {}.type) } -inline fun Gson.toJson(data: T, file: File) = file.writer().use { writer -> +inline fun Gson.toJson(data: T, file: File) = file.bufferedWriter().use { writer -> toJson(data, writer) + writer.flush() } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt index 5b1be36e8be..4ab6acc1f3e 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/nearby/modules/calculationtracker/CalculationTrackerStorageTest.kt @@ -130,4 +130,20 @@ class CalculationTrackerStorageTest : BaseIOTest() { storedData.getValue("b2b98400-058d-43e6-b952-529a5255248b").isCalculating shouldBe true storedData.getValue("aeb15509-fb34-42ce-8795-7a9ae0c2f389").isCalculating shouldBe false } + + @Test + fun `we catch empty json data and prevent unsafely initialized maps`() = runBlockingTest { + storageDir.mkdirs() + storageFile.writeText("") + + storageFile.exists() shouldBe true + + createStorage().apply { + val value = load() + value.size shouldBe 0 + value shouldBe emptyMap() + + storageFile.exists() shouldBe false + } + } } diff --git a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt index 0a5702aa451..0d6642641c8 100644 --- a/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt +++ b/Corona-Warn-App/src/test/java/de/rki/coronawarnapp/util/flow/HotDataFlowTest.kt @@ -1,5 +1,6 @@ package de.rki.coronawarnapp.util.flow +import de.rki.coronawarnapp.util.mutate import io.kotest.matchers.shouldBe import io.kotest.matchers.types.instanceOf import io.mockk.coEvery @@ -122,6 +123,48 @@ class HotDataFlowTest : BaseTest() { coVerify(exactly = 1) { valueProvider.invoke(any()) } } + data class TestData( + val number: Long = 1 + ) + + @Test + fun `check multi threading value updates with more complex data`() { + val testScope = TestCoroutineScope() + val valueProvider = mockk Map>() + coEvery { valueProvider.invoke(any()) } returns mapOf("data" to TestData()) + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = testScope, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.Lazily + ) + + val testCollector = hotData.data.test(startOnScope = testScope) + testCollector.silent = true + + (1..10).forEach { _ -> + thread { + (1..400).forEach { _ -> + hotData.updateSafely { + mutate { + this["data"] = getValue("data").copy( + number = getValue("data").number + 1 + ) + } + } + } + } + } + + runBlocking { + testCollector.await { list, l -> list.size == 4001 } + testCollector.latestValues.map { it.values.single().number } shouldBe (1L..4001L).toList() + } + + coVerify(exactly = 1) { valueProvider.invoke(any()) } + } + @Test fun `only emit new values if they actually changed updates`() { val testScope = TestCoroutineScope() @@ -163,9 +206,9 @@ class HotDataFlowTest : BaseTest() { testScope.runBlockingTest2(permanentJobs = true) { - val sub1 = hotData.data.test().start(scope = this) - val sub2 = hotData.data.test().start(scope = this) - val sub3 = hotData.data.test().start(scope = this) + val sub1 = hotData.data.test(tag = "sub1", startOnScope = this) + val sub2 = hotData.data.test(tag = "sub2", startOnScope = this) + val sub3 = hotData.data.test(tag = "sub3", startOnScope = this) hotData.updateSafely { "A" } hotData.updateSafely { "B" } @@ -181,4 +224,47 @@ class HotDataFlowTest : BaseTest() { } coVerify(exactly = 1) { valueProvider.invoke(any()) } } + + @Test + fun `update queue is wiped on completion`() = runBlockingTest2(permanentJobs = true) { + val valueProvider = mockk Long>() + coEvery { valueProvider.invoke(any()) } returns 1 + + val hotData = HotDataFlow( + loggingTag = "tag", + scope = this, + coroutineContext = this.coroutineContext, + startValueProvider = valueProvider, + sharingBehavior = SharingStarted.WhileSubscribed(replayExpirationMillis = 0) + ) + + val testCollector1 = hotData.data.test(tag = "collector1", startOnScope = this) + testCollector1.silent = false + + (1..10).forEach { _ -> + hotData.updateSafely { + this + 1L + } + } + + advanceUntilIdle() + + testCollector1.await { list, _ -> list.size == 11 } + testCollector1.latestValues shouldBe (1L..11L).toList() + + testCollector1.cancel() + testCollector1.awaitFinal() + + val testCollector2 = hotData.data.test(tag = "collector2", startOnScope = this) + testCollector2.silent = false + + advanceUntilIdle() + + testCollector2.cancel() + testCollector2.awaitFinal() + + testCollector2.latestValues shouldBe listOf(1L) + + coVerify(exactly = 2) { valueProvider.invoke(any()) } + } } diff --git a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt index 975a8c48dd5..d874d9ea36a 100644 --- a/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt +++ b/Corona-Warn-App/src/test/java/testhelpers/coroutines/FlowTest.kt @@ -16,14 +16,17 @@ import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.test.TestCoroutineScope +import kotlinx.coroutines.withTimeout +import org.joda.time.Duration import timber.log.Timber fun Flow.test( tag: String? = null, - startOnScope: CoroutineScope -): TestCollector = test(tag ?: "FlowTest").start(scope = startOnScope) + startOnScope: CoroutineScope = TestCoroutineScope() +): TestCollector = createTest(tag ?: "FlowTest").start(scope = startOnScope) -fun Flow.test( +fun Flow.createTest( tag: String? = null ): TestCollector = TestCollector(this, tag ?: "FlowTest") @@ -74,9 +77,14 @@ class TestCollector( val latestValues: List get() = collectedValues - fun await(condition: (List, T) -> Boolean): T = runBlocking { - emissions().first { - condition(collectedValues, it) + fun await( + timeout: Duration = Duration.standardSeconds(10), + condition: (List, T) -> Boolean + ): T = runBlocking { + withTimeout(timeMillis = timeout.millis) { + emissions().first { + condition(collectedValues, it) + } } } @@ -94,6 +102,8 @@ class TestCollector( } fun cancel() { + if (job.isCompleted) throw IllegalStateException("Flow is already canceled.") + runBlocking { job.cancelAndJoin() } diff --git a/gradle.properties b/gradle.properties index 35e2e5d5c28..9d6eecb9a7b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -19,5 +19,5 @@ org.gradle.dependency.verification.console=verbose # Versioning, this is used by the app & pipelines to calculate the current versionCode & versionName VERSION_MAJOR=1 VERSION_MINOR=6 -VERSION_PATCH=0 -VERSION_BUILD=8 +VERSION_PATCH=1 +VERSION_BUILD=1