Skip to content

Commit f970090

Browse files
committed
Sync client: Handle connection events
1 parent e8d102d commit f970090

File tree

4 files changed

+48
-4
lines changed

4 files changed

+48
-4
lines changed

crates/core/src/sync/interface.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,24 @@ pub enum SyncEvent<'a> {
8787
/// If pending CRUD entries have previously prevented a sync from completing, this even can be
8888
/// used to try again.
8989
UploadFinished,
90+
ConnectionEstablished,
91+
StreamEnded,
9092
/// Forward a text line (JSON) received from the sync service.
91-
TextLine { data: &'a str },
93+
TextLine {
94+
data: &'a str,
95+
},
9296
/// Forward a binary line (BSON) received from the sync service.
93-
BinaryLine { data: &'a [u8] },
97+
BinaryLine {
98+
data: &'a [u8],
99+
},
94100
/// The active stream subscriptions (as in, `SyncStreamSubscription` instances active right now)
95101
/// have changed.
96102
///
97103
/// The client will compare the new active subscriptions with the current one and will issue a
98104
/// request to restart the sync iteration if necessary.
99-
DidUpdateSubscriptions { active_streams: Rc<Vec<StreamKey>> },
105+
DidUpdateSubscriptions {
106+
active_streams: Rc<Vec<StreamKey>>,
107+
},
100108
}
101109

102110
/// An instruction sent by the core extension to the SDK.
@@ -244,6 +252,13 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
244252
.map_err(PowerSyncError::as_argument_error)?,
245253
})
246254
}
255+
"connection" => SyncControlRequest::SyncEvent(match payload.text() {
256+
"established" => SyncEvent::ConnectionEstablished,
257+
"end" => SyncEvent::StreamEnded,
258+
_ => {
259+
return Err(PowerSyncError::argument_error("unknown connection event"));
260+
}
261+
}),
247262
"subscriptions" => {
248263
let request = serde_json::from_str(payload.text())
249264
.map_err(PowerSyncError::as_argument_error)?;

crates/core/src/sync/streaming_sync.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,14 @@ impl StreamingSyncIteration {
556556
continue;
557557
}
558558
}
559+
SyncEvent::ConnectionEstablished => {
560+
self.status
561+
.update(|s| s.mark_connected(), &mut event.instructions);
562+
continue;
563+
}
564+
SyncEvent::StreamEnded => {
565+
break false;
566+
}
559567
SyncEvent::DidRefreshToken => {
560568
// Break so that the client SDK starts another iteration.
561569
break true;

dart/test/sync_test.dart

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,19 @@ void _syncTests<T>({
173173
});
174174
});
175175

176+
test('handles connection events', () {
177+
invokeControl('start', null);
178+
expect(invokeControl('connection', 'established'), [
179+
containsPair('UpdateSyncStatus',
180+
containsPair('status', containsPair('connected', true)))
181+
]);
182+
expect(invokeControl('connection', 'end'), [
183+
{
184+
'CloseSyncStream': {'hide_disconnect': false}
185+
}
186+
]);
187+
});
188+
176189
test('does not publish until reaching checkpoint', () {
177190
invokeControl('start', null);
178191
pushCheckpoint(buckets: priorityBuckets);

docs/sync.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ The following commands are supported:
1616
- A `schema: { tables: Table[], raw_tables: RawTable[] }` entry specifying the schema of the database to
1717
use. Regular tables are also inferred from the database itself, but raw tables need to be specified.
1818
If no raw tables are used, the `schema` entry can be omitted.
19+
- `active_streams`: An array of `{name: string, params: Record<string, any>}` entries representing streams that
20+
have an active subscription object in the application at the time the stream was opened.
1921
2. `stop`: No payload, requests the current sync iteration (if any) to be shut down.
2022
3. `line_text`: Payload is a serialized JSON object received from the sync service.
2123
4. `line_binary`: Payload is a BSON-encoded object received from the sync service.
@@ -26,8 +28,14 @@ The following commands are supported:
2628
6. `completed_upload`: Notify the sync implementation that all local changes have been uploaded.
2729
7. `update_subscriptions`: Notify the sync implementation that subscriptions which are currently active in the app
2830
have changed. Depending on the TTL of caches, this may cause it to request a reconnect.
29-
8. `subscriptions`: Store a new sync steam subscription in the database or remove it.
31+
8. `connection`: Notify the sync implementation about the connection being opened (second parameter should be `established`)
32+
or the HTTP stream closing (second parameter should be `end`).
33+
This is used to set `connected` to true in the sync status without waiting for the first sync line.
34+
9. `subscriptions`: Store a new sync steam subscription in the database or remove it.
3035
This command can run outside of a sync iteration and does not affect it.
36+
10. `update_subscriptions`: Second parameter is a JSON-encoded array of `{name: string, params: Record<string, any>}`.
37+
If a new subscription is created, or when a subscription without a TTL has been removed, the client will ask to
38+
restart the connection.
3139

3240
`powersync_control` returns a JSON-encoded array of instructions for the client:
3341

0 commit comments

Comments
 (0)