From 21385c6f906d052648642d04ea260bbfb68b1eb6 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Oct 2025 09:20:39 +0200 Subject: [PATCH 01/11] fix: improve OPFS multi-tab connection recovery --- .changeset/gorgeous-pots-repeat.md | 5 ++++ .../components/providers/SystemProvider.tsx | 16 ++++++++--- .../worker/sync/SharedSyncImplementation.ts | 27 ++++++++++--------- 3 files changed, 32 insertions(+), 16 deletions(-) create mode 100644 .changeset/gorgeous-pots-repeat.md diff --git a/.changeset/gorgeous-pots-repeat.md b/.changeset/gorgeous-pots-repeat.md new file mode 100644 index 000000000..0f0ada625 --- /dev/null +++ b/.changeset/gorgeous-pots-repeat.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': patch +--- + +Improve OPFS multitab connection recovery when tabs are closed. diff --git a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx index 8a3f3c209..c50cb0e94 100644 --- a/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx +++ b/demos/react-supabase-todolist/src/components/providers/SystemProvider.tsx @@ -3,7 +3,14 @@ import { AppSchema, ListRecord, LISTS_TABLE, TODOS_TABLE } from '@/library/power import { SupabaseConnector } from '@/library/powersync/SupabaseConnector'; import { CircularProgress } from '@mui/material'; import { PowerSyncContext } from '@powersync/react'; -import { createBaseLogger, DifferentialWatchedQuery, LogLevel, PowerSyncDatabase } from '@powersync/web'; +import { + createBaseLogger, + DifferentialWatchedQuery, + LogLevel, + PowerSyncDatabase, + WASQLiteOpenFactory, + WASQLiteVFS +} from '@powersync/web'; import React, { Suspense } from 'react'; import { NavigationPanelContextProvider } from '../navigation/NavigationPanelContext'; @@ -12,9 +19,10 @@ export const useSupabase = () => React.useContext(SupabaseContext); export const db = new PowerSyncDatabase({ schema: AppSchema, - database: { - dbFilename: 'example.db' - } + database: new WASQLiteOpenFactory({ + dbFilename: 'example.db', + vfs: WASQLiteVFS.OPFSCoopSyncVFS + }) }); export type EnhancedListRecord = ListRecord & { total_tasks: number; completed_tasks: number }; diff --git a/packages/web/src/worker/sync/SharedSyncImplementation.ts b/packages/web/src/worker/sync/SharedSyncImplementation.ts index f8923c69f..5c4f5b683 100644 --- a/packages/web/src/worker/sync/SharedSyncImplementation.ts +++ b/packages/web/src/worker/sync/SharedSyncImplementation.ts @@ -1,10 +1,4 @@ import { - type ILogger, - type ILogLevel, - type PowerSyncConnectionOptions, - type StreamingSyncImplementation, - type StreamingSyncImplementationListener, - type SyncStatusOptions, AbortOperation, BaseObserver, ConnectionManager, @@ -13,7 +7,13 @@ import { PowerSyncBackendConnector, SqliteBucketStorage, SubscribedStream, - SyncStatus + SyncStatus, + type ILogger, + type ILogLevel, + type PowerSyncConnectionOptions, + type StreamingSyncImplementation, + type StreamingSyncImplementationListener, + type SyncStatusOptions } from '@powersync/common'; import { Mutex } from 'async-mutex'; import * as Comlink from 'comlink'; @@ -75,7 +75,7 @@ export type WrappedSyncPort = { clientProvider: Comlink.Remote; db?: DBAdapter; currentSubscriptions: SubscribedStream[]; - closeListeners: (() => void)[]; + closeListeners: (() => void | Promise)[]; }; /** @@ -334,12 +334,15 @@ export class SharedSyncImplementation extends BaseObserver this.logger.warn('Error while disconnecting. Will attempt to reconnect.', ex)); // Clearing the adapter will result in a new one being opened in connect this.dbAdapter = null; @@ -482,9 +485,9 @@ export class SharedSyncImplementation extends BaseObserver { + lastClient.closeListeners.push(async () => { this.logger.info('Aborting open connection because associated tab closed.'); - wrapped.close(); + await wrapped.close().catch((ex) => this.logger.warn('error closing database connection', ex)); wrapped.markRemoteClosed(); }); From e6960da3f01b0d0434f8bf998604bc810519e716 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 19 Nov 2025 16:13:07 +0200 Subject: [PATCH 02/11] wip: indexeddb holds --- .../db/adapters/AsyncDatabaseConnection.ts | 2 + .../db/adapters/LockedAsyncDatabaseAdapter.ts | 29 ++-- .../WorkerWrappedAsyncDatabaseConnection.ts | 8 ++ .../adapters/wa-sqlite/WASQLiteConnection.ts | 37 +++++ .../src/worker/db/SharedWASQLiteConnection.ts | 126 ++++++++++++++++++ .../web/src/worker/db/WASQLiteDB.worker.ts | 79 ++++------- .../src/worker/db/WorkerWASQLiteConnection.ts | 29 ++++ packages/web/vitest.config.ts | 2 +- 8 files changed, 247 insertions(+), 65 deletions(-) create mode 100644 packages/web/src/worker/db/SharedWASQLiteConnection.ts create mode 100644 packages/web/src/worker/db/WorkerWASQLiteConnection.ts diff --git a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts index 183d5210b..2e90c77c9 100644 --- a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts @@ -25,6 +25,8 @@ export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void; export interface AsyncDatabaseConnection { init(): Promise; close(): Promise; + markHold(): Promise; + releaseHold(holdId: string): Promise; execute(sql: string, params?: any[]): Promise; executeRaw(sql: string, params?: any[]): Promise; executeBatch(sql: string, params?: any[]): Promise; diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index 25e0afa56..5525d1b0a 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -1,14 +1,14 @@ import { - type ILogger, BaseObserver, - createLogger, DBAdapter, DBAdapterListener, DBGetUtils, DBLockOptions, LockContext, QueryResult, - Transaction + Transaction, + createLogger, + type ILogger } from '@powersync/common'; import { getNavigatorLocks } from '../..//shared/navigator'; import { AsyncDatabaseConnection } from './AsyncDatabaseConnection'; @@ -125,7 +125,7 @@ export class LockedAsyncDatabaseAdapter if (false == this._db instanceof WorkerWrappedAsyncDatabaseConnection) { throw new Error(`Only worker connections can be shared`); } - return this._db.shareConnection(); + return (this._db as WorkerWrappedAsyncDatabaseConnection).shareConnection(); } /** @@ -221,13 +221,22 @@ export class LockedAsyncDatabaseAdapter }, timeoutMs) : null; - return getNavigatorLocks().request(`db-lock-${this._dbIdentifier}`, { signal: abortController.signal }, () => { - this.pendingAbortControllers.delete(abortController); - if (timoutId) { - clearTimeout(timoutId); + return getNavigatorLocks().request( + `db-lock-${this._dbIdentifier}`, + { signal: abortController.signal }, + async () => { + this.pendingAbortControllers.delete(abortController); + if (timoutId) { + clearTimeout(timoutId); + } + const holdId = await this.baseDB.markHold(); + try { + return await callback(); + } finally { + await this.baseDB.releaseHold(holdId); + } } - return callback(); - }); + ); } async readTransaction(fn: (tx: Transaction) => Promise, options?: DBLockOptions | undefined): Promise { diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 5c4c6902e..2161d4b9c 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -60,6 +60,14 @@ export class WorkerWrappedAsyncDatabaseConnection { + return this.baseConnection.markHold(); + } + + releaseHold(holdId: string): Promise { + return this.baseConnection.releaseHold(holdId); + } + private withRemote(workerPromise: () => Promise): Promise { const controller = this.notifyRemoteClosed; if (controller) { diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts index 091c85da4..5619f6d40 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts @@ -26,6 +26,14 @@ export type WASQLiteBroadCastTableUpdateEvent = { */ export type WASQLiteConnectionListener = { tablesUpdated: (event: BatchedUpdateNotification) => void; + /** + * Triggered when an active hold is overwritten by a new hold. + * This is most likely to happen when a shared connection has been closed + * without releasing the hold. + * This listener can be used to cleanup any resources associated with the previous hold. + * @param holdId - The id of the hold that has been overwritten. + */ + holdOverwritten: (holdId: string) => Promise; }; /** @@ -148,6 +156,9 @@ export class WASqliteConnection */ protected connectionId: number; + protected _holdCounter: number; + protected _holdId: string | null; + constructor(protected options: ResolvedWASQLiteOpenFactoryOptions) { super(); this.updatedTables = new Set(); @@ -156,6 +167,16 @@ export class WASqliteConnection this.connectionId = new Date().valueOf() + Math.random(); this.statementMutex = new Mutex(); this._moduleFactory = DEFAULT_MODULE_FACTORIES[this.options.vfs]; + this._holdCounter = 0; + this._holdId = null; + } + + /** + * Gets the id for the current hold. + * This can be used to check for invalid states. + */ + get currentHoldId() { + return this._holdId; } protected get sqliteAPI() { @@ -172,6 +193,22 @@ export class WASqliteConnection return this._dbP; } + async markHold(): Promise { + const previousHoldId = this._holdId; + this._holdId = `${++this._holdCounter}`; + if (previousHoldId) { + await this.iterateAsyncListeners(async (cb) => cb.holdOverwritten?.(previousHoldId)); + } + return this._holdId; + } + + async releaseHold(holdId: string): Promise { + if (holdId != this._holdId) { + throw new Error(`Invalid hold state, expected ${this._holdId} but got ${holdId}`); + } + this._holdId = null; + } + protected async openDB() { this._dbP = await this.sqliteAPI.open_v2(this.options.dbFilename); return this._dbP; diff --git a/packages/web/src/worker/db/SharedWASQLiteConnection.ts b/packages/web/src/worker/db/SharedWASQLiteConnection.ts new file mode 100644 index 000000000..3f397dee1 --- /dev/null +++ b/packages/web/src/worker/db/SharedWASQLiteConnection.ts @@ -0,0 +1,126 @@ +import { ILogger } from '@powersync/common'; +import { + AsyncDatabaseConnection, + OnTableChangeCallback, + ProxiedQueryResult +} from '../../db/adapters/AsyncDatabaseConnection'; +import { ResolvedWebSQLOpenOptions } from '../../db/adapters/web-sql-flags'; + +/** + * Keeps track of open DB connections and the clients which + * are using it. + */ +export type SharedDBWorkerConnection = { + clientIds: Set; + db: AsyncDatabaseConnection; +}; + +export type SharedWASQLiteConnectionOptions = { + dbMap: Map; + dbFilename: string; + clientId: number; + logger: ILogger; +}; + +export class SharedWASQLiteConnection implements AsyncDatabaseConnection { + protected isClosing: boolean; + // Keeps track if this current hold if the shared connection has a hold + protected activeHoldId: string | null; + + constructor(protected options: SharedWASQLiteConnectionOptions) { + // Add this client ID to the set of known clients + this.clientIds.add(options.clientId); + this.isClosing = false; + this.activeHoldId = null; + } + + async init(): Promise { + // No-op since the connection is already initialized when it was created + } + + async markHold(): Promise { + this.activeHoldId = await this.connection.markHold(); + return this.activeHoldId; + } + + async releaseHold(id: string): Promise { + try { + await this.connection.releaseHold(id); + } finally { + this.activeHoldId = null; + } + } + + protected get logger() { + return this.options.logger; + } + + protected get dbEntry() { + return this.options.dbMap.get(this.options.dbFilename)!; + } + + protected get connection() { + return this.dbEntry.db; + } + + protected get clientIds() { + return this.dbEntry.clientIds; + } + + /** + * Handles closing of a shared connection. + * The connection is only closed if there are no active clients using it. + */ + async close(): Promise { + // This prevents further statements on this connection from being executed + this.isClosing = true; + const { clientIds, logger } = this; + const { clientId, dbFilename, dbMap } = this.options; + logger.debug(`Close requested from client ${clientId} of ${[...clientIds]}`); + clientIds.delete(clientId); + + if (this.activeHoldId) { + /** + * The hold hasn't been released, but we're closing now. + * We can proactively cleanup and release the hold. + */ + await this.connection.execute('ROLLBACK').catch(() => {}); + await this.connection.releaseHold(this.activeHoldId).catch(() => {}); + } + + if (clientIds.size == 0) { + logger.debug(`Closing connection to ${this.options}.`); + dbMap.delete(dbFilename); + return this.connection.close(); + } + logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`); + return; + } + + protected async withClosing(action: () => Promise) { + if (this.isClosing) { + throw new Error('Connection is closing'); + } + return action(); + } + + async execute(sql: string, params?: any[]): Promise { + return this.withClosing(() => this.connection.execute(sql, params)); + } + + async executeRaw(sql: string, params?: any[]): Promise { + return this.withClosing(() => this.connection.executeRaw(sql, params)); + } + + executeBatch(sql: string, params?: any[] | undefined): Promise { + return this.withClosing(() => this.connection.executeBatch(sql, params)); + } + + registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> { + return this.connection.registerOnTableChange(callback); + } + + getConfig(): Promise { + return this.connection.getConfig(); + } +} diff --git a/packages/web/src/worker/db/WASQLiteDB.worker.ts b/packages/web/src/worker/db/WASQLiteDB.worker.ts index 3db4bdc49..65d6e3f0a 100644 --- a/packages/web/src/worker/db/WASQLiteDB.worker.ts +++ b/packages/web/src/worker/db/WASQLiteDB.worker.ts @@ -6,47 +6,20 @@ import '@journeyapps/wa-sqlite'; import { createBaseLogger, createLogger } from '@powersync/common'; import * as Comlink from 'comlink'; import { AsyncDatabaseConnection } from '../../db/adapters/AsyncDatabaseConnection'; -import { WASqliteConnection } from '../../db/adapters/wa-sqlite/WASQLiteConnection'; -import { - ResolvedWASQLiteOpenFactoryOptions, - WorkerDBOpenerOptions -} from '../../db/adapters/wa-sqlite/WASQLiteOpenFactory'; +import { WorkerDBOpenerOptions } from '../../db/adapters/wa-sqlite/WASQLiteOpenFactory'; import { getNavigatorLocks } from '../../shared/navigator'; +import { SharedDBWorkerConnection, SharedWASQLiteConnection } from './SharedWASQLiteConnection'; +import { WorkerWASQLiteConnection, proxyWASQLiteConnection } from './WorkerWASQLiteConnection'; const baseLogger = createBaseLogger(); baseLogger.useDefaults(); const logger = createLogger('db-worker'); -/** - * Keeps track of open DB connections and the clients which - * are using it. - */ -type SharedDBWorkerConnection = { - clientIds: Set; - db: AsyncDatabaseConnection; -}; - const DBMap = new Map(); const OPEN_DB_LOCK = 'open-wasqlite-db'; let nextClientId = 1; -const openWorkerConnection = async (options: ResolvedWASQLiteOpenFactoryOptions): Promise => { - const connection = new WASqliteConnection(options); - return { - init: Comlink.proxy(() => connection.init()), - getConfig: Comlink.proxy(() => connection.getConfig()), - close: Comlink.proxy(() => connection.close()), - execute: Comlink.proxy(async (sql: string, params?: any[]) => connection.execute(sql, params)), - executeRaw: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeRaw(sql, params)), - executeBatch: Comlink.proxy(async (sql: string, params?: any[]) => connection.executeBatch(sql, params)), - registerOnTableChange: Comlink.proxy(async (callback) => { - // Proxy the callback remove function - return Comlink.proxy(await connection.registerOnTableChange(callback)); - }) - }; -}; - const openDBShared = async (options: WorkerDBOpenerOptions): Promise => { // Prevent multiple simultaneous opens from causing race conditions return getNavigatorLocks().request(OPEN_DB_LOCK, async () => { @@ -57,38 +30,36 @@ const openDBShared = async (options: WorkerDBOpenerOptions): Promise(); - const connection = await openWorkerConnection(options); + // This format returns proxy objects for function callbacks + const connection = new WorkerWASQLiteConnection(options); await connection.init(); + + connection.registerListener({ + holdOverwritten: async () => { + /** + * The previous hold has been overwritten, without being released. + * we need to cleanup any resources associated with it. + * We can perform a rollback to release any potential transactions that were started. + */ + await connection.execute('ROLLBACK').catch(() => {}); + } + }); + DBMap.set(dbFilename, { clientIds, db: connection }); } - const dbEntry = DBMap.get(dbFilename)!; - dbEntry.clientIds.add(clientId); - const { db } = dbEntry; - - const wrappedConnection = { - ...db, - init: Comlink.proxy(async () => { - // the init has been done automatically - }), - close: Comlink.proxy(async () => { - const { clientIds } = dbEntry; - logger.debug(`Close requested from client ${clientId} of ${[...clientIds]}`); - clientIds.delete(clientId); - if (clientIds.size == 0) { - logger.debug(`Closing connection to ${dbFilename}.`); - DBMap.delete(dbFilename); - return db.close?.(); - } - logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`); - return; - }) - }; + // Associates this clientId with the shared connection entry + const sharedConnection = new SharedWASQLiteConnection({ + dbMap: DBMap, + dbFilename, + clientId, + logger + }); - return Comlink.proxy(wrappedConnection); + return proxyWASQLiteConnection(sharedConnection); }); }; diff --git a/packages/web/src/worker/db/WorkerWASQLiteConnection.ts b/packages/web/src/worker/db/WorkerWASQLiteConnection.ts new file mode 100644 index 000000000..831075bb1 --- /dev/null +++ b/packages/web/src/worker/db/WorkerWASQLiteConnection.ts @@ -0,0 +1,29 @@ +import * as Comlink from 'comlink'; +import { AsyncDatabaseConnection, OnTableChangeCallback } from '../../db/adapters/AsyncDatabaseConnection'; +import { WASqliteConnection } from '../../db/adapters/wa-sqlite/WASQLiteConnection'; + +/** + * Fully proxies a WASQLiteConnection to be used as an AsyncDatabaseConnection. + */ +export function proxyWASQLiteConnection(connection: AsyncDatabaseConnection): AsyncDatabaseConnection { + return Comlink.proxy({ + init: Comlink.proxy(() => connection.init()), + close: Comlink.proxy(() => connection.close()), + markHold: Comlink.proxy(() => connection.markHold()), + releaseHold: Comlink.proxy((holdId: string) => connection.releaseHold(holdId)), + execute: Comlink.proxy((sql: string, params?: any[]) => connection.execute(sql, params)), + executeRaw: Comlink.proxy((sql: string, params?: any[]) => connection.executeRaw(sql, params)), + executeBatch: Comlink.proxy((sql: string, params?: any[]) => connection.executeBatch(sql, params)), + registerOnTableChange: Comlink.proxy((callback: OnTableChangeCallback) => + connection.registerOnTableChange(callback) + ), + getConfig: Comlink.proxy(() => connection.getConfig()) + }); +} + +export class WorkerWASQLiteConnection extends WASqliteConnection { + registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> { + // Proxy the callback remove function + return Comlink.proxy(super.registerOnTableChange(callback)); + } +} diff --git a/packages/web/vitest.config.ts b/packages/web/vitest.config.ts index 0e586da8d..3f1ec3a0e 100644 --- a/packages/web/vitest.config.ts +++ b/packages/web/vitest.config.ts @@ -50,7 +50,7 @@ const config: UserConfigExport = { */ isolate: true, provider: 'playwright', - headless: true, + headless: false, instances: [ { browser: 'chromium' From 1e030460bab7657f1096432018112716b12cd50d Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 19 Nov 2025 16:21:06 +0200 Subject: [PATCH 03/11] fix proxy --- packages/web/src/worker/db/SharedWASQLiteConnection.ts | 3 ++- packages/web/src/worker/db/WorkerWASQLiteConnection.ts | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/web/src/worker/db/SharedWASQLiteConnection.ts b/packages/web/src/worker/db/SharedWASQLiteConnection.ts index 3f397dee1..e8d65a29f 100644 --- a/packages/web/src/worker/db/SharedWASQLiteConnection.ts +++ b/packages/web/src/worker/db/SharedWASQLiteConnection.ts @@ -90,8 +90,9 @@ export class SharedWASQLiteConnection implements AsyncDatabaseConnection { if (clientIds.size == 0) { logger.debug(`Closing connection to ${this.options}.`); + const connection = this.connection; dbMap.delete(dbFilename); - return this.connection.close(); + await connection.close(); } logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`); return; diff --git a/packages/web/src/worker/db/WorkerWASQLiteConnection.ts b/packages/web/src/worker/db/WorkerWASQLiteConnection.ts index 831075bb1..dadf0dd74 100644 --- a/packages/web/src/worker/db/WorkerWASQLiteConnection.ts +++ b/packages/web/src/worker/db/WorkerWASQLiteConnection.ts @@ -22,8 +22,8 @@ export function proxyWASQLiteConnection(connection: AsyncDatabaseConnection): As } export class WorkerWASQLiteConnection extends WASqliteConnection { - registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> { + async registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> { // Proxy the callback remove function - return Comlink.proxy(super.registerOnTableChange(callback)); + return Comlink.proxy(await super.registerOnTableChange(callback)); } } From 07d5420cec7919be2dd12ccd833ab8f3a8b71317 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 19 Nov 2025 16:22:30 +0200 Subject: [PATCH 04/11] add changeset --- .changeset/silver-insects-unite.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/silver-insects-unite.md diff --git a/.changeset/silver-insects-unite.md b/.changeset/silver-insects-unite.md new file mode 100644 index 000000000..ec6c51816 --- /dev/null +++ b/.changeset/silver-insects-unite.md @@ -0,0 +1,5 @@ +--- +'@powersync/web': patch +--- + +Fixed an issue where IndexedDB could cause "cannot start a transaction within a transaction" errors From ffeec7ec4fb9cc5dbccf997a76a778d16ea063dd Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 19 Nov 2025 16:24:05 +0200 Subject: [PATCH 05/11] fix tests --- packages/web/vitest.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/web/vitest.config.ts b/packages/web/vitest.config.ts index 3f1ec3a0e..0e586da8d 100644 --- a/packages/web/vitest.config.ts +++ b/packages/web/vitest.config.ts @@ -50,7 +50,7 @@ const config: UserConfigExport = { */ isolate: true, provider: 'playwright', - headless: false, + headless: true, instances: [ { browser: 'chromium' From 2717b7a77df85b968ce8a6e198a6dfee837a0c3a Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 19 Nov 2025 16:47:59 +0200 Subject: [PATCH 06/11] Code cleanup --- .../web/src/db/adapters/AsyncDatabaseConnection.ts | 8 ++++++++ .../web/src/worker/db/SharedWASQLiteConnection.ts | 11 +++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts index 2e90c77c9..38d004109 100644 --- a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts @@ -25,7 +25,15 @@ export type OnTableChangeCallback = (event: BatchedUpdateNotification) => void; export interface AsyncDatabaseConnection { init(): Promise; close(): Promise; + /** + * Marks the connection as in-use by a certain actor. + * @returns A hold ID which can be used to release the hold. + */ markHold(): Promise; + /** + * Releases a hold on the connection. + * @param holdId The hold ID to release. + */ releaseHold(holdId: string): Promise; execute(sql: string, params?: any[]): Promise; executeRaw(sql: string, params?: any[]): Promise; diff --git a/packages/web/src/worker/db/SharedWASQLiteConnection.ts b/packages/web/src/worker/db/SharedWASQLiteConnection.ts index e8d65a29f..99e80bd33 100644 --- a/packages/web/src/worker/db/SharedWASQLiteConnection.ts +++ b/packages/web/src/worker/db/SharedWASQLiteConnection.ts @@ -80,12 +80,11 @@ export class SharedWASQLiteConnection implements AsyncDatabaseConnection { clientIds.delete(clientId); if (this.activeHoldId) { - /** - * The hold hasn't been released, but we're closing now. - * We can proactively cleanup and release the hold. - */ - await this.connection.execute('ROLLBACK').catch(() => {}); - await this.connection.releaseHold(this.activeHoldId).catch(() => {}); + // We can't cleanup here since we're not in a lock context. + // The cleanup will occur once a new hold is acquired. + this.logger.info( + `Hold ${this.activeHoldId} was still active when the connection was closed. Cleanup will occur once a new hold is acquired.` + ); } if (clientIds.size == 0) { From 47408538536ae8260978ed643e0af74d66440bbb Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 10:23:21 +0200 Subject: [PATCH 07/11] Add unit tests and autocommit check for tests --- .../db/adapters/AsyncDatabaseConnection.ts | 5 + .../WorkerWrappedAsyncDatabaseConnection.ts | 10 +- .../adapters/wa-sqlite/WASQLiteConnection.ts | 8 ++ .../src/worker/db/SharedWASQLiteConnection.ts | 32 ++--- .../web/src/worker/db/WASQLiteDB.worker.ts | 4 +- .../src/worker/db/WorkerWASQLiteConnection.ts | 21 +--- packages/web/tests/multiple_instances.test.ts | 113 +++++++++++++++++- 7 files changed, 153 insertions(+), 40 deletions(-) diff --git a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts index 38d004109..1b983802c 100644 --- a/packages/web/src/db/adapters/AsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/AsyncDatabaseConnection.ts @@ -35,6 +35,11 @@ export interface AsyncDatabaseConnection; + /** + * Checks if the database connection is in autocommit mode. + * @returns true if in autocommit mode, false if in a transaction + */ + isAutoCommit(): Promise; execute(sql: string, params?: any[]): Promise; executeRaw(sql: string, params?: any[]): Promise; executeBatch(sql: string, params?: any[]): Promise; diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index 2161d4b9c..bc698b13a 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -61,11 +61,15 @@ export class WorkerWrappedAsyncDatabaseConnection { - return this.baseConnection.markHold(); + return this.withRemote(() => this.baseConnection.markHold()); } releaseHold(holdId: string): Promise { - return this.baseConnection.releaseHold(holdId); + return this.withRemote(() => this.baseConnection.releaseHold(holdId)); + } + + isAutoCommit(): Promise { + return this.baseConnection.isAutoCommit(); } private withRemote(workerPromise: () => Promise): Promise { @@ -74,6 +78,8 @@ export class WorkerWrappedAsyncDatabaseConnection { if (controller.signal.aborted) { reject(new Error('Called operation on closed remote')); + // Don't run the operation if we're going to reject + return; } function handleAbort() { diff --git a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts index 5619f6d40..8860a7233 100644 --- a/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts +++ b/packages/web/src/db/adapters/wa-sqlite/WASQLiteConnection.ts @@ -193,6 +193,14 @@ export class WASqliteConnection return this._dbP; } + /** + * Checks if the database connection is in autocommit mode. + * @returns true if in autocommit mode, false if in a transaction + */ + async isAutoCommit(): Promise { + return this.sqliteAPI.get_autocommit(this.dbP) != 0; + } + async markHold(): Promise { const previousHoldId = this._holdId; this._holdId = `${++this._holdCounter}`; diff --git a/packages/web/src/worker/db/SharedWASQLiteConnection.ts b/packages/web/src/worker/db/SharedWASQLiteConnection.ts index 99e80bd33..3537442ab 100644 --- a/packages/web/src/worker/db/SharedWASQLiteConnection.ts +++ b/packages/web/src/worker/db/SharedWASQLiteConnection.ts @@ -34,6 +34,22 @@ export class SharedWASQLiteConnection implements AsyncDatabaseConnection { this.activeHoldId = null; } + protected get logger() { + return this.options.logger; + } + + protected get dbEntry() { + return this.options.dbMap.get(this.options.dbFilename)!; + } + + protected get connection() { + return this.dbEntry.db; + } + + protected get clientIds() { + return this.dbEntry.clientIds; + } + async init(): Promise { // No-op since the connection is already initialized when it was created } @@ -51,20 +67,8 @@ export class SharedWASQLiteConnection implements AsyncDatabaseConnection { } } - protected get logger() { - return this.options.logger; - } - - protected get dbEntry() { - return this.options.dbMap.get(this.options.dbFilename)!; - } - - protected get connection() { - return this.dbEntry.db; - } - - protected get clientIds() { - return this.dbEntry.clientIds; + async isAutoCommit(): Promise { + return this.connection.isAutoCommit(); } /** diff --git a/packages/web/src/worker/db/WASQLiteDB.worker.ts b/packages/web/src/worker/db/WASQLiteDB.worker.ts index 65d6e3f0a..4f40fdad4 100644 --- a/packages/web/src/worker/db/WASQLiteDB.worker.ts +++ b/packages/web/src/worker/db/WASQLiteDB.worker.ts @@ -9,7 +9,7 @@ import { AsyncDatabaseConnection } from '../../db/adapters/AsyncDatabaseConnecti import { WorkerDBOpenerOptions } from '../../db/adapters/wa-sqlite/WASQLiteOpenFactory'; import { getNavigatorLocks } from '../../shared/navigator'; import { SharedDBWorkerConnection, SharedWASQLiteConnection } from './SharedWASQLiteConnection'; -import { WorkerWASQLiteConnection, proxyWASQLiteConnection } from './WorkerWASQLiteConnection'; +import { WorkerWASQLiteConnection } from './WorkerWASQLiteConnection'; const baseLogger = createBaseLogger(); baseLogger.useDefaults(); @@ -59,7 +59,7 @@ const openDBShared = async (options: WorkerDBOpenerOptions): Promise connection.init()), - close: Comlink.proxy(() => connection.close()), - markHold: Comlink.proxy(() => connection.markHold()), - releaseHold: Comlink.proxy((holdId: string) => connection.releaseHold(holdId)), - execute: Comlink.proxy((sql: string, params?: any[]) => connection.execute(sql, params)), - executeRaw: Comlink.proxy((sql: string, params?: any[]) => connection.executeRaw(sql, params)), - executeBatch: Comlink.proxy((sql: string, params?: any[]) => connection.executeBatch(sql, params)), - registerOnTableChange: Comlink.proxy((callback: OnTableChangeCallback) => - connection.registerOnTableChange(callback) - ), - getConfig: Comlink.proxy(() => connection.getConfig()) - }); -} - export class WorkerWASQLiteConnection extends WASqliteConnection { async registerOnTableChange(callback: OnTableChangeCallback): Promise<() => void> { // Proxy the callback remove function diff --git a/packages/web/tests/multiple_instances.test.ts b/packages/web/tests/multiple_instances.test.ts index 80b39e419..76fadaf26 100644 --- a/packages/web/tests/multiple_instances.test.ts +++ b/packages/web/tests/multiple_instances.test.ts @@ -3,26 +3,31 @@ import { createBaseLogger, createLogger, DEFAULT_CRUD_UPLOAD_THROTTLE_MS, - DEFAULT_STREAMING_SYNC_OPTIONS, SqliteBucketStorage, SyncStatus } from '@powersync/common'; import { + OpenAsyncDatabaseConnection, SharedWebStreamingSyncImplementation, SharedWebStreamingSyncImplementationOptions, + WASqliteConnection, WebRemote } from '@powersync/web'; - -import { beforeAll, describe, expect, it, vi } from 'vitest'; +import * as Comlink from 'comlink'; +import { beforeAll, describe, expect, it, onTestFinished, vi } from 'vitest'; +import { LockedAsyncDatabaseAdapter } from '../src/db/adapters/LockedAsyncDatabaseAdapter'; import { WebDBAdapter } from '../src/db/adapters/WebDBAdapter'; +import { WorkerWrappedAsyncDatabaseConnection } from '../src/db/adapters/WorkerWrappedAsyncDatabaseConnection'; import { TestConnector } from './utils/MockStreamOpenFactory'; import { generateTestDb, testSchema } from './utils/testDb'; +const DB_FILENAME = 'test-multiple-instances.db'; + describe('Multiple Instances', { sequential: true }, () => { const openDatabase = () => generateTestDb({ database: { - dbFilename: `test-multiple-instances.db` + dbFilename: DB_FILENAME }, schema: testSchema }); @@ -99,6 +104,106 @@ describe('Multiple Instances', { sequential: true }, () => { await createAsset(powersync2); }); + it('should handled interrupted transactions', { timeout: Infinity }, async () => { + //Create a shared PowerSync database. We'll just use this for internally managing connections. + const powersync = openDatabase(); + await powersync.init(); + + // Now get a shared connection to the same database + const webAdapter = powersync.database as WebDBAdapter; + + // Allow us to share the connection. This is what shared sync workers will use. + const shared = await webAdapter.shareConnection(); + const config = webAdapter.getConfiguration(); + const opener = Comlink.wrap(shared.port); + + // Open up a shared connection + const initialSharedConnection = (await opener(config)) as Comlink.Remote; + onTestFinished(async () => { + await initialSharedConnection.close(); + }); + + // This will simulate another subsequent shared connection + const subsequentSharedConnection = (await opener(config)) as Comlink.Remote; + onTestFinished(async () => { + await subsequentSharedConnection.close(); + }); + + // In the beginning, we should not be in a transaction + const isAutoCommit = await initialSharedConnection.isAutoCommit(); + // Should be true initially + expect(isAutoCommit).true; + + // Now we'll simulate the locked connections which are used by the shared sync worker + const wrappedInitialSharedConnection = new WorkerWrappedAsyncDatabaseConnection({ + baseConnection: initialSharedConnection, + identifier: DB_FILENAME, + remoteCanCloseUnexpectedly: true, + remote: opener + }); + + // Wrap the second connection in a locked adapter, this simulates the actual use case + const lockedInitialConnection = new LockedAsyncDatabaseAdapter({ + name: DB_FILENAME, + openConnection: async () => wrappedInitialSharedConnection + }); + + // Allows us to unblock a transaction which is awaiting a promise + let unblockTransaction: (() => void) | undefined; + + // Start a transaction that will be interrupted + const transactionPromise = lockedInitialConnection.writeTransaction(async (tx) => { + // Transaction should be started now + + // Wait till we are unblocked. Keep this transaction open. + await new Promise((resolve) => { + unblockTransaction = resolve; + }); + + // This should throw if the db was closed + await tx.get('SELECT 1'); + }); + + // Wait for the transaction to have started + await vi.waitFor(() => expect(unblockTransaction).toBeDefined(), { timeout: 2000 }); + + // Since we're in a transaction from above + expect(await initialSharedConnection.isAutoCommit()).false; + + // The in-use connection should be closed now + // This simulates a tab being closed. + await wrappedInitialSharedConnection.close(); + wrappedInitialSharedConnection.markRemoteClosed(); + + // The transaction should be unblocked now + unblockTransaction?.(); + + // Since we closed while in the transaction, the execution call should have thrown + await expect(transactionPromise).rejects.toThrow('Called operation on closed remote'); + + // It will still be false until we request a new hold + // Requesting a new hold will cleanup the previous transaction. + expect(await subsequentSharedConnection.isAutoCommit()).false; + + // Allows us to simulate a new locked shared connection. + const lockedSubsequentConnection = new LockedAsyncDatabaseAdapter({ + name: DB_FILENAME, + openConnection: async () => + new WorkerWrappedAsyncDatabaseConnection({ + baseConnection: subsequentSharedConnection, + identifier: DB_FILENAME, + remoteCanCloseUnexpectedly: true, + remote: opener + }) + }); + + // Starting a new transaction should work cleanup the old and work as expected + await lockedSubsequentConnection.writeTransaction(async (tx) => { + await tx.get('SELECT 1'); + expect(await subsequentSharedConnection.isAutoCommit()).false; + }); + }); + it('should watch table changes between instances', async () => { const db1 = openDatabase(); const db2 = openDatabase(); From 1ddf6bedb479a7312e083efab0c07171113400ff Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 10:34:52 +0200 Subject: [PATCH 08/11] Update changelog --- .changeset/silver-insects-unite.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.changeset/silver-insects-unite.md b/.changeset/silver-insects-unite.md index ec6c51816..ce71d827c 100644 --- a/.changeset/silver-insects-unite.md +++ b/.changeset/silver-insects-unite.md @@ -2,4 +2,5 @@ '@powersync/web': patch --- -Fixed an issue where IndexedDB could cause "cannot start a transaction within a transaction" errors +- Fixed an issue where IndexedDB could cause "cannot start a transaction within a transaction" errors. +- Improved reconnect logic when multiple tabs are closed. From 4804531aea96f5129bbf925fff65e2a367e87fa8 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 11:01:13 +0200 Subject: [PATCH 09/11] Code cleanup --- .../web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts | 2 +- packages/web/src/worker/db/SharedWASQLiteConnection.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts index bc698b13a..e03bf8aa8 100644 --- a/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts +++ b/packages/web/src/db/adapters/WorkerWrappedAsyncDatabaseConnection.ts @@ -69,7 +69,7 @@ export class WorkerWrappedAsyncDatabaseConnection { - return this.baseConnection.isAutoCommit(); + return this.withRemote(() => this.baseConnection.isAutoCommit()); } private withRemote(workerPromise: () => Promise): Promise { diff --git a/packages/web/src/worker/db/SharedWASQLiteConnection.ts b/packages/web/src/worker/db/SharedWASQLiteConnection.ts index 3537442ab..48b9a4a41 100644 --- a/packages/web/src/worker/db/SharedWASQLiteConnection.ts +++ b/packages/web/src/worker/db/SharedWASQLiteConnection.ts @@ -96,6 +96,7 @@ export class SharedWASQLiteConnection implements AsyncDatabaseConnection { const connection = this.connection; dbMap.delete(dbFilename); await connection.close(); + return; } logger.debug(`Connection to ${dbFilename} not closed yet due to active clients.`); return; From df072236dfb3acbe9ebe9ce38523425eb0394bed Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 11:08:35 +0200 Subject: [PATCH 10/11] maybe make node sync tests more stable --- packages/node/tests/utils.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/node/tests/utils.ts b/packages/node/tests/utils.ts index 101e6dbc7..e21d19451 100644 --- a/packages/node/tests/utils.ts +++ b/packages/node/tests/utils.ts @@ -150,6 +150,8 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{ const newConnection = async (options?: Partial) => { const db = await createDatabase(tmpdir, { + // This might help with test stability/timeouts if a retry is needed + retryDelayMs: 100, ...options, database: { dbFilename: databaseName, From 8fc41a8279d2e5b58b64bac4187f8e1ab217124b Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 20 Nov 2025 16:05:11 +0200 Subject: [PATCH 11/11] conditionally use holds --- .../src/db/adapters/LockedAsyncDatabaseAdapter.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts index 5525d1b0a..2b1f2221a 100644 --- a/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts +++ b/packages/web/src/db/adapters/LockedAsyncDatabaseAdapter.ts @@ -14,6 +14,8 @@ import { getNavigatorLocks } from '../..//shared/navigator'; import { AsyncDatabaseConnection } from './AsyncDatabaseConnection'; import { SharedConnectionWorker, WebDBAdapter } from './WebDBAdapter'; import { WorkerWrappedAsyncDatabaseConnection } from './WorkerWrappedAsyncDatabaseConnection'; +import { WASQLiteVFS } from './wa-sqlite/WASQLiteConnection'; +import { ResolvedWASQLiteOpenFactoryOptions } from './wa-sqlite/WASQLiteOpenFactory'; import { ResolvedWebSQLOpenOptions } from './web-sql-flags'; /** @@ -48,6 +50,7 @@ export class LockedAsyncDatabaseAdapter protected _disposeTableChangeListener: (() => void) | null = null; private _config: ResolvedWebSQLOpenOptions | null = null; protected pendingAbortControllers: Set; + protected requiresHolds: boolean | null; closing: boolean; closed: boolean; @@ -59,6 +62,7 @@ export class LockedAsyncDatabaseAdapter this.pendingAbortControllers = new Set(); this.closed = false; this.closing = false; + this.requiresHolds = null; // Set the name if provided. We can query for the name if not available yet this.debugMode = options.debugMode ?? false; if (this.debugMode) { @@ -107,6 +111,10 @@ export class LockedAsyncDatabaseAdapter this._config = await this._db.getConfig(); await this.registerOnChangeListener(this._db); this.iterateListeners((cb) => cb.initialized?.()); + /** + * This is only required for the long-lived shared IndexedDB connections. + */ + this.requiresHolds = (this._config as ResolvedWASQLiteOpenFactoryOptions).vfs == WASQLiteVFS.IDBBatchAtomicVFS; } getConfiguration(): ResolvedWebSQLOpenOptions { @@ -229,11 +237,13 @@ export class LockedAsyncDatabaseAdapter if (timoutId) { clearTimeout(timoutId); } - const holdId = await this.baseDB.markHold(); + const holdId = this.requiresHolds ? await this.baseDB.markHold() : null; try { return await callback(); } finally { - await this.baseDB.releaseHold(holdId); + if (holdId) { + await this.baseDB.releaseHold(holdId); + } } } );