Skip to content

Commit 80cd039

Browse files
committed
Update sqlite3_web, simplify
1 parent 21df6da commit 80cd039

File tree

4 files changed

+106
-172
lines changed

4 files changed

+106
-172
lines changed

packages/sqlite_async/lib/src/web/database.dart

Lines changed: 99 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import 'dart:async';
22
import 'dart:developer';
33
import 'dart:js_interop';
4-
import 'dart:js_interop_unsafe';
54

65
import 'package:sqlite3/common.dart';
76
import 'package:sqlite3_web/sqlite3_web.dart';
8-
import 'package:sqlite3_web/protocol_utils.dart' as proto;
97
import 'package:sqlite_async/sqlite_async.dart';
108
import 'package:sqlite_async/src/utils/profiler.dart';
119
import 'package:sqlite_async/src/web/database/broadcast_updates.dart';
@@ -97,72 +95,85 @@ class WebDatabase
9795
@override
9896
Future<T> readLock<T>(Future<T> Function(SqliteReadContext tx) callback,
9997
{Duration? lockTimeout, String? debugContext}) async {
100-
if (_mutex case var mutex?) {
101-
return await mutex.lock(timeout: lockTimeout, () {
102-
return ScopedReadContext.assumeReadLock(
103-
_UnscopedContext(this), callback);
104-
});
105-
} else {
106-
// No custom mutex, coordinate locks through shared worker.
107-
await _database.customRequest(
108-
CustomDatabaseMessage(CustomDatabaseMessageKind.requestSharedLock));
109-
110-
try {
111-
return await ScopedReadContext.assumeReadLock(
112-
_UnscopedContext(this), callback);
113-
} finally {
114-
await _database.customRequest(
115-
CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock));
116-
}
117-
}
98+
// Since there is only a single physical connection per database on the web,
99+
// we can't enable concurrent readers to a writer. Even supporting multiple
100+
// readers alone isn't safe, since these readers could start read
101+
// transactions where we need to block other tabs from sending `BEGIN` and
102+
// `COMMIT` statements if they were to start their own transactions.
103+
return _lockInternal(
104+
(unscoped) => ScopedReadContext.assumeReadLock(unscoped, callback),
105+
lockTimeout: lockTimeout,
106+
debugContext: debugContext,
107+
flush: false,
108+
);
118109
}
119110

120111
@override
121112
Future<T> writeTransaction<T>(
122113
Future<T> Function(SqliteWriteContext tx) callback,
123114
{Duration? lockTimeout,
124115
bool? flush}) {
125-
return writeLock((writeContext) {
126-
return ScopedWriteContext.assumeWriteLock(
127-
_UnscopedContext(this),
128-
(ctx) async {
129-
return await ctx.writeTransaction(callback);
130-
},
131-
);
132-
},
133-
debugContext: 'writeTransaction()',
134-
lockTimeout: lockTimeout,
135-
flush: flush);
116+
return _lockInternal(
117+
(context) {
118+
return ScopedWriteContext.assumeWriteLock(
119+
context,
120+
(ctx) async {
121+
return await ctx.writeTransaction(callback);
122+
},
123+
);
124+
},
125+
flush: flush ?? true,
126+
lockTimeout: lockTimeout,
127+
);
136128
}
137129

138130
@override
139131
Future<T> writeLock<T>(Future<T> Function(SqliteWriteContext tx) callback,
140132
{Duration? lockTimeout, String? debugContext, bool? flush}) async {
133+
return await _lockInternal(
134+
(unscoped) {
135+
return ScopedWriteContext.assumeWriteLock(unscoped, callback);
136+
},
137+
flush: flush ?? true,
138+
debugContext: debugContext,
139+
lockTimeout: lockTimeout,
140+
);
141+
}
142+
143+
Future<T> _lockInternal<T>(
144+
Future<T> Function(_UnscopedContext) callback, {
145+
required bool flush,
146+
Duration? lockTimeout,
147+
String? debugContext,
148+
}) async {
141149
if (_mutex case var mutex?) {
142150
return await mutex.lock(timeout: lockTimeout, () async {
143-
final context = _UnscopedContext(this);
151+
final context = _UnscopedContext(this, null);
144152
try {
145-
return await ScopedWriteContext.assumeWriteLock(context, callback);
153+
return await callback(context);
146154
} finally {
147-
if (flush != false) {
155+
if (flush) {
148156
await this.flush();
149157
}
150158
}
151159
});
152160
} else {
153-
// No custom mutex, coordinate locks through shared worker.
154-
await _database.customRequest(CustomDatabaseMessage(
155-
CustomDatabaseMessageKind.requestExclusiveLock));
156-
final context = _UnscopedContext(this);
157-
try {
158-
return await ScopedWriteContext.assumeWriteLock(context, callback);
159-
} finally {
160-
if (flush != false) {
161-
await this.flush();
161+
final abortTrigger = switch (lockTimeout) {
162+
null => null,
163+
final duration => Future.delayed(duration),
164+
};
165+
166+
return await _database.requestLock(abortTrigger: abortTrigger,
167+
(token) async {
168+
final context = _UnscopedContext(this, token);
169+
try {
170+
return await callback(context);
171+
} finally {
172+
if (flush) {
173+
await this.flush();
174+
}
162175
}
163-
await _database.customRequest(
164-
CustomDatabaseMessage(CustomDatabaseMessageKind.releaseLock));
165-
}
176+
});
166177
}
167178
}
168179

@@ -184,9 +195,20 @@ class WebDatabase
184195
final class _UnscopedContext extends UnscopedContext {
185196
final WebDatabase _database;
186197

198+
/// If this context is scoped to a lock on the database, the [LockToken] from
199+
/// `package:sqlite3_web`.
200+
///
201+
/// This token needs to be passed to queries to run them. While a lock token
202+
/// exists on the database, all queries not passing that token are blocked.
203+
final LockToken? _lock;
204+
187205
final TimelineTask? _task;
188206

189-
_UnscopedContext(this._database)
207+
/// Whether statements should be rejected if the database is not in an
208+
/// autocommit state.
209+
bool _checkInTransaction = false;
210+
211+
_UnscopedContext(this._database, this._lock)
190212
: _task = _database.profileQueries ? TimelineTask() : null;
191213

192214
@override
@@ -213,8 +235,15 @@ final class _UnscopedContext extends UnscopedContext {
213235
sql: sql,
214236
parameters: parameters,
215237
() async {
216-
return await wrapSqliteException(
217-
() => _database._database.select(sql, parameters));
238+
return await wrapSqliteException(() async {
239+
final result = await _database._database.select(
240+
sql,
241+
parameters: parameters,
242+
token: _lock,
243+
checkInTransaction: _checkInTransaction,
244+
);
245+
return result.result;
246+
});
218247
},
219248
);
220249
}
@@ -234,8 +263,15 @@ final class _UnscopedContext extends UnscopedContext {
234263
@override
235264
Future<ResultSet> execute(String sql, [List<Object?> parameters = const []]) {
236265
return _task.timeAsync('execute', sql: sql, parameters: parameters, () {
237-
return wrapSqliteException(
238-
() => _database._database.select(sql, parameters));
266+
return wrapSqliteException(() async {
267+
final result = await _database._database.select(
268+
sql,
269+
parameters: parameters,
270+
token: _lock,
271+
checkInTransaction: _checkInTransaction,
272+
);
273+
return result.result;
274+
});
239275
});
240276
}
241277

@@ -246,7 +282,12 @@ final class _UnscopedContext extends UnscopedContext {
246282
for (final set in parameterSets) {
247283
// use execute instead of select to avoid transferring rows from the
248284
// worker to this context.
249-
await _database._database.execute(sql, set);
285+
await _database._database.execute(
286+
sql,
287+
parameters: set,
288+
token: _lock,
289+
checkInTransaction: _checkInTransaction,
290+
);
250291
}
251292
});
252293
});
@@ -256,75 +297,7 @@ final class _UnscopedContext extends UnscopedContext {
256297
UnscopedContext interceptOutermostTransaction() {
257298
// All execute calls done in the callback will be checked for the
258299
// autocommit state
259-
return _ExclusiveTransactionContext(_database);
260-
}
261-
}
262-
263-
final class _ExclusiveTransactionContext extends _UnscopedContext {
264-
_ExclusiveTransactionContext(super._database);
265-
266-
Future<ResultSet> _executeInternal(
267-
String sql, List<Object?> parameters) async {
268-
// Operations inside transactions are executed with custom requests
269-
// in order to verify that the connection does not have autocommit enabled.
270-
// The worker will check if autocommit = true before executing the SQL.
271-
// An exception will be thrown if autocommit is enabled.
272-
// The custom request which does the above will return the ResultSet as a formatted
273-
// JavaScript object. This is the converted into a Dart ResultSet.
274-
return await wrapSqliteException(() async {
275-
var res = await _database._database.customRequest(CustomDatabaseMessage(
276-
CustomDatabaseMessageKind.executeInTransaction, sql, parameters))
277-
as JSObject;
278-
279-
if (res.has('format') && (res['format'] as JSNumber).toDartInt == 2) {
280-
// Newer workers use a serialization format more efficient than dartify().
281-
return proto.deserializeResultSet(res['r'] as JSObject);
282-
}
283-
284-
var result = Map<String, dynamic>.from(res.dartify() as Map);
285-
final columnNames = [
286-
for (final entry in result['columnNames']) entry as String
287-
];
288-
final rawTableNames = result['tableNames'];
289-
final tableNames = rawTableNames != null
290-
? [
291-
for (final entry in (rawTableNames as List<Object?>))
292-
entry as String
293-
]
294-
: null;
295-
296-
final rows = <List<Object?>>[];
297-
for (final row in (result['rows'] as List<Object?>)) {
298-
final dartRow = <Object?>[];
299-
300-
for (final column in (row as List<Object?>)) {
301-
dartRow.add(column);
302-
}
303-
304-
rows.add(dartRow);
305-
}
306-
final resultSet = ResultSet(columnNames, tableNames, rows);
307-
return resultSet;
308-
});
309-
}
310-
311-
@override
312-
Future<ResultSet> execute(String sql,
313-
[List<Object?> parameters = const []]) async {
314-
return _task.timeAsync('execute', sql: sql, parameters: parameters, () {
315-
return _executeInternal(sql, parameters);
316-
});
317-
}
318-
319-
@override
320-
Future<void> executeBatch(
321-
String sql, List<List<Object?>> parameterSets) async {
322-
return _task.timeAsync('executeBatch', sql: sql, () async {
323-
for (final set in parameterSets) {
324-
await _database._database.customRequest(CustomDatabaseMessage(
325-
CustomDatabaseMessageKind.executeBatchInTransaction, sql, set));
326-
}
327-
});
300+
return _UnscopedContext(_database, _lock).._checkInTransaction = true;
328301
}
329302
}
330303

@@ -337,6 +310,11 @@ Future<T> wrapSqliteException<T>(Future<T> Function() callback) async {
337310
throw serializedCause;
338311
}
339312

313+
if (ex.message.contains('Database is not in a transaction')) {
314+
throw SqliteException(
315+
0, "Transaction rolled back by earlier statement. Cannot execute.");
316+
}
317+
340318
// Older versions of package:sqlite_web reported SqliteExceptions as strings
341319
// only.
342320
if (ex.toString().contains('SqliteException')) {

packages/sqlite_async/lib/src/web/protocol.dart

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,8 @@ import 'dart:js_interop';
66
import 'package:sqlite3_web/protocol_utils.dart' as proto;
77

88
enum CustomDatabaseMessageKind {
9-
requestSharedLock,
10-
requestExclusiveLock,
11-
releaseLock,
12-
lockObtained,
9+
ok,
1310
getAutoCommit,
14-
executeInTransaction,
1511
executeBatchInTransaction,
1612
updateSubscriptionManagement,
1713
notifyUpdates,

packages/sqlite_async/lib/src/web/worker/worker_utils.dart

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import 'dart:async';
22
import 'dart:js_interop';
3-
import 'dart:js_interop_unsafe';
43

54
import 'package:meta/meta.dart';
65
import 'package:mutex/mutex.dart';
@@ -62,12 +61,6 @@ class AsyncSqliteDatabase extends WorkerDatabase {
6261
return _state.putIfAbsent(connection, _ConnectionState.new);
6362
}
6463

65-
void _markHoldsMutex(ClientConnection connection) {
66-
final state = _findState(connection);
67-
state.holdsMutex = true;
68-
_registerCloseListener(state, connection);
69-
}
70-
7164
void _registerCloseListener(
7265
_ConnectionState state, ClientConnection connection) {
7366
if (!state.hasOnCloseListener) {
@@ -87,44 +80,11 @@ class AsyncSqliteDatabase extends WorkerDatabase {
8780
final message = request as CustomDatabaseMessage;
8881

8982
switch (message.kind) {
90-
case CustomDatabaseMessageKind.requestSharedLock:
91-
await mutex.acquireRead();
92-
_markHoldsMutex(connection);
93-
case CustomDatabaseMessageKind.requestExclusiveLock:
94-
await mutex.acquireWrite();
95-
_markHoldsMutex(connection);
96-
case CustomDatabaseMessageKind.releaseLock:
97-
_findState(connection).holdsMutex = false;
98-
mutex.release();
99-
case CustomDatabaseMessageKind.lockObtained:
83+
case CustomDatabaseMessageKind.ok:
10084
case CustomDatabaseMessageKind.notifyUpdates:
10185
throw UnsupportedError('This is a response, not a request');
10286
case CustomDatabaseMessageKind.getAutoCommit:
10387
return database.autocommit.toJS;
104-
case CustomDatabaseMessageKind.executeInTransaction:
105-
final sql = message.rawSql.toDart;
106-
final hasTypeInfo = message.typeInfo.isDefinedAndNotNull;
107-
final parameters = proto.deserializeParameters(
108-
message.rawParameters, message.typeInfo);
109-
if (database.autocommit) {
110-
throw SqliteException(0,
111-
"Transaction rolled back by earlier statement. Cannot execute: $sql");
112-
}
113-
114-
var res = database.select(sql, parameters);
115-
if (hasTypeInfo) {
116-
// If the client is sending a request that has parameters with type
117-
// information, it will also support a newer serialization format for
118-
// result sets.
119-
return JSObject()
120-
..['format'] = 2.toJS
121-
..['r'] = proto.serializeResultSet(res);
122-
} else {
123-
var dartMap = resultSetToMap(res);
124-
var jsObject = dartMap.jsify();
125-
return jsObject;
126-
}
127-
12888
case CustomDatabaseMessageKind.executeBatchInTransaction:
12989
final sql = message.rawSql.toDart;
13090
final parameters = proto.deserializeParameters(
@@ -157,7 +117,7 @@ class AsyncSqliteDatabase extends WorkerDatabase {
157117
}
158118
}
159119

160-
return CustomDatabaseMessage(CustomDatabaseMessageKind.lockObtained);
120+
return CustomDatabaseMessage(CustomDatabaseMessageKind.ok);
161121
}
162122

163123
Map<String, dynamic> resultSetToMap(ResultSet resultSet) {

0 commit comments

Comments
 (0)