diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt index d8bc315a..d1052827 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncProgressTest.kt @@ -282,6 +282,61 @@ class SyncProgressTest { syncLines.close() } + @Test + fun interruptedWithDefrag() = + databaseTest { + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "10", + checksums = listOf(bucket("a", 10)), + ), + ), + ) + turbine.expectProgress(0 to 10) + + addDataLine("a", 5) + turbine.expectProgress(5 to 10) + + turbine.cancel() + } + + // Close and re-connect + database.close() + syncLines.close() + database = openDatabase() + syncLines = Channel() + database.connect(connector) + + turbineScope { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected && !it.downloading } + + // A sync rule deploy could reset buckets, making the new bucket smaller than the + // existing one. + syncLines.send( + SyncLine.FullCheckpoint( + Checkpoint( + lastOpId = "14", + checksums = listOf(bucket("a", 4)), + ), + ), + ) + + // In this special case, don't report 5/4 as progress + turbine.expectProgress(0 to 4) + turbine.cancel() + } + + database.close() + syncLines.close() + } + @Test fun differentPriorities() = databaseTest { diff --git a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt index 8c6b64e0..3b80875b 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/Progress.kt @@ -3,6 +3,7 @@ package com.powersync.sync import com.powersync.bucket.BucketPriority import com.powersync.bucket.Checkpoint import com.powersync.bucket.LocalOperationCounters +import kotlin.math.min /** * Information about a progressing download. @@ -83,18 +84,38 @@ public data class SyncDownloadProgress private constructor( */ internal constructor(localProgress: Map, target: Checkpoint) : this( buildMap { + var invalidated = false + for (entry in target.checksums) { val savedProgress = localProgress[entry.bucket] + val atLast = savedProgress?.atLast ?: 0 + val sinceLast = savedProgress?.sinceLast ?: 0 put( entry.bucket, BucketProgress( priority = entry.priority, - atLast = savedProgress?.atLast ?: 0, - sinceLast = savedProgress?.sinceLast ?: 0, targetCount = entry.count ?: 0, + atLast = atLast, + sinceLast = sinceLast, ), ) + + entry.count?.let { knownCount -> + if (knownCount < atLast + sinceLast) { + // Either due to a defrag / sync rule deploy or a compaction operation, the + // size of the bucket shrank so much that the local ops exceed the ops in + // the updated bucket. We can't possibly report progress in this case (it + // would overshoot 100%). + invalidated = true + } + } + } + + if (invalidated) { + for ((key, value) in entries) { + put(key, value.copy(sinceLast = 0, atLast = 0)) + } } }, ) @@ -120,7 +141,7 @@ public data class SyncDownloadProgress private constructor( put( bucket.bucket, previous.copy( - sinceLast = previous.sinceLast + bucket.data.size, + sinceLast = min(previous.sinceLast + bucket.data.size, previous.total), ), ) }