From a0452056c461b8ed21e7ab51183b8ed3ca79442d Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 29 Jun 2025 18:18:30 -0600 Subject: [PATCH 1/3] Sync error recovery --- crates/core/src/error.rs | 15 ++ crates/core/src/sync/streaming_sync.rs | 317 ++++++++++++++----------- 2 files changed, 192 insertions(+), 140 deletions(-) diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 0b56038d..607d4d5c 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -5,6 +5,7 @@ use alloc::{ boxed::Box, string::{String, ToString}, }; +use num_traits::FromPrimitive; use sqlite_nostd::{context, sqlite3, Connection, Context, ResultCode}; use thiserror::Error; @@ -129,6 +130,20 @@ impl PowerSyncError { Internal { .. } => ResultCode::INTERNAL, } } + + pub fn can_retry(&self) -> bool { + match self.inner.as_ref() { + RawPowerSyncError::Sqlite(cause) => { + let base_error = ResultCode::from_i32((cause.code as i32) & 0xFF); + if base_error == Some(ResultCode::BUSY) || base_error == Some(ResultCode::LOCKED) { + true + } else { + false + } + } + _ => false, + } + } } impl Display for PowerSyncError { diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 3d6209de..45b398ca 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -86,7 +86,11 @@ impl SyncClient { } }; - Ok(active.instructions) + if let Some(recoverable) = active.recoverable_error.take() { + Err(recoverable) + } else { + Ok(active.instructions) + } } SyncControlRequest::StopSyncStream => self.state.tear_down(), } @@ -137,6 +141,7 @@ impl SyncIterationHandle { state, adapter: StorageAdapter::new(db)?, status: SyncStatusContainer::new(), + validated_but_not_applied: None, }; let future = runner.run().boxed_local(); @@ -182,6 +187,12 @@ struct ActiveEvent<'a> { handled: bool, /// The event to handle event: SyncEvent<'a>, + /// An error to return to the client for a `powersync_control` invocation when that 
error + /// shouldn't interrupt the sync iteration. + /// + /// For errors that do close the iteration, we report a result by having [SyncIterationHandle::run] + /// returning the error. + recoverable_error: Option, /// Instructions to forward to the client when the `powersync_control` invocation completes. instructions: Vec, } @@ -191,6 +202,7 @@ impl<'a> ActiveEvent<'a> { Self { handled: false, event, + recoverable_error: None, instructions: Vec::new(), } } @@ -202,6 +214,10 @@ struct StreamingSyncIteration { adapter: StorageAdapter, options: StartSyncStream, status: SyncStatusContainer, + // A checkpoint that has been fully received and validated, but couldn't be applied due to + // pending local data. We will retry applying this checkpoint when the client SDK informs us + // that it has finished uploading changes. + validated_but_not_applied: Option, } impl StreamingSyncIteration { @@ -229,14 +245,160 @@ impl StreamingSyncIteration { Wait { a: PhantomData } } + /// Handles a single sync line. + /// + /// When it returns `Ok(true)`, the sync iteration should be stopped. For errors, the type of + /// error determines whether the iteration can continue. 
+ fn handle_line( + &mut self, + target: &mut SyncTarget, + event: &mut ActiveEvent, + line: &SyncLine, + ) -> Result { + match line { + SyncLine::Checkpoint(checkpoint) => { + self.validated_but_not_applied = None; + let to_delete = target.track_checkpoint(&checkpoint); + + self.adapter + .delete_buckets(to_delete.iter().map(|b| b.as_str()))?; + let progress = self.load_progress(target.target_checkpoint().unwrap())?; + self.status.update( + |s| s.start_tracking_checkpoint(progress), + &mut event.instructions, + ); + } + SyncLine::CheckpointDiff(diff) => { + let Some(target) = target.target_checkpoint_mut() else { + return Err(PowerSyncError::sync_protocol_error( + "Received checkpoint_diff without previous checkpoint", + PowerSyncErrorCause::Unknown, + )); + }; + + target.apply_diff(&diff); + self.validated_but_not_applied = None; + self.adapter + .delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?; + + let progress = self.load_progress(target)?; + self.status.update( + |s| s.start_tracking_checkpoint(progress), + &mut event.instructions, + ); + } + SyncLine::CheckpointComplete(_) => { + let Some(target) = target.target_checkpoint_mut() else { + return Err(PowerSyncError::sync_protocol_error( + "Received checkpoint complete without previous checkpoint", + PowerSyncErrorCause::Unknown, + )); + }; + let result = + self.adapter + .sync_local(&self.state, target, None, &self.options.schema)?; + + match result { + SyncLocalResult::ChecksumFailure(checkpoint_result) => { + // This means checksums failed. Start again with a new checkpoint. 
+ // TODO: better back-off + // await new Promise((resolve) => setTimeout(resolve, 50)); + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::WARNING, + line: format!("Could not apply checkpoint, {checkpoint_result}").into(), + }); + return Ok(true); + } + SyncLocalResult::PendingLocalChanges => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::INFO, + line: "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint.".into(), + }); + + self.validated_but_not_applied = Some(target.clone()); + } + SyncLocalResult::ChangesApplied => { + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::DEBUG, + line: "Validated and applied checkpoint".into(), + }); + event.instructions.push(Instruction::FlushFileSystem {}); + self.handle_checkpoint_applied(event)?; + } + } + } + SyncLine::CheckpointPartiallyComplete(complete) => { + let priority = complete.priority; + let Some(target) = target.target_checkpoint_mut() else { + return Err(PowerSyncError::state_error( + "Received checkpoint complete without previous checkpoint", + )); + }; + let result = self.adapter.sync_local( + &self.state, + target, + Some(priority), + &self.options.schema, + )?; + + match result { + SyncLocalResult::ChecksumFailure(checkpoint_result) => { + // This means checksums failed. Start again with a new checkpoint. + // TODO: better back-off + // await new Promise((resolve) => setTimeout(resolve, 50)); + event.instructions.push(Instruction::LogLine { + severity: LogSeverity::WARNING, + line: format!( + "Could not apply partial checkpoint, {checkpoint_result}" + ) + .into(), + }); + return Ok(true); + } + SyncLocalResult::PendingLocalChanges => { + // If we have pending uploads, we can't complete new checkpoints outside + // of priority 0. We'll resolve this for a complete checkpoint later. 
+ } + SyncLocalResult::ChangesApplied => { + let now = self.adapter.now()?; + event.instructions.push(Instruction::FlushFileSystem {}); + self.status.update( + |status| { + status.partial_checkpoint_complete(priority, now); + }, + &mut event.instructions, + ); + } + } + } + SyncLine::Data(data_line) => { + self.status + .update(|s| s.track_line(&data_line), &mut event.instructions); + insert_bucket_operations(&self.adapter, &data_line)?; + } + SyncLine::KeepAlive(token) => { + if token.is_expired() { + // Token expired already - stop the connection immediately. + event + .instructions + .push(Instruction::FetchCredentials { did_expire: true }); + return Ok(true); + } else if token.should_prefetch() { + event + .instructions + .push(Instruction::FetchCredentials { did_expire: false }); + } + } + } + + Ok(false) + } + + /// Runs a full sync iteration, returning nothing when it completes regularly or an error when + /// the sync iteration should be interrupted. async fn run(mut self) -> Result<(), PowerSyncError> { let mut target = SyncTarget::BeforeCheckpoint(self.prepare_request().await?); - // A checkpoint that has been fully received and validated, but couldn't be applied due to - // pending local data. We will retry applying this checkpoint when the client SDK informs us - // that it has finished uploading changes. 
- let mut validated_but_not_applied = None::; - loop { let event = Self::receive_event().await; @@ -254,7 +416,7 @@ impl StreamingSyncIteration { SyncEvent::BinaryLine { data } => bson::from_bytes(data) .map_err(|e| PowerSyncError::sync_protocol_error("invalid binary line", e))?, SyncEvent::UploadFinished => { - if let Some(checkpoint) = validated_but_not_applied.take() { + if let Some(checkpoint) = self.validated_but_not_applied.take() { let result = self.adapter.sync_local( &self.state, &checkpoint, @@ -292,142 +454,17 @@ impl StreamingSyncIteration { self.status.update_only(|s| s.mark_connected()); - match line { - SyncLine::Checkpoint(checkpoint) => { - validated_but_not_applied = None; - let to_delete = target.track_checkpoint(&checkpoint); - - self.adapter - .delete_buckets(to_delete.iter().map(|b| b.as_str()))?; - let progress = self.load_progress(target.target_checkpoint().unwrap())?; - self.status.update( - |s| s.start_tracking_checkpoint(progress), - &mut event.instructions, - ); - } - SyncLine::CheckpointDiff(diff) => { - let Some(target) = target.target_checkpoint_mut() else { - return Err(PowerSyncError::sync_protocol_error( - "Received checkpoint_diff without previous checkpoint", - PowerSyncErrorCause::Unknown, - )); - }; - - target.apply_diff(&diff); - validated_but_not_applied = None; - self.adapter - .delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?; - - let progress = self.load_progress(target)?; - self.status.update( - |s| s.start_tracking_checkpoint(progress), - &mut event.instructions, - ); - } - SyncLine::CheckpointComplete(_) => { - let Some(target) = target.target_checkpoint_mut() else { - return Err(PowerSyncError::sync_protocol_error( - "Received checkpoint complete without previous checkpoint", - PowerSyncErrorCause::Unknown, - )); - }; - let result = - self.adapter - .sync_local(&self.state, target, None, &self.options.schema)?; - - match result { - SyncLocalResult::ChecksumFailure(checkpoint_result) => { - // This means 
checksums failed. Start again with a new checkpoint. - // TODO: better back-off - // await new Promise((resolve) => setTimeout(resolve, 50)); - event.instructions.push(Instruction::LogLine { - severity: LogSeverity::WARNING, - line: format!("Could not apply checkpoint, {checkpoint_result}") - .into(), - }); - break; - } - SyncLocalResult::PendingLocalChanges => { - event.instructions.push(Instruction::LogLine { - severity: LogSeverity::INFO, - line: "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint.".into(), - }); - - validated_but_not_applied = Some(target.clone()); - } - SyncLocalResult::ChangesApplied => { - event.instructions.push(Instruction::LogLine { - severity: LogSeverity::DEBUG, - line: "Validated and applied checkpoint".into(), - }); - event.instructions.push(Instruction::FlushFileSystem {}); - self.handle_checkpoint_applied(event)?; - } - } - } - SyncLine::CheckpointPartiallyComplete(complete) => { - let priority = complete.priority; - let Some(target) = target.target_checkpoint_mut() else { - return Err(PowerSyncError::state_error( - "Received checkpoint complete without previous checkpoint", - )); - }; - let result = self.adapter.sync_local( - &self.state, - target, - Some(priority), - &self.options.schema, - )?; - - match result { - SyncLocalResult::ChecksumFailure(checkpoint_result) => { - // This means checksums failed. Start again with a new checkpoint. - // TODO: better back-off - // await new Promise((resolve) => setTimeout(resolve, 50)); - event.instructions.push(Instruction::LogLine { - severity: LogSeverity::WARNING, - line: format!( - "Could not apply partial checkpoint, {checkpoint_result}" - ) - .into(), - }); - break; - } - SyncLocalResult::PendingLocalChanges => { - // If we have pending uploads, we can't complete new checkpoints outside - // of priority 0. We'll resolve this for a complete checkpoint later. 
- } - SyncLocalResult::ChangesApplied => { - let now = self.adapter.now()?; - event.instructions.push(Instruction::FlushFileSystem {}); - self.status.update( - |status| { - status.partial_checkpoint_complete(priority, now); - }, - &mut event.instructions, - ); - } - } - } - SyncLine::Data(data_line) => { - self.status - .update(|s| s.track_line(&data_line), &mut event.instructions); - insert_bucket_operations(&self.adapter, &data_line)?; - } - SyncLine::KeepAlive(token) => { - if token.is_expired() { - // Token expired already - stop the connection immediately. - event - .instructions - .push(Instruction::FetchCredentials { did_expire: true }); + match self.handle_line(&mut target, event, &line) { + Ok(end_iteration) => { + if end_iteration { break; - } else if token.should_prefetch() { - event - .instructions - .push(Instruction::FetchCredentials { did_expire: false }); + } else { + () } } - } + Err(e) if e.can_retry() => {} + Err(e) => return Err(e), + }; self.status.emit_changes(&mut event.instructions); } From d9b0f09c93a3ba92debbb7ecae52432b44051400 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Sun, 29 Jun 2025 18:30:53 -0600 Subject: [PATCH 2/3] Actually set error --- crates/core/src/sync/streaming_sync.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 45b398ca..b8f2f89c 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -462,7 +462,9 @@ impl StreamingSyncIteration { () } } - Err(e) if e.can_retry() => {} + Err(e) if e.can_retry() => { + event.recoverable_error = Some(e); + } Err(e) => return Err(e), }; From d9a3acf2f1f6574a95f1ca232e766e40f8ce3d76 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 30 Jun 2025 10:36:06 -0600 Subject: [PATCH 3/3] Refactor state machine --- crates/core/src/sync/streaming_sync.rs | 216 ++++++++++++++++++------- crates/core/src/sync/sync_status.rs | 6 +- 
dart/pubspec.lock | 56 ++++--- dart/pubspec.yaml | 1 + dart/test/error_test.dart | 7 +- dart/test/sync_test.dart | 137 +++++++++++++++- dart/test/utils/matchers.dart | 8 + 7 files changed, 330 insertions(+), 101 deletions(-) create mode 100644 dart/test/utils/matchers.dart diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index b8f2f89c..62381f2c 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -20,9 +20,12 @@ use crate::{ error::{PowerSyncError, PowerSyncErrorCause}, kv::client_id, state::DatabaseState, - sync::{checkpoint::OwnedBucketChecksum, interface::StartSyncStream}, + sync::{ + checkpoint::OwnedBucketChecksum, interface::StartSyncStream, line::DataLine, + sync_status::Timestamp, BucketPriority, + }, }; -use sqlite_nostd::{self as sqlite, ResultCode}; +use sqlite_nostd::{self as sqlite}; use super::{ interface::{Instruction, LogSeverity, StreamingSyncRequest, SyncControlRequest, SyncEvent}, @@ -245,50 +248,50 @@ impl StreamingSyncIteration { Wait { a: PhantomData } } - /// Handles a single sync line. + /// Starts handling a single sync line without altering any in-memory state of the state + /// machine. /// - /// When it returns `Ok(true)`, the sync iteration should be stopped. For errors, the type of - /// error determines whether the iteration can continue. - fn handle_line( - &mut self, - target: &mut SyncTarget, + /// After this call succeeds, the returned value can be used to update the state. For a + /// discussion on why this split is necessary, see [SyncStateMachineTransition]. 
+ fn prepare_handling_sync_line<'a>( + &self, + target: &SyncTarget, event: &mut ActiveEvent, - line: &SyncLine, - ) -> Result { - match line { + line: &'a SyncLine<'a>, + ) -> Result, PowerSyncError> { + Ok(match line { SyncLine::Checkpoint(checkpoint) => { - self.validated_but_not_applied = None; - let to_delete = target.track_checkpoint(&checkpoint); + let (to_delete, updated_target) = target.track_checkpoint(&checkpoint); self.adapter .delete_buckets(to_delete.iter().map(|b| b.as_str()))?; - let progress = self.load_progress(target.target_checkpoint().unwrap())?; - self.status.update( - |s| s.start_tracking_checkpoint(progress), - &mut event.instructions, - ); + let progress = self.load_progress(updated_target.target_checkpoint().unwrap())?; + SyncStateMachineTransition::StartTrackingCheckpoint { + progress, + updated_target, + } } SyncLine::CheckpointDiff(diff) => { - let Some(target) = target.target_checkpoint_mut() else { + let Some(target) = target.target_checkpoint() else { return Err(PowerSyncError::sync_protocol_error( "Received checkpoint_diff without previous checkpoint", PowerSyncErrorCause::Unknown, )); }; + let mut target = target.clone(); target.apply_diff(&diff); - self.validated_but_not_applied = None; self.adapter .delete_buckets(diff.removed_buckets.iter().map(|i| &**i))?; - let progress = self.load_progress(target)?; - self.status.update( - |s| s.start_tracking_checkpoint(progress), - &mut event.instructions, - ); + let progress = self.load_progress(&target)?; + SyncStateMachineTransition::StartTrackingCheckpoint { + progress, + updated_target: SyncTarget::Tracking(target), + } } SyncLine::CheckpointComplete(_) => { - let Some(target) = target.target_checkpoint_mut() else { + let Some(target) = target.target_checkpoint() else { return Err(PowerSyncError::sync_protocol_error( "Received checkpoint complete without previous checkpoint", PowerSyncErrorCause::Unknown, @@ -307,7 +310,7 @@ impl StreamingSyncIteration { severity: LogSeverity::WARNING, 
line: format!("Could not apply checkpoint, {checkpoint_result}").into(), }); - return Ok(true); + SyncStateMachineTransition::CloseIteration } SyncLocalResult::PendingLocalChanges => { event.instructions.push(Instruction::LogLine { @@ -315,7 +318,9 @@ impl StreamingSyncIteration { line: "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint.".into(), }); - self.validated_but_not_applied = Some(target.clone()); + SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud { + validated_but_not_applied: target.clone(), + } } SyncLocalResult::ChangesApplied => { event.instructions.push(Instruction::LogLine { @@ -323,13 +328,16 @@ impl StreamingSyncIteration { line: "Validated and applied checkpoint".into(), }); event.instructions.push(Instruction::FlushFileSystem {}); - self.handle_checkpoint_applied(event)?; + SyncStateMachineTransition::SyncLocalChangesApplied { + partial: None, + timestamp: self.adapter.now()?, + } } } } SyncLine::CheckpointPartiallyComplete(complete) => { let priority = complete.priority; - let Some(target) = target.target_checkpoint_mut() else { + let Some(target) = target.target_checkpoint() else { return Err(PowerSyncError::state_error( "Received checkpoint complete without previous checkpoint", )); @@ -353,28 +361,25 @@ impl StreamingSyncIteration { ) .into(), }); - return Ok(true); + SyncStateMachineTransition::CloseIteration } SyncLocalResult::PendingLocalChanges => { // If we have pending uploads, we can't complete new checkpoints outside // of priority 0. We'll resolve this for a complete checkpoint later. 
+ SyncStateMachineTransition::Empty } SyncLocalResult::ChangesApplied => { let now = self.adapter.now()?; - event.instructions.push(Instruction::FlushFileSystem {}); - self.status.update( - |status| { - status.partial_checkpoint_complete(priority, now); - }, - &mut event.instructions, - ); + SyncStateMachineTransition::SyncLocalChangesApplied { + partial: Some(priority), + timestamp: now, + } } } } SyncLine::Data(data_line) => { - self.status - .update(|s| s.track_line(&data_line), &mut event.instructions); insert_bucket_operations(&self.adapter, &data_line)?; + SyncStateMachineTransition::DataLineSaved { line: data_line } } SyncLine::KeepAlive(token) => { if token.is_expired() { @@ -382,16 +387,79 @@ impl StreamingSyncIteration { event .instructions .push(Instruction::FetchCredentials { did_expire: true }); - return Ok(true); + + SyncStateMachineTransition::CloseIteration } else if token.should_prefetch() { event .instructions .push(Instruction::FetchCredentials { did_expire: false }); + SyncStateMachineTransition::Empty + } else { + SyncStateMachineTransition::Empty } } - } + }) + } - Ok(false) + /// Applies a sync state transition, returning whether the iteration should be stopped. 
+ fn apply_transition( + &mut self, + target: &mut SyncTarget, + event: &mut ActiveEvent, + transition: SyncStateMachineTransition, + ) -> bool { + match transition { + SyncStateMachineTransition::StartTrackingCheckpoint { + progress, + updated_target, + } => { + self.status.update( + |s| s.start_tracking_checkpoint(progress), + &mut event.instructions, + ); + self.validated_but_not_applied = None; + *target = updated_target; + } + SyncStateMachineTransition::DataLineSaved { line } => { + self.status + .update(|s| s.track_line(&line), &mut event.instructions); + } + SyncStateMachineTransition::CloseIteration => return true, + SyncStateMachineTransition::SyncLocalFailedDueToPendingCrud { + validated_but_not_applied, + } => { + self.validated_but_not_applied = Some(validated_but_not_applied); + } + SyncStateMachineTransition::SyncLocalChangesApplied { partial, timestamp } => { + if let Some(priority) = partial { + self.status.update( + |status| { + status.partial_checkpoint_complete(priority, timestamp); + }, + &mut event.instructions, + ); + } else { + self.handle_checkpoint_applied(event, timestamp); + } + } + SyncStateMachineTransition::Empty => {} + }; + + false + } + + /// Handles a single sync line. + /// + /// When it returns `Ok(true)`, the sync iteration should be stopped. For errors, the type of + /// error determines whether the iteration can continue. 
+ fn handle_line( + &mut self, + target: &mut SyncTarget, + event: &mut ActiveEvent, + line: &SyncLine, + ) -> Result { + let transition = self.prepare_handling_sync_line(target, event, line)?; + Ok(self.apply_transition(target, event, transition)) } /// Runs a full sync iteration, returning nothing when it completes regularly or an error when @@ -432,7 +500,7 @@ impl StreamingSyncIteration { .into(), }); - self.handle_checkpoint_applied(event)?; + self.handle_checkpoint_applied(event, self.adapter.now()?); } _ => { event.instructions.push(Instruction::LogLine { @@ -522,16 +590,13 @@ impl StreamingSyncIteration { Ok(local_bucket_names) } - fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent) -> Result<(), ResultCode> { + fn handle_checkpoint_applied(&mut self, event: &mut ActiveEvent, timestamp: Timestamp) { event.instructions.push(Instruction::DidCompleteSync {}); - let now = self.adapter.now()?; self.status.update( - |status| status.applied_checkpoint(now), + |status| status.applied_checkpoint(timestamp), &mut event.instructions, ); - - Ok(()) } } @@ -553,18 +618,16 @@ impl SyncTarget { } } - fn target_checkpoint_mut(&mut self) -> Option<&mut OwnedCheckpoint> { - match self { - Self::Tracking(cp) => Some(cp), - _ => None, - } - } - /// Starts tracking the received `Checkpoint`. /// - /// This updates the internal state and returns a set of buckets to delete because they've been - /// tracked locally but not in the new checkpoint. - fn track_checkpoint<'a>(&mut self, checkpoint: &Checkpoint<'a>) -> BTreeSet { + /// This returns a set of buckets to delete because they've been tracked locally but not in the + /// checkpoint, as well as the updated state of the [SyncTarget] to apply after deleting those + /// buckets. + /// + /// The new state is not applied automatically - the old state should be kept in-memory until + /// the buckets have actually been deleted so that the operation can be retried if deleting + /// buckets fails. 
+ fn track_checkpoint<'a>(&self, checkpoint: &Checkpoint<'a>) -> (BTreeSet, Self) { let mut to_delete: BTreeSet = match &self { SyncTarget::Tracking(checkpoint) => checkpoint.buckets.keys().cloned().collect(), SyncTarget::BeforeCheckpoint(buckets) => buckets.iter().cloned().collect(), @@ -576,8 +639,10 @@ impl SyncTarget { to_delete.remove(&*bucket.bucket); } - *self = SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets)); - to_delete + ( + to_delete, + SyncTarget::Tracking(OwnedCheckpoint::from_checkpoint(checkpoint, buckets)), + ) } } @@ -614,3 +679,32 @@ impl OwnedCheckpoint { self.write_checkpoint = diff.write_checkpoint; } } + +/// A transition representing pending changes between [StreamingSyncIteration::prepare_handling_sync_line] +/// and [StreamingSyncIteration::apply_transition]. +/// +/// This split allows the main logic handling sync lines to take a non-mutable reference to internal +/// client state, guaranteeing that it does not mutate state until changes have been written to the +/// database. Only after those writes have succeeded are the internal state changes applied. +/// +/// This split ensures that `powersync_control` calls are idempotent when running into temporary +/// SQLite errors, a property we need for compatibility with e.g. WA-sqlite, where the VFS can +/// return `BUSY` errors and the SQLite library automatically retries running statements. 
+enum SyncStateMachineTransition<'a> { + StartTrackingCheckpoint { + progress: SyncDownloadProgress, + updated_target: SyncTarget, + }, + DataLineSaved { + line: &'a DataLine<'a>, + }, + SyncLocalFailedDueToPendingCrud { + validated_but_not_applied: OwnedCheckpoint, + }, + SyncLocalChangesApplied { + partial: Option, + timestamp: Timestamp, + }, + CloseIteration, + Empty, +} diff --git a/crates/core/src/sync/sync_status.rs b/crates/core/src/sync/sync_status.rs index 95340950..89a86a3b 100644 --- a/crates/core/src/sync/sync_status.rs +++ b/crates/core/src/sync/sync_status.rs @@ -205,10 +205,12 @@ impl SyncDownloadProgress { ); } + // Ignore errors here - SQLite seems to report errors from an earlier statement iteration + // sometimes. + let _ = adapter.progress_stmt.reset(); + // Go through local bucket states to detect pending progress from previous sync iterations // that may have been interrupted. - adapter.progress_stmt.reset()?; - while let Some(row) = adapter.step_progress()? { let Some(progress) = buckets.get_mut(row.bucket) else { continue; diff --git a/dart/pubspec.lock b/dart/pubspec.lock index 5000e7e8..d79b4661 100644 --- a/dart/pubspec.lock +++ b/dart/pubspec.lock @@ -5,18 +5,18 @@ packages: dependency: transitive description: name: _fe_analyzer_shared - sha256: e55636ed79578b9abca5fecf9437947798f5ef7456308b5cb85720b793eac92f + sha256: da0d9209ca76bde579f2da330aeb9df62b6319c834fa7baae052021b0462401f url: "https://pub.dev" source: hosted - version: "82.0.0" + version: "85.0.0" analyzer: dependency: transitive description: name: analyzer - sha256: "13c1e6c6fd460522ea840abec3f677cc226f5fec7872c04ad7b425517ccf54f7" + sha256: f6154230675c44a191f2e20d16eeceb4aa18b30ca732db4efaf94c6a7d43cfa6 url: "https://pub.dev" source: hosted - version: "7.4.4" + version: "7.5.2" args: dependency: transitive description: @@ -45,10 +45,10 @@ packages: dependency: "direct main" description: name: bson - sha256: 
"9b761248a3494fea594aecf5d6f369b5f04d7b082aa2b8c06579ade77f1a7e47" + sha256: f8c80be7a62a88f4add7c48cc83567c36a77532de107224df8328ef71f125045 url: "https://pub.dev" source: hosted - version: "5.0.6" + version: "5.0.7" cli_config: dependency: transitive description: @@ -85,10 +85,10 @@ packages: dependency: transitive description: name: coverage - sha256: "9086475ef2da7102a0c0a4e37e1e30707e7fb7b6d28c209f559a9c5f8ce42016" + sha256: aa07dbe5f2294c827b7edb9a87bba44a9c15a3cc81bc8da2ca19b37322d30080 url: "https://pub.dev" source: hosted - version: "1.12.0" + version: "1.14.1" crypto: dependency: transitive description: @@ -101,10 +101,10 @@ packages: dependency: transitive description: name: decimal - sha256: "28239b8b929c1bd8618702e6dbc96e2618cf99770bbe9cb040d6cf56a11e4ec3" + sha256: fc706a5618b81e5b367b01dd62621def37abc096f2b46a9bd9068b64c1fa36d0 url: "https://pub.dev" source: hosted - version: "3.2.1" + version: "3.2.4" fake_async: dependency: "direct dev" description: @@ -213,10 +213,10 @@ packages: dependency: "direct dev" description: name: meta - sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c + sha256: "23f08335362185a5ea2ad3a4e597f1375e78bce8a040df5c600c8d3552ef2394" url: "https://pub.dev" source: hosted - version: "1.16.0" + version: "1.17.0" mime: dependency: transitive description: @@ -405,26 +405,34 @@ packages: dependency: "direct dev" description: name: test - sha256: "301b213cd241ca982e9ba50266bd3f5bd1ea33f1455554c5abb85d1be0e2d87e" + sha256: "65e29d831719be0591f7b3b1a32a3cda258ec98c58c7b25f7b84241bc31215bb" url: "https://pub.dev" source: hosted - version: "1.25.15" + version: "1.26.2" test_api: dependency: transitive description: name: test_api - sha256: fb31f383e2ee25fbbfe06b40fe21e1e458d14080e3c67e7ba0acfde4df4e0bbd + sha256: "522f00f556e73044315fa4585ec3270f1808a4b186c936e612cab0b565ff1e00" url: "https://pub.dev" source: hosted - version: "0.7.4" + version: "0.7.6" test_core: dependency: transitive description: name: test_core 
- sha256: "84d17c3486c8dfdbe5e12a50c8ae176d15e2a771b96909a9442b40173649ccaa" + sha256: "80bf5a02b60af04b09e14f6fe68b921aad119493e26e490deaca5993fef1b05a" url: "https://pub.dev" source: hosted - version: "0.6.8" + version: "0.6.11" + test_descriptor: + dependency: "direct dev" + description: + name: test_descriptor + sha256: "9ce468c97ae396e8440d26bb43763f84e2a2a5331813ee5a397cb4da481aaf9a" + url: "https://pub.dev" + source: hosted + version: "2.0.2" typed_data: dependency: transitive description: @@ -445,18 +453,18 @@ packages: dependency: transitive description: name: vm_service - sha256: ddfa8d30d89985b96407efce8acbdd124701f96741f2d981ca860662f1c0dc02 + sha256: "45caa6c5917fa127b5dbcfbd1fa60b14e583afdc08bfc96dda38886ca252eb60" url: "https://pub.dev" source: hosted - version: "15.0.0" + version: "15.0.2" watcher: dependency: transitive description: name: watcher - sha256: "69da27e49efa56a15f8afe8f4438c4ec02eff0a117df1b22ea4aad194fe1c104" + sha256: "0b7fd4a0bbc4b92641dbf20adfd7e3fd1398fe17102d94b674234563e110088a" url: "https://pub.dev" source: hosted - version: "1.1.1" + version: "1.1.2" web: dependency: transitive description: @@ -469,10 +477,10 @@ packages: dependency: transitive description: name: web_socket - sha256: bfe6f435f6ec49cb6c01da1e275ae4228719e59a6b067048c51e72d9d63bcc4b + sha256: "34d64019aa8e36bf9842ac014bb5d2f5586ca73df5e4d9bf5c936975cae6982c" url: "https://pub.dev" source: hosted - version: "1.0.0" + version: "1.0.1" web_socket_channel: dependency: transitive description: diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml index 40b46486..9177161d 100644 --- a/dart/pubspec.yaml +++ b/dart/pubspec.yaml @@ -10,6 +10,7 @@ dependencies: dev_dependencies: test: ^1.25.0 + test_descriptor: ^2.0.2 file: ^7.0.1 sqlite3_test: ^0.1.1 fake_async: ^1.3.3 diff --git a/dart/test/error_test.dart b/dart/test/error_test.dart index ccada8e0..b7349a97 100644 --- a/dart/test/error_test.dart +++ b/dart/test/error_test.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 
'package:sqlite3/common.dart'; import 'package:test/test.dart'; +import 'utils/matchers.dart'; import 'utils/native_test_utils.dart'; void main() { @@ -67,9 +68,3 @@ void main() { }); }); } - -Matcher isSqliteException(int code, dynamic message) { - return isA() - .having((e) => e.extendedResultCode, 'extendedResultCode', code) - .having((e) => e.message, 'message', message); -} diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index 371fed93..a39d9538 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -10,8 +10,10 @@ import 'package:sqlite3/common.dart'; import 'package:sqlite3/sqlite3.dart'; import 'package:sqlite3_test/sqlite3_test.dart'; import 'package:test/test.dart'; +import 'package:test_descriptor/test_descriptor.dart' as d; import 'package:path/path.dart'; +import 'utils/matchers.dart'; import 'utils/native_test_utils.dart'; @isTest @@ -50,17 +52,24 @@ void _syncTests({ List invokeControlRaw(String operation, Object? data) { db.execute('begin'); - final [row] = - db.select('SELECT powersync_control(?, ?)', [operation, data]); + ResultSet result; - // Make sure that powersync_control doesn't leave any busy statements - // behind. - // TODO: Re-enable after we can guarantee sqlite_stmt being available - // const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;'; - // final busy = db.select(statement, [statement]); - // expect(busy, isEmpty); + try { + result = db.select('SELECT powersync_control(?, ?)', [operation, data]); + + // Make sure that powersync_control doesn't leave any busy statements + // behind. 
+ // TODO: Re-enable after we can guarantee sqlite_stmt being available + // const statement = 'SELECT * FROM sqlite_stmt WHERE busy AND sql != ?;'; + // final busy = db.select(statement, [statement]); + // expect(busy, isEmpty); + } catch (e) { + db.execute('rollback'); + rethrow; + } db.execute('commit'); + final [row] = result; return jsonDecode(row.columnAt(0)); } @@ -683,6 +692,118 @@ void _syncTests({ // Should delete bucket with checksum mismatch expect(db.select('SELECT * FROM ps_buckets'), isEmpty); }); + + group('recoverable', () { + late CommonDatabase secondary; + final checkpoint = { + 'checkpoint': { + 'last_op_id': '1', + 'write_checkpoint': null, + 'buckets': [ + { + 'bucket': 'a', + 'checksum': 0, + 'priority': 3, + 'count': 1, + } + ], + }, + }; + + setUp(() { + final fileName = d.path('test.db'); + + db = openTestDatabase(fileName: fileName) + ..select('select powersync_init();') + ..select('select powersync_replace_schema(?)', [json.encode(_schema)]) + ..execute('update ps_kv set value = ?2 where key = ?1', + ['client_id', 'test-test-test-test']); + + secondary = openTestDatabase(fileName: fileName); + }); + + test('starting checkpoints', () { + db.execute('INSERT INTO ps_buckets (name) VALUES (?)', ['unrelated']); + invokeControl('start', null); + + // Lock the db so that the checkpoint line can't delete the unrelated + // bucket. 
+ secondary.execute('begin exclusive'); + expect( + () => syncLine(checkpoint), + throwsA( + isSqliteException( + 5, 'powersync_control: internal SQLite call returned BUSY'), + ), + ); + secondary.execute('commit'); + expect(db.select('SELECT name FROM ps_buckets'), [ + {'name': 'unrelated'} + ]); + + syncLine(checkpoint); + expect(db.select('SELECT name FROM ps_buckets'), isEmpty); + }); + + test('saving oplog data', () { + invokeControl('start', null); + syncLine(checkpoint); + + // Lock the database before the data line + secondary.execute('begin exclusive'); + + // This should make powersync_control unable to save oplog data. + expect( + () => pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}), + throwsA(isSqliteException( + 5, 'powersync_control: internal SQLite call returned BUSY')), + ); + + // But we should be able to retry + secondary.execute('commit'); + + expect(pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}), [ + containsPair( + 'UpdateSyncStatus', + containsPair( + 'status', + containsPair( + 'downloading', + { + 'buckets': { + 'a': { + 'priority': 3, + 'at_last': 0, + 'since_last': 1, + 'target_count': 1 + }, + } + }, + ), + ), + ) + ]); + }); + + test('applying local changes', () { + invokeControl('start', null); + syncLine(checkpoint); + pushSyncData('a', '1', '1', 'PUT', {'col': 'hi'}); + + secondary.execute('begin exclusive'); + expect( + () => pushCheckpointComplete(), + throwsA( + isSqliteException( + 5, 'powersync_control: internal SQLite call returned BUSY'), + ), + ); + secondary.execute('commit'); + + pushCheckpointComplete(); + expect(db.select('SELECT * FROM items'), hasLength(1)); + }); + }); }); syncTest('sets powersync_in_sync_operation', (_) { diff --git a/dart/test/utils/matchers.dart b/dart/test/utils/matchers.dart new file mode 100644 index 00000000..4d198ff5 --- /dev/null +++ b/dart/test/utils/matchers.dart @@ -0,0 +1,8 @@ +import 'package:sqlite3/common.dart'; +import 'package:test/test.dart'; + +Matcher isSqliteException(int 
code, dynamic message) { + return isA<SqliteException>() + .having((e) => e.extendedResultCode, 'extendedResultCode', code) + .having((e) => e.message, 'message', message); +}