Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/breezy-heads-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/electric-db-collection': patch
---

Adds support for the new subset-end message introduced in Electric.
2 changes: 1 addition & 1 deletion packages/electric-db-collection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"src"
],
"dependencies": {
"@electric-sql/client": "^1.2.0",
"@electric-sql/client": "^1.3.0",
"@standard-schema/spec": "^1.0.0",
"@tanstack/db": "workspace:*",
"@tanstack/store": "^0.8.0",
Expand Down
58 changes: 31 additions & 27 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ function isSnapshotEndMessage<T extends Row<unknown>>(
return isControlMessage(message) && message.headers.control === `snapshot-end`
}

function isSubsetEndMessage<T extends Row<unknown>>(
message: Message<T>,
): message is ControlMessage & { headers: { control: `subset-end` } } {
return (
isControlMessage(message) &&
(message.headers.control as string) === `subset-end`
)
}

function parseSnapshotMessage(message: SnapshotEndMessage): PostgresSnapshot {
return {
xmin: message.headers.xmin,
Expand Down Expand Up @@ -997,8 +1006,8 @@ function createElectricSync<T extends Row<unknown>>(
})

unsubscribeStream = stream.subscribe((messages: Array<Message<T>>) => {
let hasUpToDate = false
let hasSnapshotEnd = false
// Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap
let commitPoint: `up-to-date` | `subset-end` | null = null

// Clear the current batch buffer at the START of processing a new batch
// This preserves messages from the previous batch until new ones arrive,
Expand Down Expand Up @@ -1075,13 +1084,19 @@ function createElectricSync<T extends Row<unknown>>(
})
}
} else if (isSnapshotEndMessage(message)) {
// Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap)
// Track postgres snapshot metadata for resolving awaiting mutations
// Skip during buffered initial sync (will be extracted during atomic swap)
if (!isBufferingInitialSync()) {
newSnapshots.push(parseSnapshotMessage(message))
}
hasSnapshotEnd = true
} else if (isUpToDateMessage(message)) {
hasUpToDate = true
// up-to-date takes precedence - also triggers progressive mode atomic swap
commitPoint = `up-to-date`
} else if (isSubsetEndMessage(message)) {
// subset-end triggers commit but not progressive mode atomic swap
if (commitPoint !== `up-to-date`) {
commitPoint = `subset-end`
}
} else if (isMustRefetchMessage(message)) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
Expand All @@ -1100,16 +1115,15 @@ function createElectricSync<T extends Row<unknown>>(
loadSubsetDedupe?.reset()

// Reset flags so we continue accumulating changes until next up-to-date
hasUpToDate = false
hasSnapshotEnd = false
commitPoint = null
hasReceivedUpToDate = false // Reset for progressive mode (isBufferingInitialSync will reflect this)
bufferedMessages.length = 0 // Clear buffered messages
}
}

if (hasUpToDate || hasSnapshotEnd) {
// PROGRESSIVE MODE: Atomic swap on first up-to-date
if (isBufferingInitialSync() && hasUpToDate) {
if (commitPoint !== null) {
// PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end)
if (isBufferingInitialSync() && commitPoint === `up-to-date`) {
debug(
`${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`,
)
Expand Down Expand Up @@ -1155,25 +1169,16 @@ function createElectricSync<T extends Row<unknown>>(
)
} else {
// Normal mode or on-demand: commit transaction if one was started
// In eager mode, only commit on snapshot-end if we've already received
// the first up-to-date, because the snapshot-end in the log could be from
// a significant period before the stream is actually up to date
const shouldCommit =
hasUpToDate || syncMode === `on-demand` || hasReceivedUpToDate

if (transactionStarted && shouldCommit) {
// Both up-to-date and subset-end trigger a commit
if (transactionStarted) {
commit()
transactionStarted = false
}
}

if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
// Mark the collection as ready now that sync is up to date
wrappedMarkReady(isBufferingInitialSync())
}
wrappedMarkReady(isBufferingInitialSync())

// Track that we've received the first up-to-date for progressive mode
if (hasUpToDate) {
if (commitPoint === `up-to-date`) {
hasReceivedUpToDate = true
}

Expand Down Expand Up @@ -1204,12 +1209,11 @@ function createElectricSync<T extends Row<unknown>>(
return seen
})

// Resolve all matched pending matches on up-to-date or snapshot-end in on-demand mode
// Resolve all matched pending matches on up-to-date or subset-end
// Set batchCommitted BEFORE resolving to avoid timing window where late awaitMatch
// calls could register as "matched" after resolver pass already ran
if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) {
batchCommitted.setState(() => true)
}
batchCommitted.setState(() => true)

resolveMatchedPendingMatches()
}
})
Expand Down
62 changes: 42 additions & 20 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2820,9 +2820,9 @@ describe(`Electric Integration`, () => {
expect(testCollection.status).toBe(`ready`)
})

it(`should commit on snapshot-end in eager mode AFTER first up-to-date`, () => {
it(`should commit on subset-end in eager mode`, () => {
const config = {
id: `eager-snapshot-end-test`,
id: `eager-subset-end-test`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
Expand All @@ -2834,28 +2834,45 @@ describe(`Electric Integration`, () => {

const testCollection = createCollection(electricCollectionOptions(config))

// First send up-to-date (with initial data) to establish the connection
// Send data followed by subset-end (marks end of injected subset snapshot)
// subset-end should trigger a commit
subscriber([
{
key: `1`,
value: { id: 1, name: `Test User` },
headers: { operation: `insert` },
},
{
headers: { control: `up-to-date` },
headers: { control: `subset-end` },
},
])

// Data should be committed and collection ready
expect(testCollection.has(1)).toBe(true)
expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` })
expect(testCollection.status).toBe(`ready`)
})

it(`should NOT commit on snapshot-end (only tracks metadata)`, () => {
const config = {
id: `eager-snapshot-end-no-commit-test`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
},
syncMode: `eager` as const,
getKey: (item: Row) => item.id as number,
startSync: true,
}

// Now send more data followed by snapshot-end (simulating incremental snapshot)
// After the first up-to-date, snapshot-end SHOULD commit
const testCollection = createCollection(electricCollectionOptions(config))

// Send data followed by snapshot-end
// snapshot-end should NOT trigger a commit - only up-to-date or subset-end do
subscriber([
{
key: `2`,
value: { id: 2, name: `Second User` },
key: `1`,
value: { id: 1, name: `Test User` },
headers: { operation: `insert` },
},
{
Expand All @@ -2868,15 +2885,25 @@ describe(`Electric Integration`, () => {
},
])

// Data should be committed since we've already received up-to-date
expect(testCollection.has(2)).toBe(true)
expect(testCollection.get(2)).toEqual({ id: 2, name: `Second User` })
// Data should NOT be committed yet (snapshot-end doesn't trigger commit)
expect(testCollection.has(1)).toBe(false)
expect(testCollection.status).toBe(`loading`)

// Now send up-to-date to commit
subscriber([
{
headers: { control: `up-to-date` },
},
])

// Now data should be committed
expect(testCollection.has(1)).toBe(true)
expect(testCollection.status).toBe(`ready`)
})

it(`should commit and mark ready on snapshot-end in on-demand mode`, () => {
it(`should commit and mark ready on subset-end in on-demand mode`, () => {
const config = {
id: `on-demand-snapshot-end-test`,
id: `on-demand-subset-end-test`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
Expand All @@ -2888,20 +2915,15 @@ describe(`Electric Integration`, () => {

const testCollection = createCollection(electricCollectionOptions(config))

// Send data followed by snapshot-end (but no up-to-date)
// Send data followed by subset-end (marks end of injected subset snapshot)
subscriber([
{
key: `1`,
value: { id: 1, name: `Test User` },
headers: { operation: `insert` },
},
{
headers: {
control: `snapshot-end`,
xmin: `100`,
xmax: `110`,
xip_list: [],
},
headers: { control: `subset-end` },
},
])

Expand Down
2 changes: 1 addition & 1 deletion packages/react-db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"react": ">=16.8.0"
},
"devDependencies": {
"@electric-sql/client": "^1.2.0",
"@electric-sql/client": "^1.3.0",
"@testing-library/react": "^16.3.0",
"@types/react": "^19.2.7",
"@types/react-dom": "^19.2.3",
Expand Down
2 changes: 1 addition & 1 deletion packages/solid-db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"solid-js": ">=1.9.0"
},
"devDependencies": {
"@electric-sql/client": "^1.2.0",
"@electric-sql/client": "^1.3.0",
"@solidjs/testing-library": "^0.8.10",
"@vitest/coverage-istanbul": "^3.2.4",
"jsdom": "^27.2.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/vue-db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"vue": ">=3.3.0"
},
"devDependencies": {
"@electric-sql/client": "^1.2.0",
"@electric-sql/client": "^1.3.0",
"@vitejs/plugin-vue": "^6.0.2",
"@vitest/coverage-istanbul": "^3.2.4",
"vue": "^3.5.25"
Expand Down
22 changes: 11 additions & 11 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading