Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .changeset/fix-await-match-inserts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
'@tanstack/electric-db-collection': patch
---

Fix awaitMatch race condition on inserts and export isChangeMessage/isControlMessage.

**Bug fixes:**

- Fixed race condition where `awaitMatch` would timeout on inserts when Electric synced faster than the API call
- Messages are now preserved in buffer until next batch arrives, allowing `awaitMatch` to find them
- Added `batchCommitted` flag to track commit state, consistent with `awaitTxId` semantics
- Fixed `batchCommitted` to also trigger on `snapshot-end` in `on-demand` mode (matching "ready" semantics)

**Export fixes:**

- `isChangeMessage` and `isControlMessage` are now exported from the package index as documented
38 changes: 32 additions & 6 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ export function electricCollectionOptions<T extends Row<unknown>>(
// Buffer messages since last up-to-date to handle race conditions
const currentBatchMessages = new Store<Array<Message<any>>>([])

// Track whether the current batch has been committed (up-to-date received)
// This allows awaitMatch to resolve immediately for messages from committed batches
const batchCommitted = new Store<boolean>(false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Not sure this needs to be a store as we are not reactively watching it? I guess Claude is following the pattern in the file.

(we don't really need store in here at all)


/**
* Helper function to remove multiple matches from the pendingMatches store
*/
Expand Down Expand Up @@ -560,6 +564,7 @@ export function electricCollectionOptions<T extends Row<unknown>>(
syncMode: internalSyncMode,
pendingMatches,
currentBatchMessages,
batchCommitted,
removePendingMatches,
resolveMatchedPendingMatches,
collectionId: config.id,
Expand Down Expand Up @@ -689,18 +694,29 @@ export function electricCollectionOptions<T extends Row<unknown>>(
// Check against current batch messages first to handle race conditions
for (const message of currentBatchMessages.state) {
if (matchFn(message)) {
// If batch is committed (up-to-date already received), resolve immediately
// just like awaitTxId does when it finds a txid in seenTxids
if (batchCommitted.state) {
debug(
`${config.id ? `[${config.id}] ` : ``}awaitMatch found immediate match in committed batch, resolving immediately`,
)
clearTimeout(timeoutId)
resolve(true)
return
}

// If batch is not yet committed, register match and wait for up-to-date
debug(
`${config.id ? `[${config.id}] ` : ``}awaitMatch found immediate match in current batch, waiting for up-to-date`,
)
// Register match as already matched
pendingMatches.setState((current) => {
const newMatches = new Map(current)
newMatches.set(matchId, {
matchFn: checkMatch,
resolve,
reject,
timeoutId,
matched: true, // Already matched
matched: true, // Already matched, will resolve on up-to-date
})
return newMatches
})
Expand Down Expand Up @@ -831,6 +847,7 @@ function createElectricSync<T extends Row<unknown>>(
>
>
currentBatchMessages: Store<Array<Message<T>>>
batchCommitted: Store<boolean>
removePendingMatches: (matchIds: Array<string>) => void
resolveMatchedPendingMatches: () => void
collectionId?: string
Expand All @@ -843,6 +860,7 @@ function createElectricSync<T extends Row<unknown>>(
syncMode,
pendingMatches,
currentBatchMessages,
batchCommitted,
removePendingMatches,
resolveMatchedPendingMatches,
collectionId,
Expand Down Expand Up @@ -982,6 +1000,12 @@ function createElectricSync<T extends Row<unknown>>(
let hasUpToDate = false
let hasSnapshotEnd = false

// Clear the current batch buffer at the START of processing a new batch
// This preserves messages from the previous batch until new ones arrive,
// allowing awaitMatch to find messages even if called after up-to-date
currentBatchMessages.setState(() => [])
batchCommitted.setState(() => false)

for (const message of messages) {
// Add message to current batch buffer (for race condition handling)
if (isChangeMessage(message)) {
Expand Down Expand Up @@ -1143,9 +1167,6 @@ function createElectricSync<T extends Row<unknown>>(
}
}

// Clear the current batch buffer since we're now up-to-date
currentBatchMessages.setState(() => [])

if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
// Mark the collection as ready now that sync is up to date
wrappedMarkReady(isBufferingInitialSync())
Expand Down Expand Up @@ -1183,7 +1204,12 @@ function createElectricSync<T extends Row<unknown>>(
return seen
})

// Resolve all matched pending matches on up-to-date
// Resolve all matched pending matches on up-to-date or snapshot-end in on-demand mode
// Set batchCommitted BEFORE resolving to avoid timing window where late awaitMatch
// calls could register as "matched" after resolver pass already ran
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
batchCommitted.setState(() => true)
}
resolveMatchedPendingMatches()
}
})
Expand Down
2 changes: 2 additions & 0 deletions packages/electric-db-collection/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export {
electricCollectionOptions,
isChangeMessage,
isControlMessage,
type ElectricCollectionConfig,
type ElectricCollectionUtils,
type Txid,
Expand Down
Loading
Loading