From c128403a7b918692df18f42cd2606d14b37c0c97 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 10 Dec 2025 23:18:44 +0000 Subject: [PATCH] fix(electric-db-collection): fix awaitMatch race condition on inserts and export isChangeMessage Two issues fixed: 1. **isChangeMessage not exported**: The `isChangeMessage` and `isControlMessage` utilities were exported from electric.ts but not re-exported from the package's index.ts, making them unavailable to users despite documentation stating otherwise. 2. **awaitMatch race condition on inserts**: When `awaitMatch` was called after the Electric messages had already been processed (including up-to-date), it would timeout because: - The message buffer (`currentBatchMessages`) was cleared on up-to-date - Immediate matches found in the buffer still waited for another up-to-date to resolve Fixed by: - Moving buffer clearing to the START of batch processing (preserves messages until next batch) - Adding `batchCommitted` flag to track when a batch is committed - For immediate matches: resolve immediately only if batch is committed (consistent with awaitTxId) - For immediate matches during batch processing: wait for up-to-date (maintains commit semantics) - Set `batchCommitted` BEFORE `resolveMatchedPendingMatches()` to avoid timing window - Set `batchCommitted` on snapshot-end in on-demand mode (matching "ready" semantics) Fixes issue reported on Discord where inserts would timeout while updates worked. --- .changeset/fix-await-match-inserts.md | 16 + .../electric-db-collection/src/electric.ts | 38 +- packages/electric-db-collection/src/index.ts | 2 + .../tests/electric.test.ts | 494 ++++++++++++++++++ 4 files changed, 544 insertions(+), 6 deletions(-) create mode 100644 .changeset/fix-await-match-inserts.md diff --git a/.changeset/fix-await-match-inserts.md b/.changeset/fix-await-match-inserts.md new file mode 100644 index 000000000..afcd3a6c8 --- /dev/null +++ b/.changeset/fix-await-match-inserts.md @@ -0,0 +1,16 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Fix awaitMatch race condition on inserts and export isChangeMessage/isControlMessage. + +**Bug fixes:** + +- Fixed race condition where `awaitMatch` would timeout on inserts when Electric synced faster than the API call +- Messages are now preserved in buffer until next batch arrives, allowing `awaitMatch` to find them +- Added `batchCommitted` flag to track commit state, consistent with `awaitTxId` semantics +- Fixed `batchCommitted` to also trigger on `snapshot-end` in `on-demand` mode (matching "ready" semantics) + +**Export fixes:** + +- `isChangeMessage` and `isControlMessage` are now exported from the package index as documented diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index d8a0e2bcb..bc9d4bdd4 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -523,6 +523,10 @@ export function electricCollectionOptions>( // Buffer messages since last up-to-date to handle race conditions const currentBatchMessages = new Store>>([]) + // Track whether the current batch has been committed (up-to-date received) + // This allows awaitMatch to resolve immediately for messages from committed batches + const batchCommitted = new Store(false) + /** * Helper function to remove multiple matches from the pendingMatches store */ @@ -560,6 +564,7 @@ export function electricCollectionOptions>( syncMode: internalSyncMode, pendingMatches, currentBatchMessages, + batchCommitted, removePendingMatches, resolveMatchedPendingMatches, collectionId: config.id, @@ -689,10 +694,21 @@ export function electricCollectionOptions>( // Check against current batch messages first to handle race conditions for (const message of currentBatchMessages.state) { if (matchFn(message)) { + // If batch is committed (up-to-date already received), resolve immediately + // just like awaitTxId does when it finds a txid in seenTxids + if (batchCommitted.state) { + debug( + `${config.id ? `[${config.id}] ` : ``}awaitMatch found immediate match in committed batch, resolving immediately`, + ) + clearTimeout(timeoutId) + resolve(true) + return + } + + // If batch is not yet committed, register match and wait for up-to-date debug( `${config.id ? `[${config.id}] ` : ``}awaitMatch found immediate match in current batch, waiting for up-to-date`, ) - // Register match as already matched pendingMatches.setState((current) => { const newMatches = new Map(current) newMatches.set(matchId, { @@ -700,7 +716,7 @@ export function electricCollectionOptions>( resolve, reject, timeoutId, - matched: true, // Already matched + matched: true, // Already matched, will resolve on up-to-date }) return newMatches }) @@ -831,6 +847,7 @@ function createElectricSync>( > > currentBatchMessages: Store>> + batchCommitted: Store removePendingMatches: (matchIds: Array) => void resolveMatchedPendingMatches: () => void collectionId?: string @@ -843,6 +860,7 @@ function createElectricSync>( syncMode, pendingMatches, currentBatchMessages, + batchCommitted, removePendingMatches, resolveMatchedPendingMatches, collectionId, @@ -982,6 +1000,12 @@ function createElectricSync>( let hasUpToDate = false let hasSnapshotEnd = false + // Clear the current batch buffer at the START of processing a new batch + // This preserves messages from the previous batch until new ones arrive, + // allowing awaitMatch to find messages even if called after up-to-date + currentBatchMessages.setState(() => []) + batchCommitted.setState(() => false) + for (const message of messages) { // Add message to current batch buffer (for race condition handling) if (isChangeMessage(message)) { @@ -1143,9 +1167,6 @@ function createElectricSync>( } } - // Clear the current batch buffer since we're now up-to-date - currentBatchMessages.setState(() => []) - if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { // Mark the collection as ready now that sync is up to date wrappedMarkReady(isBufferingInitialSync()) @@ -1183,7 +1204,12 @@ function createElectricSync>( return seen }) - // Resolve all matched pending matches on up-to-date + // Resolve all matched pending matches on up-to-date or snapshot-end in on-demand mode + // Set batchCommitted BEFORE resolving to avoid timing window where late awaitMatch + // calls could register as "matched" after resolver pass already ran + if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { + batchCommitted.setState(() => true) + } resolveMatchedPendingMatches() } }) diff --git a/packages/electric-db-collection/src/index.ts b/packages/electric-db-collection/src/index.ts index 87c98d24e..31f615e56 100644 --- a/packages/electric-db-collection/src/index.ts +++ b/packages/electric-db-collection/src/index.ts @@ -1,5 +1,7 @@ export { electricCollectionOptions, + isChangeMessage, + isControlMessage, type ElectricCollectionConfig, type ElectricCollectionUtils, type Txid, diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index d5d74e767..be2031bf6 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -892,6 +892,477 @@ describe(`Electric Integration`, () => { vi.useRealTimers() }) + + it(`should find matching message in awaitMatch even when called after up-to-date`, async () => { + // This test verifies the fix for the race condition where: + // 1. Server receives insert and syncs it via Electric + // 2. Electric messages (including up-to-date) are processed + // 3. THEN awaitMatch is called - it should still find the message + + let resolveServerCall: () => void + const serverCallPromise = new Promise((resolve) => { + resolveServerCall = resolve + }) + + const onInsert = vi + .fn() + .mockImplementation(async ({ transaction, collection: col }) => { + const item = transaction.mutations[0].modified + + // Simulate waiting for server call to complete + // During this time, Electric messages will arrive and be processed + await serverCallPromise + + // Now awaitMatch is called AFTER the messages were processed + // This should still find the message in the buffer + await col.utils.awaitMatch((message: any) => { + return ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.id === item.id + ) + }, 5000) + }) + + const config = { + id: `test-await-match-after-up-to-date`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start insert - will call onInsert which waits for serverCallPromise + const insertPromise = testCollection.insert({ + id: 42, + name: `Race Condition Test`, + }) + + // Wait for onInsert to start and reach the await serverCallPromise + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Send Electric messages while onInsert is waiting for serverCallPromise + // This simulates the race condition where messages arrive while API call is in progress + subscriber([ + { + key: `42`, + value: { id: 42, name: `Race Condition Test` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + + // Wait a tick to ensure messages are processed + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Now resolve the server call - awaitMatch will be called AFTER messages were processed + resolveServerCall!() + + // Should complete successfully - awaitMatch should find the message in the buffer + await insertPromise.isPersisted.promise + + expect(onInsert).toHaveBeenCalled() + expect(testCollection.has(42)).toBe(true) + }) + + it(`should wait for up-to-date when match found during batch processing`, async () => { + // This test verifies that if awaitMatch finds a match while the batch + // is still being processed (before up-to-date), it waits for up-to-date + // before resolving, ensuring data is committed before returning + + let matchFoundTime: number | undefined + let resolveTime: number | undefined + + const onInsert = vi + .fn() + .mockImplementation(async ({ transaction, collection: col }) => { + const item = transaction.mutations[0].modified + + await col.utils.awaitMatch((message: any) => { + if ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.id === item.id + ) { + matchFoundTime = Date.now() + return true + } + return false + }, 5000) + resolveTime = Date.now() + }) + + const config = { + id: `test-wait-for-up-to-date`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + const insertPromise = testCollection.insert({ + id: 100, + name: `Wait Test`, + }) + + // Send insert message first, then up-to-date after a delay + // This simulates awaitMatch finding the message before up-to-date + setTimeout(() => { + subscriber([ + { + key: `100`, + value: { id: 100, name: `Wait Test` }, + headers: { operation: `insert` }, + }, + ]) + }, 50) + + // Send up-to-date after another delay + setTimeout(() => { + subscriber([{ headers: { control: `up-to-date` } }]) + }, 150) + + await insertPromise.isPersisted.promise + + expect(onInsert).toHaveBeenCalled() + // Match should have been found before resolve (both times should exist) + expect(matchFoundTime).toBeDefined() + expect(resolveTime).toBeDefined() + // Verify that resolve happened AFTER match was found (waited for up-to-date) + expect(resolveTime).toBeGreaterThanOrEqual(matchFoundTime!) + }) + + it(`should clear buffer on new batch and match new messages`, async () => { + // Verify that when a new batch arrives, the old buffer is cleared + // and awaitMatch correctly matches messages from the new batch + + let resolveServerCall: () => void + const serverCallPromise = new Promise((resolve) => { + resolveServerCall = resolve + }) + + const onInsert = vi + .fn() + .mockImplementation(async ({ transaction, collection: col }) => { + const item = transaction.mutations[0].modified + + // Simulate API call + await serverCallPromise + + await col.utils.awaitMatch((message: any) => { + return ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.id === item.id + ) + }, 5000) + }) + + const config = { + id: `test-buffer-clearing`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start insert for item 201 + const insertPromise = testCollection.insert({ + id: 201, + name: `Second Item`, + }) + + // Wait for onInsert to start + await new Promise((resolve) => setTimeout(resolve, 10)) + + // First batch - insert different item 200 (simulating other sync activity) + subscriber([ + { + key: `200`, + value: { id: 200, name: `First Item` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Second batch - insert item 201 (our target) + subscriber([ + { + key: `201`, + value: { id: 201, name: `Second Item` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Resolve server call - awaitMatch should find item 201 in the buffer + // even though item 200 from the first batch is no longer in the buffer + resolveServerCall!() + + await insertPromise.isPersisted.promise + + expect(testCollection.has(201)).toBe(true) + }) + + it(`should timeout when no match in committed batch and no new messages`, async () => { + // Verify that awaitMatch times out when the message isn't found + + const onInsert = vi + .fn() + .mockImplementation(async ({ collection: col }) => { + // Look for a message that doesn't exist + await col.utils.awaitMatch( + (message: any) => + isChangeMessage(message) && message.value.id === 999, + 100, // Short timeout + ) + }) + + const config = { + id: `test-no-match-timeout`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send a batch with different items + subscriber([ + { + key: `300`, + value: { id: 300, name: `Wrong Item` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Insert looking for id 999 which doesn't exist + const tx = testCollection.insert({ id: 301, name: `Test` }) + + await expect(tx.isPersisted.promise).rejects.toThrow( + `Timeout waiting for custom match function`, + ) + }) + + it(`should resolve multiple awaitMatch calls from same committed batch`, async () => { + // Verify that multiple concurrent awaitMatch calls can all find their + // respective messages in the same committed batch + + const matches: Array = [] + const serverCalls: Array<{ resolve: () => void }> = [] + + const onInsert = vi + .fn() + .mockImplementation(async ({ transaction, collection: col }) => { + const item = transaction.mutations[0].modified + + // Create a promise for this insert's "API call" + const serverCallPromise = new Promise((resolve) => { + serverCalls.push({ resolve }) + }) + await serverCallPromise + + await col.utils.awaitMatch((message: any) => { + if ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.id === item.id + ) { + matches.push(item.id) + return true + } + return false + }, 5000) + }) + + const config = { + id: `test-multiple-matches`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start all three inserts concurrently + const insert1 = testCollection.insert({ id: 400, name: `Item A` }) + const insert2 = testCollection.insert({ id: 401, name: `Item B` }) + const insert3 = testCollection.insert({ id: 402, name: `Item C` }) + + // Wait for all onInsert handlers to start + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Send batch with all three items (simulating fast sync from server) + subscriber([ + { + key: `400`, + value: { id: 400, name: `Item A` }, + headers: { operation: `insert` }, + }, + { + key: `401`, + value: { id: 401, name: `Item B` }, + headers: { operation: `insert` }, + }, + { + key: `402`, + value: { id: 402, name: `Item C` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + + await new Promise((resolve) => setTimeout(resolve, 20)) + + // Resolve all server calls - all awaitMatch calls should find their messages + serverCalls.forEach((call) => call.resolve()) + + await Promise.all([ + insert1.isPersisted.promise, + insert2.isPersisted.promise, + insert3.isPersisted.promise, + ]) + + expect(matches).toContain(400) + expect(matches).toContain(401) + expect(matches).toContain(402) + expect(testCollection.has(400)).toBe(true) + expect(testCollection.has(401)).toBe(true) + expect(testCollection.has(402)).toBe(true) + }) + + it(`should handle awaitMatch across multiple sequential batches`, async () => { + // Real-world scenario: continuous sync with multiple batches + // Each insert should find its matching message in the committed buffer + + const serverCalls: Array<{ resolve: () => void; id: number }> = [] + + const onInsert = vi + .fn() + .mockImplementation(async ({ transaction, collection: col }) => { + const item = transaction.mutations[0].modified + + // Create a promise for this insert's "API call" + const serverCallPromise = new Promise((resolve) => { + serverCalls.push({ resolve, id: item.id }) + }) + await serverCallPromise + + await col.utils.awaitMatch((message: any) => { + return ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.id === item.id + ) + }, 5000) + }) + + const config = { + id: `test-sequential-batches`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Insert 1 - starts waiting on server call + const insert1 = testCollection.insert({ id: 500, name: `Batch 1 Item` }) + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Batch 1 arrives with item 500 + subscriber([ + { + key: `500`, + value: { id: 500, name: `Batch 1 Item` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Resolve server call for insert 1 + const call1 = serverCalls.find((c) => c.id === 500) + call1?.resolve() + await insert1.isPersisted.promise + expect(testCollection.has(500)).toBe(true) + + // Insert 2 - starts waiting on server call + const insert2 = testCollection.insert({ id: 501, name: `Batch 2 Item` }) + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Batch 2 arrives (clears batch 1 from buffer) + subscriber([ + { + key: `501`, + value: { id: 501, name: `Batch 2 Item` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Resolve server call for insert 2 + const call2 = serverCalls.find((c) => c.id === 501) + call2?.resolve() + await insert2.isPersisted.promise + expect(testCollection.has(501)).toBe(true) + + // Insert 3 - starts waiting on server call + const insert3 = testCollection.insert({ id: 502, name: `Batch 3 Item` }) + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Batch 3 arrives + subscriber([ + { + key: `502`, + value: { id: 502, name: `Batch 3 Item` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Resolve server call for insert 3 + const call3 = serverCalls.find((c) => c.id === 502) + call3?.resolve() + await insert3.isPersisted.promise + expect(testCollection.has(502)).toBe(true) + }) }) // Tests for matching strategies utilities @@ -914,6 +1385,29 @@ describe(`Electric Integration`, () => { expect(isChangeMessage(controlMessage)).toBe(false) }) + it(`should export isChangeMessage and isControlMessage from package index`, async () => { + // Verify that the exports are available from the package's public API + // This tests the fix for the missing exports in index.ts + const exports = await import(`../src/index`) + + expect(typeof exports.isChangeMessage).toBe(`function`) + expect(typeof exports.isControlMessage).toBe(`function`) + + // Verify they work correctly + const changeMessage = { + key: `1`, + value: { id: 1, name: `Test` }, + headers: { operation: `insert` as const }, + } + expect(exports.isChangeMessage(changeMessage)).toBe(true) + + const controlMessage = { + headers: { control: `up-to-date` as const }, + } + expect(exports.isControlMessage(controlMessage)).toBe(true) + expect(exports.isChangeMessage(controlMessage)).toBe(false) + }) + it(`should provide awaitMatch utility in collection utils`, () => { const config = { id: `test-await-match`,