@@ -20,9 +20,12 @@ use crate::{
2020    error:: { PowerSyncError ,  PowerSyncErrorCause } , 
2121    kv:: client_id, 
2222    state:: DatabaseState , 
23-     sync:: { checkpoint:: OwnedBucketChecksum ,  interface:: StartSyncStream } , 
23+     sync:: { 
24+         checkpoint:: OwnedBucketChecksum ,  interface:: StartSyncStream ,  line:: DataLine , 
25+         sync_status:: Timestamp ,  BucketPriority , 
26+     } , 
2427} ; 
25- use  sqlite_nostd:: { self  as  sqlite,   ResultCode } ; 
28+ use  sqlite_nostd:: { self  as  sqlite} ; 
2629
2730use  super :: { 
2831    interface:: { Instruction ,  LogSeverity ,  StreamingSyncRequest ,  SyncControlRequest ,  SyncEvent } , 
@@ -245,50 +248,50 @@ impl StreamingSyncIteration {
245248        Wait  {  a :  PhantomData  } 
246249    } 
247250
248-     /// Handles a single sync line. 
251+     /// Starts handling a single sync line without altering any in-memory state of the state 
252+ /// machine. 
249253/// 
250- /// When it returns `Ok(true)` , the sync iteration should  be stopped. For errors,  the type of  
251- /// error determines whether the iteration can continue . 
252- fn  handle_line ( 
253-         & mut   self , 
254-         target :  & mut   SyncTarget , 
254+ /// After this call succeeds , the returned value can  be used to update  the state. For a  
255+ /// discussion on why this split is necessary, see [SyncStateMachineTransition] . 
256+ fn  prepare_handling_sync_line < ' a > ( 
257+         & self , 
258+         target :  & SyncTarget , 
255259        event :  & mut  ActiveEvent , 
256-         line :  & SyncLine , 
257-     )  -> Result < bool ,  PowerSyncError >  { 
258-         match  line { 
260+         line :  & ' a   SyncLine < ' a > , 
261+     )  -> Result < SyncStateMachineTransition < ' a > ,  PowerSyncError >  { 
262+         Ok ( match  line { 
259263            SyncLine :: Checkpoint ( checkpoint)  => { 
260-                 self . validated_but_not_applied  = None ; 
261-                 let  to_delete = target. track_checkpoint ( & checkpoint) ; 
264+                 let  ( to_delete,  updated_target)  = target. track_checkpoint ( & checkpoint) ; 
262265
263266                self . adapter 
264267                    . delete_buckets ( to_delete. iter ( ) . map ( |b| b. as_str ( ) ) ) ?; 
265-                 let  progress = self . load_progress ( target . target_checkpoint ( ) . unwrap ( ) ) ?; 
266-                 self . status . update ( 
267-                     |s| s . start_tracking_checkpoint ( progress) , 
268-                     & mut  event . instructions , 
269-                 ) ; 
268+                 let  progress = self . load_progress ( updated_target . target_checkpoint ( ) . unwrap ( ) ) ?; 
269+                 SyncStateMachineTransition :: StartTrackingCheckpoint   { 
270+                     progress, 
271+                     updated_target , 
272+                 } 
270273            } 
271274            SyncLine :: CheckpointDiff ( diff)  => { 
272-                 let  Some ( target)  = target. target_checkpoint_mut ( )  else  { 
275+                 let  Some ( target)  = target. target_checkpoint ( )  else  { 
273276                    return  Err ( PowerSyncError :: sync_protocol_error ( 
274277                        "Received checkpoint_diff without previous checkpoint" , 
275278                        PowerSyncErrorCause :: Unknown , 
276279                    ) ) ; 
277280                } ; 
278281
282+                 let  mut  target = target. clone ( ) ; 
279283                target. apply_diff ( & diff) ; 
280-                 self . validated_but_not_applied  = None ; 
281284                self . adapter 
282285                    . delete_buckets ( diff. removed_buckets . iter ( ) . map ( |i| & * * i) ) ?; 
283286
284-                 let  progress = self . load_progress ( target) ?; 
285-                 self . status . update ( 
286-                     |s| s . start_tracking_checkpoint ( progress) , 
287-                     & mut  event . instructions , 
288-                 ) ; 
287+                 let  progress = self . load_progress ( & target) ?; 
288+                 SyncStateMachineTransition :: StartTrackingCheckpoint   { 
289+                     progress, 
290+                     updated_target :   SyncTarget :: Tracking ( target ) , 
291+                 } 
289292            } 
290293            SyncLine :: CheckpointComplete ( _)  => { 
291-                 let  Some ( target)  = target. target_checkpoint_mut ( )  else  { 
294+                 let  Some ( target)  = target. target_checkpoint ( )  else  { 
292295                    return  Err ( PowerSyncError :: sync_protocol_error ( 
293296                        "Received checkpoint complete without previous checkpoint" , 
294297                        PowerSyncErrorCause :: Unknown , 
@@ -307,29 +310,34 @@ impl StreamingSyncIteration {
307310                            severity :  LogSeverity :: WARNING , 
308311                            line :  format ! ( "Could not apply checkpoint, {checkpoint_result}" ) . into ( ) , 
309312                        } ) ; 
310-                         return   Ok ( true ) ; 
313+                         SyncStateMachineTransition :: CloseIteration 
311314                    } 
312315                    SyncLocalResult :: PendingLocalChanges  => { 
313316                        event. instructions . push ( Instruction :: LogLine  { 
314317                                    severity :  LogSeverity :: INFO , 
315318                                    line :  "Could not apply checkpoint due to local data. Will retry at completed upload or next checkpoint." . into ( ) , 
316319                                } ) ; 
317320
318-                         self . validated_but_not_applied  = Some ( target. clone ( ) ) ; 
321+                         SyncStateMachineTransition :: SyncLocalFailedDueToPendingCrud  { 
322+                             validated_but_not_applied :  target. clone ( ) , 
323+                         } 
319324                    } 
320325                    SyncLocalResult :: ChangesApplied  => { 
321326                        event. instructions . push ( Instruction :: LogLine  { 
322327                            severity :  LogSeverity :: DEBUG , 
323328                            line :  "Validated and applied checkpoint" . into ( ) , 
324329                        } ) ; 
325330                        event. instructions . push ( Instruction :: FlushFileSystem  { } ) ; 
326-                         self . handle_checkpoint_applied ( event) ?; 
331+                         SyncStateMachineTransition :: SyncLocalChangesApplied  { 
332+                             partial :  None , 
333+                             timestamp :  self . adapter . now ( ) ?, 
334+                         } 
327335                    } 
328336                } 
329337            } 
330338            SyncLine :: CheckpointPartiallyComplete ( complete)  => { 
331339                let  priority = complete. priority ; 
332-                 let  Some ( target)  = target. target_checkpoint_mut ( )  else  { 
340+                 let  Some ( target)  = target. target_checkpoint ( )  else  { 
333341                    return  Err ( PowerSyncError :: state_error ( 
334342                        "Received checkpoint complete without previous checkpoint" , 
335343                    ) ) ; 
@@ -353,45 +361,105 @@ impl StreamingSyncIteration {
353361                            ) 
354362                            . into ( ) , 
355363                        } ) ; 
356-                         return   Ok ( true ) ; 
364+                         SyncStateMachineTransition :: CloseIteration 
357365                    } 
358366                    SyncLocalResult :: PendingLocalChanges  => { 
359367                        // If we have pending uploads, we can't complete new checkpoints outside 
360368                        // of priority 0. We'll resolve this for a complete checkpoint later. 
369+                         SyncStateMachineTransition :: Empty 
361370                    } 
362371                    SyncLocalResult :: ChangesApplied  => { 
363372                        let  now = self . adapter . now ( ) ?; 
364-                         event. instructions . push ( Instruction :: FlushFileSystem  { } ) ; 
365-                         self . status . update ( 
366-                             |status| { 
367-                                 status. partial_checkpoint_complete ( priority,  now) ; 
368-                             } , 
369-                             & mut  event. instructions , 
370-                         ) ; 
373+                         SyncStateMachineTransition :: SyncLocalChangesApplied  { 
374+                             partial :  Some ( priority) , 
375+                             timestamp :  now, 
376+                         } 
371377                    } 
372378                } 
373379            } 
374380            SyncLine :: Data ( data_line)  => { 
375-                 self . status 
376-                     . update ( |s| s. track_line ( & data_line) ,  & mut  event. instructions ) ; 
377381                insert_bucket_operations ( & self . adapter ,  & data_line) ?; 
382+                 SyncStateMachineTransition :: DataLineSaved  {  line :  data_line } 
378383            } 
379384            SyncLine :: KeepAlive ( token)  => { 
380385                if  token. is_expired ( )  { 
381386                    // Token expired already - stop the connection immediately. 
382387                    event
383388                        . instructions 
384389                        . push ( Instruction :: FetchCredentials  {  did_expire :  true  } ) ; 
385-                     return  Ok ( true ) ; 
390+ 
391+                     SyncStateMachineTransition :: CloseIteration 
386392                }  else  if  token. should_prefetch ( )  { 
387393                    event
388394                        . instructions 
389395                        . push ( Instruction :: FetchCredentials  {  did_expire :  false  } ) ; 
396+                     SyncStateMachineTransition :: Empty 
397+                 }  else  { 
398+                     SyncStateMachineTransition :: Empty 
390399                } 
391400            } 
392-         } 
401+         } ) 
402+     } 
393403
394-         Ok ( false ) 
404+     /// Applies a sync state transition, returning whether the iteration should be stopped. 
405+ fn  apply_transition ( 
406+         & mut  self , 
407+         target :  & mut  SyncTarget , 
408+         event :  & mut  ActiveEvent , 
409+         transition :  SyncStateMachineTransition , 
410+     )  -> bool  { 
411+         match  transition { 
412+             SyncStateMachineTransition :: StartTrackingCheckpoint  { 
413+                 progress, 
414+                 updated_target, 
415+             }  => { 
416+                 self . status . update ( 
417+                     |s| s. start_tracking_checkpoint ( progress) , 
418+                     & mut  event. instructions , 
419+                 ) ; 
420+                 self . validated_but_not_applied  = None ; 
421+                 * target = updated_target; 
422+             } 
423+             SyncStateMachineTransition :: DataLineSaved  {  line }  => { 
424+                 self . status 
425+                     . update ( |s| s. track_line ( & line) ,  & mut  event. instructions ) ; 
426+             } 
427+             SyncStateMachineTransition :: CloseIteration  => return  true , 
428+             SyncStateMachineTransition :: SyncLocalFailedDueToPendingCrud  { 
429+                 validated_but_not_applied, 
430+             }  => { 
431+                 self . validated_but_not_applied  = Some ( validated_but_not_applied) ; 
432+             } 
433+             SyncStateMachineTransition :: SyncLocalChangesApplied  {  partial,  timestamp }  => { 
434+                 if  let  Some ( priority)  = partial { 
435+                     self . status . update ( 
436+                         |status| { 
437+                             status. partial_checkpoint_complete ( priority,  timestamp) ; 
438+                         } , 
439+                         & mut  event. instructions , 
440+                     ) ; 
441+                 }  else  { 
442+                     self . handle_checkpoint_applied ( event,  timestamp) ; 
443+                 } 
444+             } 
445+             SyncStateMachineTransition :: Empty  => { } 
446+         } ; 
447+ 
448+         false 
449+     } 
450+ 
451+     /// Handles a single sync line. 
452+ /// 
453+ /// When it returns `Ok(true)`, the sync iteration should be stopped. For errors, the type of 
454+ /// error determines whether the iteration can continue. 
455+ fn  handle_line ( 
456+         & mut  self , 
457+         target :  & mut  SyncTarget , 
458+         event :  & mut  ActiveEvent , 
459+         line :  & SyncLine , 
460+     )  -> Result < bool ,  PowerSyncError >  { 
461+         let  transition = self . prepare_handling_sync_line ( target,  event,  line) ?; 
462+         Ok ( self . apply_transition ( target,  event,  transition) ) 
395463    } 
396464
397465    /// Runs a full sync iteration, returning nothing when it completes regularly or an error when 
@@ -432,7 +500,7 @@ impl StreamingSyncIteration {
432500                                        . into ( ) , 
433501                                } ) ; 
434502
435-                                 self . handle_checkpoint_applied ( event) ? ; 
503+                                 self . handle_checkpoint_applied ( event,   self . adapter . now ( ) ? ) ; 
436504                            } 
437505                            _ => { 
438506                                event. instructions . push ( Instruction :: LogLine  { 
@@ -522,16 +590,13 @@ impl StreamingSyncIteration {
522590        Ok ( local_bucket_names) 
523591    } 
524592
525-     fn  handle_checkpoint_applied ( & mut  self ,  event :  & mut  ActiveEvent )  ->  Result < ( ) ,   ResultCode >  { 
593+     fn  handle_checkpoint_applied ( & mut  self ,  event :  & mut  ActiveEvent ,   timestamp :   Timestamp )  { 
526594        event. instructions . push ( Instruction :: DidCompleteSync  { } ) ; 
527595
528-         let  now = self . adapter . now ( ) ?; 
529596        self . status . update ( 
530-             |status| status. applied_checkpoint ( now ) , 
597+             |status| status. applied_checkpoint ( timestamp ) , 
531598            & mut  event. instructions , 
532599        ) ; 
533- 
534-         Ok ( ( ) ) 
535600    } 
536601} 
537602
@@ -553,18 +618,16 @@ impl SyncTarget {
553618        } 
554619    } 
555620
556-     fn  target_checkpoint_mut ( & mut  self )  -> Option < & mut  OwnedCheckpoint >  { 
557-         match  self  { 
558-             Self :: Tracking ( cp)  => Some ( cp) , 
559-             _ => None , 
560-         } 
561-     } 
562- 
563621    /// Starts tracking the received `Checkpoint`. 
564622/// 
565- /// This updates the internal state and returns a set of buckets to delete because they've been 
566- /// tracked locally but not in the new checkpoint. 
567- fn  track_checkpoint < ' a > ( & mut  self ,  checkpoint :  & Checkpoint < ' a > )  -> BTreeSet < String >  { 
623+ /// This returns a set of buckets to delete because they've been tracked locally but not in the 
624+ /// checkpoint, as well as the updated state of the [SyncTarget] to apply after deleting those 
625+ /// buckets. 
626+ /// 
627+ /// The new state is not applied automatically - the old state should be kept in-memory until 
628+ /// the buckets have actually been deleted so that the operation can be retried if deleting 
629+ /// buckets fails. 
630+ fn  track_checkpoint < ' a > ( & self ,  checkpoint :  & Checkpoint < ' a > )  -> ( BTreeSet < String > ,  Self )  { 
568631        let  mut  to_delete:  BTreeSet < String >  = match  & self  { 
569632            SyncTarget :: Tracking ( checkpoint)  => checkpoint. buckets . keys ( ) . cloned ( ) . collect ( ) , 
570633            SyncTarget :: BeforeCheckpoint ( buckets)  => buckets. iter ( ) . cloned ( ) . collect ( ) , 
@@ -576,8 +639,10 @@ impl SyncTarget {
576639            to_delete. remove ( & * bucket. bucket ) ; 
577640        } 
578641
579-         * self  = SyncTarget :: Tracking ( OwnedCheckpoint :: from_checkpoint ( checkpoint,  buckets) ) ; 
580-         to_delete
642+         ( 
643+             to_delete, 
644+             SyncTarget :: Tracking ( OwnedCheckpoint :: from_checkpoint ( checkpoint,  buckets) ) , 
645+         ) 
581646    } 
582647} 
583648
@@ -614,3 +679,32 @@ impl OwnedCheckpoint {
614679        self . write_checkpoint  = diff. write_checkpoint ; 
615680    } 
616681} 
682+ 
683+ /// A transition representing pending changes between [StreamingSyncIteration::prepare_handling_sync_line] 
684+ /// and [StreamingSyncIteration::apply_transition]. 
685+ /// 
686+ /// This split allows the main logic handling sync lines to take a non-mutable reference to internal 
687+ /// client state, guaranteeing that it does not mutate state until changes have been written to the 
688+ /// database. Only after those writes have succeeded are the internal state changes applied. 
689+ /// 
690+ /// This split ensures that `powersync_control` calls are idempotent when running into temporary 
691+ /// SQLite errors, a property we need for compatibility with e.g. WA-sqlite, where the VFS can 
692+ /// return `BUSY` errors and the SQLite library automatically retries running statements. 
693+ enum  SyncStateMachineTransition < ' a >  { 
694+     StartTrackingCheckpoint  { 
695+         progress :  SyncDownloadProgress , 
696+         updated_target :  SyncTarget , 
697+     } , 
698+     DataLineSaved  { 
699+         line :  & ' a  DataLine < ' a > , 
700+     } , 
701+     SyncLocalFailedDueToPendingCrud  { 
702+         validated_but_not_applied :  OwnedCheckpoint , 
703+     } , 
704+     SyncLocalChangesApplied  { 
705+         partial :  Option < BucketPriority > , 
706+         timestamp :  Timestamp , 
707+     } , 
708+     CloseIteration , 
709+     Empty , 
710+ } 
0 commit comments