Skip to content

Commit a5d46d0

Browse files
committed
Reduce warnings for sync after uploads
1 parent f970090 commit a5d46d0

File tree

5 files changed

+195
-34
lines changed

5 files changed

+195
-34
lines changed

crates/core/src/sync/storage_adapter.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,29 @@ impl StorageAdapter {
459459
self.delete_subscription.exec()?;
460460
Ok(())
461461
}
462+
463+
pub fn local_state(&self) -> Result<Option<LocalState>, PowerSyncError> {
464+
let stmt = self
465+
.db
466+
.prepare_v2("SELECT target_op, last_op FROM ps_buckets WHERE name = ?")?;
467+
stmt.bind_text(1, "$local", sqlite_nostd::Destructor::STATIC)?;
468+
469+
Ok(if stmt.step()? == ResultCode::ROW {
470+
let target_op = stmt.column_int64(0);
471+
let last_op = stmt.column_int64(1);
472+
Some(LocalState {
473+
target_op,
474+
_last_op: last_op,
475+
})
476+
} else {
477+
None
478+
})
479+
}
480+
}
481+
482+
pub struct LocalState {
483+
pub target_op: i64,
484+
pub _last_op: i64,
462485
}
463486

464487
pub struct BucketInfo {

crates/core/src/sync/streaming_sync.rs

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,12 @@ impl StreamingSyncIteration {
448448
|s| s.start_tracking_checkpoint(progress, subscription_state),
449449
&mut event.instructions,
450450
);
451+
452+
// Technically, we could still try to apply a pending checkpoint after receiving a
453+
// new one. However, sync_local assumes it's only called in a state where there's no
454+
// pending checkpoint, so we'd have to take the oplog state at the time we've
455+
// originally received the validated-but-not-applied checkpoint. This is likely not
456+
// something worth doing.
451457
self.validated_but_not_applied = None;
452458
*target = updated_target;
453459
}
@@ -515,28 +521,7 @@ impl StreamingSyncIteration {
515521
SyncEvent::BinaryLine { data } => bson::from_bytes(data)
516522
.map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?,
517523
SyncEvent::UploadFinished => {
518-
if let Some(checkpoint) = self.validated_but_not_applied.take() {
519-
let result = self.sync_local(&checkpoint, None)?;
520-
521-
match result {
522-
SyncLocalResult::ChangesApplied => {
523-
event.instructions.push(Instruction::LogLine {
524-
severity: LogSeverity::DEBUG,
525-
line: "Applied pending checkpoint after completed upload"
526-
.into(),
527-
});
528-
529-
self.handle_checkpoint_applied(event, self.adapter.now()?);
530-
}
531-
_ => {
532-
event.instructions.push(Instruction::LogLine {
533-
severity: LogSeverity::WARNING,
534-
line: "Could not apply pending checkpoint even after completed upload"
535-
.into(),
536-
});
537-
}
538-
}
539-
}
524+
self.try_applying_write_after_completed_upload(event)?;
540525

541526
continue;
542527
}
@@ -608,6 +593,43 @@ impl StreamingSyncIteration {
608593
Ok(progress)
609594
}
610595

596+
fn try_applying_write_after_completed_upload(
597+
&mut self,
598+
event: &mut ActiveEvent,
599+
) -> Result<(), PowerSyncError> {
600+
let Some(checkpoint) = self.validated_but_not_applied.take() else {
601+
return Ok(());
602+
};
603+
604+
let target_write = self.adapter.local_state()?.map(|e| e.target_op);
605+
if checkpoint.write_checkpoint < target_write {
606+
// Note: None < Some(x). The pending checkpoint does not contain the write
607+
// checkpoint created during the upload, so we don't have to try applying it, it's
608+
// guaranteed to be outdated.
609+
return Ok(());
610+
}
611+
612+
let result = self.sync_local(&checkpoint, None)?;
613+
match result {
614+
SyncLocalResult::ChangesApplied => {
615+
event.instructions.push(Instruction::LogLine {
616+
severity: LogSeverity::DEBUG,
617+
line: "Applied pending checkpoint after completed upload".into(),
618+
});
619+
620+
self.handle_checkpoint_applied(event, self.adapter.now()?);
621+
}
622+
_ => {
623+
event.instructions.push(Instruction::LogLine {
624+
severity: LogSeverity::WARNING,
625+
line: "Could not apply pending checkpoint even after completed upload".into(),
626+
});
627+
}
628+
}
629+
630+
Ok(())
631+
}
632+
611633
/// Reconciles local stream subscriptions with service-side state received in a checkpoint.
612634
///
613635
/// This involves:

crates/core/src/sync_local.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,10 @@ impl<'a> SyncOperation<'a> {
102102
if needs_check {
103103
// language=SQLite
104104
let statement = self.db.prepare_v2(
105-
"\
106-
SELECT group_concat(name)
107-
FROM ps_buckets
108-
WHERE target_op > last_op AND name = '$local'",
105+
"SELECT 1 FROM ps_buckets WHERE target_op > last_op AND name = '$local'",
109106
)?;
110107

111-
if statement.step()? != ResultCode::ROW {
112-
return Err(PowerSyncError::unknown_internal());
113-
}
114-
115-
if statement.column_type(0)? == ColumnType::Text {
108+
if statement.step()? == ResultCode::ROW {
116109
return Ok(false);
117110
}
118111

dart/test/sync_test.dart

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,14 @@ void _syncTests<T>({
121121
}
122122

123123
List<Object?> pushCheckpoint(
124-
{int lastOpId = 1, List<Object> buckets = const []}) {
125-
return syncLine(checkpoint(lastOpId: lastOpId, buckets: buckets));
124+
{int lastOpId = 1,
125+
List<Object> buckets = const [],
126+
String? writeCheckpoint}) {
127+
return syncLine(checkpoint(
128+
lastOpId: lastOpId,
129+
buckets: buckets,
130+
writeCheckpoint: writeCheckpoint,
131+
));
126132
}
127133

128134
List<Object?> pushCheckpointComplete({int? priority, String lastOpId = '1'}) {
@@ -676,6 +682,123 @@ void _syncTests<T>({
676682
});
677683
});
678684

685+
group('applies pending changes', () {
686+
test('write checkpoint before upload complete', () {
687+
// local write while offline
688+
db.execute("insert into items (id, col) values ('local', 'data');");
689+
invokeControl('start', null);
690+
691+
// Start upload process. Assume data has been uploaded and a write
692+
// checkpoint has been requested, but not received yet.
693+
db.execute('DELETE FROM ps_crud');
694+
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
695+
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
696+
expect(pushCheckpointComplete(), [
697+
containsPair('LogLine', {
698+
'severity': 'INFO',
699+
'line': contains('Will retry at completed upload')
700+
})
701+
]);
702+
703+
// Now complete the upload process.
704+
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
705+
invokeControl('completed_upload', null);
706+
707+
// This should apply the pending write checkpoint.
708+
expect(fetchRows(), [
709+
{'id': 'row-0', 'col': 'hi'}
710+
]);
711+
});
712+
713+
test('write checkpoint with synced data', () {
714+
// local write while offline
715+
db.execute("insert into items (id, col) values ('local', 'data');");
716+
invokeControl('start', null);
717+
718+
// Complete upload process
719+
db.execute('DELETE FROM ps_crud');
720+
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
721+
expect(invokeControl('completed_upload', null), isEmpty);
722+
723+
// Sync afterwards containing data and write checkpoint.
724+
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
725+
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
726+
pushCheckpointComplete();
727+
expect(fetchRows(), [
728+
{'id': 'row-0', 'col': 'hi'}
729+
]);
730+
});
731+
732+
test('write checkpoint after synced data', () {
733+
// local write while offline
734+
db.execute("insert into items (id, col) values ('local', 'data');");
735+
invokeControl('start', null);
736+
737+
// Upload changes, assume that triggered a checkpoint.
738+
db.execute('DELETE FROM ps_crud');
739+
pushCheckpoint(buckets: priorityBuckets);
740+
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
741+
expect(pushCheckpointComplete(), [
742+
containsPair('LogLine', {
743+
'severity': 'INFO',
744+
'line': contains('Will retry at completed upload')
745+
})
746+
]);
747+
748+
// Now the upload is complete and requests a write checkpoint
749+
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
750+
expect(invokeControl('completed_upload', null), isEmpty);
751+
752+
// Which triggers a new iteration
753+
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
754+
expect(
755+
pushCheckpointComplete(),
756+
contains(containsPair('LogLine', {
757+
'severity': 'DEBUG',
758+
'line': contains('Validated and applied checkpoint')
759+
})));
760+
761+
expect(fetchRows(), [
762+
{'id': 'row-0', 'col': 'hi'}
763+
]);
764+
});
765+
766+
test('second local write', () {
767+
// first local write while offline
768+
db.execute("insert into items (id, col) values ('local', 'data');");
769+
invokeControl('start', null);
770+
771+
// Upload changes, assume that triggered a checkpoint.
772+
db.execute('DELETE FROM ps_crud');
773+
pushCheckpoint(buckets: priorityBuckets, writeCheckpoint: '1');
774+
pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
775+
expect(pushCheckpointComplete(), [
776+
containsPair('LogLine', {
777+
'severity': 'INFO',
778+
'line': contains('Will retry at completed upload')
779+
})
780+
]);
781+
782+
// Second local write during sync
783+
db.execute("insert into items (id, col) values ('local2', 'data2');");
784+
785+
// Now the upload is complete and requests a write checkpoint
786+
db.execute(r"UPDATE ps_buckets SET target_op = 1 WHERE name = '$local'");
787+
expect(invokeControl('completed_upload', null), [
788+
containsPair('LogLine', {
789+
'severity': 'WARNING',
790+
'line':
791+
'Could not apply pending checkpoint even after completed upload'
792+
})
793+
]);
794+
795+
expect(fetchRows(), [
796+
{'id': 'local', 'col': 'data'},
797+
{'id': 'local2', 'col': 'data2'},
798+
]);
799+
});
800+
});
801+
679802
group('errors', () {
680803
syncTest('diff without prior checkpoint', (_) {
681804
invokeControl('start', null);

dart/test/utils/test_utils.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ Object checkpoint({
1111
return {
1212
'checkpoint': {
1313
'last_op_id': '$lastOpId',
14-
'write_checkpoint': null,
14+
'write_checkpoint': writeCheckpoint,
1515
'buckets': buckets,
1616
'streams': streams,
1717
}

0 commit comments

Comments
 (0)