Skip to content

Commit d57bf0a

Browse files
committed
Fix crud upload triggering to only occur on ps_crud changes.
1 parent ec0769f commit d57bf0a

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

packages/powersync/lib/src/database/native/native_powersync_database.dart

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class PowerSyncDatabaseImpl
143143
await isInitialized;
144144
final dbRef = database.isolateConnectionFactory();
145145
ReceivePort rPort = ReceivePort();
146-
StreamSubscription? updateSubscription;
146+
StreamSubscription? crudUpdateSubscription;
147147
rPort.listen((data) async {
148148
if (data is List) {
149149
String action = data[0];
@@ -160,9 +160,9 @@ class PowerSyncDatabaseImpl
160160
});
161161
} else if (action == 'init') {
162162
SendPort port = data[1];
163-
var throttled =
164-
UpdateNotification.throttleStream(updates, crudThrottleTime);
165-
updateSubscription = throttled.listen((event) {
163+
var crudStream =
164+
database.onChange(['ps_crud'], throttle: crudThrottleTime);
165+
crudUpdateSubscription = crudStream.listen((event) {
166166
port.send(['update']);
167167
});
168168
disconnector.onAbort.then((_) {
@@ -179,7 +179,7 @@ class PowerSyncDatabaseImpl
179179
// Clear status apart from lastSyncedAt
180180
setStatus(SyncStatus(lastSyncedAt: currentStatus.lastSyncedAt));
181181
rPort.close();
182-
updateSubscription?.cancel();
182+
crudUpdateSubscription?.cancel();
183183
} else if (action == 'log') {
184184
LogRecord record = data[1];
185185
logger.log(
@@ -281,7 +281,7 @@ Future<void> _powerSyncDatabaseIsolate(
281281
_PowerSyncDatabaseIsolateArgs args) async {
282282
final sPort = args.sPort;
283283
ReceivePort rPort = ReceivePort();
284-
StreamController updateController = StreamController.broadcast();
284+
StreamController<String> crudUpdateController = StreamController.broadcast();
285285
final upstreamDbClient = args.dbRef.upstreamPort.open();
286286

287287
CommonDatabase? db;
@@ -292,14 +292,14 @@ Future<void> _powerSyncDatabaseIsolate(
292292
if (message is List) {
293293
String action = message[0];
294294
if (action == 'update') {
295-
updateController.add('update');
295+
crudUpdateController.add('update');
296296
} else if (action == 'close') {
297297
// The SyncSqliteConnection uses this mutex
298298
// It needs to be closed before killing the isolate
299299
// in order to free the mutex for other operations.
300300
await mutex.close();
301301
db?.dispose();
302-
updateController.close();
302+
crudUpdateController.close();
303303
upstreamDbClient.close();
304304
// Abort any open http requests, and wait for it to be closed properly
305305
await openedStreamingSync?.abort();
@@ -349,7 +349,7 @@ Future<void> _powerSyncDatabaseIsolate(
349349
credentialsCallback: loadCredentials,
350350
invalidCredentialsCallback: invalidateCredentials,
351351
uploadCrud: uploadCrud,
352-
updateStream: updateController.stream,
352+
crudUpdateTriggerStream: crudUpdateController.stream,
353353
retryDelay: args.retryDelay,
354354
client: http.Client(),
355355
syncParameters: args.parameters);

packages/powersync/lib/src/database/web/web_powersync_database.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,17 @@ class PowerSyncDatabaseImpl
135135

136136
await isInitialized;
137137

138+
final crudStream =
139+
database.onChange(['ps_crud'], throttle: crudThrottleTime);
140+
138141
// TODO better multitab support
139142
final storage = BucketStorage(database);
140143
final sync = StreamingSyncImplementation(
141144
adapter: storage,
142145
credentialsCallback: connector.getCredentialsCached,
143146
invalidCredentialsCallback: connector.fetchCredentials,
144147
uploadCrud: () => connector.uploadData(this),
145-
updateStream: updates,
148+
crudUpdateTriggerStream: crudStream,
146149
retryDelay: Duration(seconds: 3),
147150
client: FetchClient(mode: RequestMode.cors),
148151
syncParameters: params,

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class StreamingSyncImplementation {
2727

2828
final Future<void> Function() uploadCrud;
2929

30-
final Stream updateStream;
30+
final Stream crudUpdateTriggerStream;
3131

3232
final StreamController<SyncStatus> _statusStreamController =
3333
StreamController<SyncStatus>.broadcast();
@@ -59,7 +59,7 @@ class StreamingSyncImplementation {
5959
required this.credentialsCallback,
6060
this.invalidCredentialsCallback,
6161
required this.uploadCrud,
62-
required this.updateStream,
62+
required this.crudUpdateTriggerStream,
6363
required this.retryDelay,
6464
this.syncParameters,
6565
required http.Client client,
@@ -155,7 +155,7 @@ class StreamingSyncImplementation {
155155
Future<void> crudLoop() async {
156156
await uploadAllCrud();
157157

158-
await for (var _ in updateStream) {
158+
await for (var _ in crudUpdateTriggerStream) {
159159
if (_abort?.aborted == true) {
160160
break;
161161
}
@@ -169,11 +169,11 @@ class StreamingSyncImplementation {
169169
CrudEntry? checkedCrudItem;
170170

171171
while (true) {
172-
_updateStatus(uploading: true);
173172
try {
174173
// This is the first item in the FIFO CRUD queue.
175174
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
176175
if (nextCrudItem != null) {
176+
_updateStatus(uploading: true);
177177
if (nextCrudItem.clientId == checkedCrudItem?.clientId) {
178178
// This will force a higher log level than exceptions which are caught here.
179179
isolateLogger.warning(

0 commit comments

Comments
 (0)