Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions crates/core/src/sync/storage_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,24 @@ impl StorageAdapter {
self.delete_subscription.exec()?;
Ok(())
}

pub fn local_state(&self) -> Result<Option<LocalState>, PowerSyncError> {
let stmt = self
.db
.prepare_v2("SELECT target_op FROM ps_buckets WHERE name = ?")?;
stmt.bind_text(1, "$local", sqlite_nostd::Destructor::STATIC)?;

Ok(if stmt.step()? == ResultCode::ROW {
let target_op = stmt.column_int64(0);
Some(LocalState { target_op })
} else {
None
})
}
}

pub struct LocalState {
pub target_op: i64,
}

pub struct BucketInfo {
Expand Down
66 changes: 44 additions & 22 deletions crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,12 @@ impl StreamingSyncIteration {
|s| s.start_tracking_checkpoint(progress, subscription_state),
&mut event.instructions,
);

// Technically, we could still try to apply a pending checkpoint after receiving a
// new one. However, sync_local assumes it's only called in a state where there's no
// pending checkpoint, so we'd have to take the oplog state at the time we've
// originally received the validated-but-not-applied checkpoint. This is likely not
// something worth doing.
self.validated_but_not_applied = None;
*target = updated_target;
}
Expand Down Expand Up @@ -515,28 +521,7 @@ impl StreamingSyncIteration {
SyncEvent::BinaryLine { data } => bson::from_bytes(data)
.map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?,
SyncEvent::UploadFinished => {
if let Some(checkpoint) = self.validated_but_not_applied.take() {
let result = self.sync_local(&checkpoint, None)?;

match result {
SyncLocalResult::ChangesApplied => {
event.instructions.push(Instruction::LogLine {
severity: LogSeverity::DEBUG,
line: "Applied pending checkpoint after completed upload"
.into(),
});

self.handle_checkpoint_applied(event, self.adapter.now()?);
}
_ => {
event.instructions.push(Instruction::LogLine {
severity: LogSeverity::WARNING,
line: "Could not apply pending checkpoint even after completed upload"
.into(),
});
}
}
}
self.try_applying_write_after_completed_upload(event)?;

continue;
}
Expand Down Expand Up @@ -608,6 +593,43 @@ impl StreamingSyncIteration {
Ok(progress)
}

fn try_applying_write_after_completed_upload(
&mut self,
event: &mut ActiveEvent,
) -> Result<(), PowerSyncError> {
let Some(checkpoint) = self.validated_but_not_applied.take() else {
return Ok(());
};

let target_write = self.adapter.local_state()?.map(|e| e.target_op);
if checkpoint.write_checkpoint < target_write {
// Note: None < Some(x). The pending checkpoint does not contain the write
// checkpoint created during the upload, so we don't have to try applying it, it's
// guaranteed to be outdated.
return Ok(());
}

let result = self.sync_local(&checkpoint, None)?;
match result {
SyncLocalResult::ChangesApplied => {
event.instructions.push(Instruction::LogLine {
severity: LogSeverity::DEBUG,
line: "Applied pending checkpoint after completed upload".into(),
});

self.handle_checkpoint_applied(event, self.adapter.now()?);
}
_ => {
event.instructions.push(Instruction::LogLine {
severity: LogSeverity::WARNING,
line: "Could not apply pending checkpoint even after completed upload".into(),
});
}
}

Ok(())
}

/// Reconciles local stream subscriptions with service-side state received in a checkpoint.
///
/// This involves:
Expand Down
11 changes: 2 additions & 9 deletions crates/core/src/sync_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,10 @@ impl<'a> SyncOperation<'a> {
if needs_check {
// language=SQLite
let statement = self.db.prepare_v2(
"\
SELECT group_concat(name)
FROM ps_buckets
WHERE target_op > last_op AND name = '$local'",
"SELECT 1 FROM ps_buckets WHERE target_op > last_op AND name = '$local'",
)?;

if statement.step()? != ResultCode::ROW {
return Err(PowerSyncError::unknown_internal());
}

if statement.column_type(0)? == ColumnType::Text {
if statement.step()? == ResultCode::ROW {
return Ok(false);
}

Expand Down
127 changes: 125 additions & 2 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,14 @@ void _syncTests<T>({
}

List<Object?> pushCheckpoint(
{int lastOpId = 1, List<Object> buckets = const []}) {
return syncLine(checkpoint(lastOpId: lastOpId, buckets: buckets));
{int lastOpId = 1,
List<Object> buckets = const [],
String? writeCheckpoint}) {
return syncLine(checkpoint(
lastOpId: lastOpId,
buckets: buckets,
writeCheckpoint: writeCheckpoint,
));
}

List<Object?> pushCheckpointComplete({int? priority, String lastOpId = '1'}) {
Expand Down Expand Up @@ -676,6 +682,123 @@ void _syncTests<T>({
});
});

group('applies pending changes', () {
test('write checkpoint before upload complete', () {
// local write while offline
db.execute("insert into items (id, col) values ('local', 'data');");
invokeControl('start', null);

// Start upload process. Assume data has been uploaded and a write
// checkpoint has been requested, but not received yet.
db.execute('DELETE FROM ps_crud');
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
expect(pushCheckpointComplete(), [
containsPair('LogLine', {
'severity': 'INFO',
'line': contains('Will retry at completed upload')
})
]);

// Now complete the upload process.
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
invokeControl('completed_upload', null);

// This should apply the pending write checkpoint.
expect(fetchRows(), [
{'id': 'row-0', 'col': 'hi'}
]);
});

test('write checkpoint with synced data', () {
// local write while offline
db.execute("insert into items (id, col) values ('local', 'data');");
invokeControl('start', null);

// Complete upload process
db.execute('DELETE FROM ps_crud');
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
expect(invokeControl('completed_upload', null), isEmpty);

// Sync afterwards containing data and write checkpoint.
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
pushCheckpointComplete();
expect(fetchRows(), [
{'id': 'row-0', 'col': 'hi'}
]);
});

test('write checkpoint after synced data', () {
// local write while offline
db.execute("insert into items (id, col) values ('local', 'data');");
invokeControl('start', null);

// Upload changes, assume that triggered a checkpoint.
db.execute('DELETE FROM ps_crud');
pushCheckpoint(buckets: priorityBuckets);
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
expect(pushCheckpointComplete(), [
containsPair('LogLine', {
'severity': 'INFO',
'line': contains('Will retry at completed upload')
})
]);

// Now the upload is complete and requests a write checkpoint
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
expect(invokeControl('completed_upload', null), isEmpty);

// Which triggers a new iteration
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
expect(
pushCheckpointComplete(),
contains(containsPair('LogLine', {
'severity': 'DEBUG',
'line': contains('Validated and applied checkpoint')
})));

expect(fetchRows(), [
{'id': 'row-0', 'col': 'hi'}
]);
});

test('second local write', () {
// first local write while offline
db.execute("insert into items (id, col) values ('local', 'data');");
invokeControl('start', null);

// Upload changes, assume that triggered a checkpoint.
db.execute('DELETE FROM ps_crud');
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
expect(pushCheckpointComplete(), [
containsPair('LogLine', {
'severity': 'INFO',
'line': contains('Will retry at completed upload')
})
]);

// Second local write during sync
db.execute("insert into items (id, col) values ('local2', 'data2');");

// Now the upload is complete and requests a write checkpoint
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
expect(invokeControl('completed_upload', null), [
containsPair('LogLine', {
'severity': 'WARNING',
'line':
'Could not apply pending checkpoint even after completed upload'
})
]);

expect(fetchRows(), [
{'id': 'local', 'col': 'data'},
{'id': 'local2', 'col': 'data2'},
]);
});
});

group('errors', () {
syncTest('diff without prior checkpoint', (_) {
invokeControl('start', null);
Expand Down
2 changes: 1 addition & 1 deletion dart/test/utils/test_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Object checkpoint({
return {
'checkpoint': {
'last_op_id': '$lastOpId',
'write_checkpoint': null,
'write_checkpoint': writeCheckpoint,
'buckets': buckets,
'streams': streams,
}
Expand Down
Loading