Skip to content

Fix reported progress around compaction #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,61 @@ class SyncProgressTest {
syncLines.close()
}

@Test
fun interruptedWithDefrag() =
databaseTest {
database.connect(connector)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.connected && !it.downloading }
syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
lastOpId = "10",
checksums = listOf(bucket("a", 10)),
),
),
)
turbine.expectProgress(0 to 10)

addDataLine("a", 5)
turbine.expectProgress(5 to 10)

turbine.cancel()
}

// Close and re-connect
database.close()
syncLines.close()
database = openDatabase()
syncLines = Channel()
database.connect(connector)

turbineScope {
val turbine = database.currentStatus.asFlow().testIn(this)
turbine.waitFor { it.connected && !it.downloading }

// A sync rule deploy could reset buckets, making the new bucket smaller than the
// existing one.
syncLines.send(
SyncLine.FullCheckpoint(
Checkpoint(
lastOpId = "14",
checksums = listOf(bucket("a", 4)),
),
),
)

// In this special case, don't report 5/4 as progress
turbine.expectProgress(0 to 4)
turbine.cancel()
}

database.close()
syncLines.close()
}

@Test
fun differentPriorities() =
databaseTest {
Expand Down
27 changes: 24 additions & 3 deletions core/src/commonMain/kotlin/com/powersync/sync/Progress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.powersync.sync
import com.powersync.bucket.BucketPriority
import com.powersync.bucket.Checkpoint
import com.powersync.bucket.LocalOperationCounters
import kotlin.math.min

/**
* Information about a progressing download.
Expand Down Expand Up @@ -83,18 +84,38 @@ public data class SyncDownloadProgress private constructor(
*/
internal constructor(localProgress: Map<String, LocalOperationCounters>, target: Checkpoint) : this(
buildMap {
var invalidated = false

for (entry in target.checksums) {
val savedProgress = localProgress[entry.bucket]
val atLast = savedProgress?.atLast ?: 0
val sinceLast = savedProgress?.sinceLast ?: 0

put(
entry.bucket,
BucketProgress(
priority = entry.priority,
atLast = savedProgress?.atLast ?: 0,
sinceLast = savedProgress?.sinceLast ?: 0,
targetCount = entry.count ?: 0,
atLast = atLast,
sinceLast = sinceLast,
),
)

entry.count?.let { knownCount ->
if (knownCount < atLast + sinceLast) {
// Either due to a defrag / sync rule deploy or a compaction operation, the
// size of the bucket shrank so much that the local ops exceed the ops in
// the updated bucket. We can't possibly report progress in this case (it
// would overshoot 100%).
invalidated = true
}
}
}

if (invalidated) {
for ((key, value) in entries) {
put(key, value.copy(sinceLast = 0, atLast = 0))
}
}
},
)
Expand All @@ -120,7 +141,7 @@ public data class SyncDownloadProgress private constructor(
put(
bucket.bucket,
previous.copy(
sinceLast = previous.sinceLast + bucket.data.size,
sinceLast = min(previous.sinceLast + bucket.data.size, previous.total),
),
)
}
Expand Down