From 2ac99c15c3b21171b92d186b8a72f4f566567fe7 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 11 Dec 2025 14:57:13 +0100 Subject: [PATCH 1/6] Upgrade @electric-sql/client to preview version --- packages/electric-db-collection/package.json | 2 +- packages/react-db/package.json | 2 +- packages/solid-db/package.json | 2 +- packages/vue-db/package.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/electric-db-collection/package.json b/packages/electric-db-collection/package.json index 74cc0d874..e4733f5c7 100644 --- a/packages/electric-db-collection/package.json +++ b/packages/electric-db-collection/package.json @@ -46,7 +46,7 @@ "src" ], "dependencies": { - "@electric-sql/client": "^1.2.0", + "@electric-sql/client": "^1.3.0", "@standard-schema/spec": "^1.0.0", "@tanstack/db": "workspace:*", "@tanstack/store": "^0.8.0", diff --git a/packages/react-db/package.json b/packages/react-db/package.json index 2b98dc3ba..be648fa00 100644 --- a/packages/react-db/package.json +++ b/packages/react-db/package.json @@ -52,7 +52,7 @@ "react": ">=16.8.0" }, "devDependencies": { - "@electric-sql/client": "^1.2.0", + "@electric-sql/client": "^1.3.0", "@testing-library/react": "^16.3.0", "@types/react": "^19.2.7", "@types/react-dom": "^19.2.3", diff --git a/packages/solid-db/package.json b/packages/solid-db/package.json index 1cab7abf5..a467fb505 100644 --- a/packages/solid-db/package.json +++ b/packages/solid-db/package.json @@ -47,7 +47,7 @@ "solid-js": ">=1.9.0" }, "devDependencies": { - "@electric-sql/client": "^1.2.0", + "@electric-sql/client": "^1.3.0", "@solidjs/testing-library": "^0.8.10", "@vitest/coverage-istanbul": "^3.2.4", "jsdom": "^27.2.0", diff --git a/packages/vue-db/package.json b/packages/vue-db/package.json index 3c893b63b..b4a5642ab 100644 --- a/packages/vue-db/package.json +++ b/packages/vue-db/package.json @@ -50,7 +50,7 @@ "vue": ">=3.3.0" }, "devDependencies": { - "@electric-sql/client": "^1.2.0", + "@electric-sql/client": "^1.3.0", "@vitejs/plugin-vue": "^6.0.2", "@vitest/coverage-istanbul": "^3.2.4", "vue": "^3.5.25" From 127daedf8d80fbeee7499f14cb2fbd448cd492f9 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Wed, 17 Dec 2025 13:20:32 +0300 Subject: [PATCH 2/6] lockfile --- pnpm-lock.yaml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 79344992a..00158bcf9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -769,8 +769,8 @@ importers: packages/electric-db-collection: dependencies: '@electric-sql/client': - specifier: ^1.2.0 - version: 1.2.0 + specifier: ^1.3.0 + version: 1.3.0 '@standard-schema/spec': specifier: ^1.0.0 version: 1.0.0 @@ -876,8 +876,8 @@ importers: version: 1.6.0(react@19.2.1) devDependencies: '@electric-sql/client': - specifier: ^1.2.0 - version: 1.2.0 + specifier: ^1.3.0 + version: 1.3.0 '@testing-library/react': specifier: ^16.3.0 version: 16.3.0(@testing-library/dom@10.4.1)(@types/react-dom@19.2.3(@types/react@19.2.7))(@types/react@19.2.7)(react-dom@19.2.1(react@19.2.1))(react@19.2.1) @@ -941,8 +941,8 @@ importers: version: link:../db devDependencies: '@electric-sql/client': - specifier: ^1.2.0 - version: 1.2.0 + specifier: ^1.3.0 + version: 1.3.0 '@solidjs/testing-library': specifier: ^0.8.10 version: 0.8.10(solid-js@1.9.10) @@ -1022,8 +1022,8 @@ importers: version: link:../db devDependencies: '@electric-sql/client': - specifier: ^1.2.0 - version: 1.2.0 + specifier: ^1.3.0 + version: 1.3.0 '@vitejs/plugin-vue': specifier: ^6.0.2 version: 6.0.2(vite@7.2.6(@types/node@24.7.0)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.90.0)(terser@5.44.0)(tsx@4.21.0)(yaml@2.8.1))(vue@3.5.25(typescript@5.9.3)) @@ -1533,8 +1533,8 @@ packages: '@drizzle-team/brocli@0.10.2': resolution: {integrity: sha512-z33Il7l5dKjUgGULTqBsQBQwckHh5AbIuxhdsIxDDiZAzBOrZO6q9ogcWC65kU382AfynTfgNumVcNIjuIua6w==} - '@electric-sql/client@1.2.0': - resolution: {integrity: sha512-K/MEjti3UF4aPKJJqO6Tp4f5noqc2/3icU1NPdpKfQaHwbzGtEX2aJmL2vxTEUJbfyrISkPKbOPnrz/lAvw1Vg==} + '@electric-sql/client@1.3.0': + resolution: {integrity: sha512-6pVQOBmocDBj5diGCOcyekL9gsa2Kk3j/eN3fytzLYKxqUguvuojMisoYPexHgOchJlcMzR6qpJTYj42npWWxw==} '@emnapi/core@1.5.0': resolution: {integrity: sha512-sbP8GzB1WDzacS8fgNPpHlp6C9VZe+SJP3F90W9rLemaQj2PzIuTEl1qDOYQf58YIpyjViI24y9aPWCjEzY2cg==} @@ -9587,7 +9587,7 @@ snapshots: '@drizzle-team/brocli@0.10.2': {} - '@electric-sql/client@1.2.0': + '@electric-sql/client@1.3.0': dependencies: '@microsoft/fetch-event-source': 2.0.1 optionalDependencies: From 4229a3253158d03735d8cd474077ac6ddf83de25 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 11 Dec 2025 15:09:31 +0100 Subject: [PATCH 3/6] Handle subset-end message --- .../electric-db-collection/src/electric.ts | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index bc9d4bdd4..6ad40a318 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -288,6 +288,15 @@ function isSnapshotEndMessage>( return isControlMessage(message) && message.headers.control === `snapshot-end` } +function isSubsetEndMessage>( + message: Message, +): message is ControlMessage & { headers: { control: `subset-end` } } { + return ( + isControlMessage(message) && + (message.headers.control as string) === `subset-end` + ) +} + function parseSnapshotMessage(message: SnapshotEndMessage): PostgresSnapshot { return { xmin: message.headers.xmin, @@ -998,7 +1007,7 @@ function createElectricSync>( unsubscribeStream = stream.subscribe((messages: Array>) => { let hasUpToDate = false - let hasSnapshotEnd = false + let hasSubsetEnd = false // Clear the current batch buffer at the START of processing a new batch // This preserves messages from the previous batch until new ones arrive, @@ -1075,11 +1084,14 @@ function createElectricSync>( }) } } else if (isSnapshotEndMessage(message)) { - // Skip snapshot-end tracking during buffered initial sync (will be extracted during atomic swap) + // Track postgres snapshot metadata for resolving awaiting mutations + // Skip during buffered initial sync (will be extracted during atomic swap) if (!isBufferingInitialSync()) { newSnapshots.push(parseSnapshotMessage(message)) } - hasSnapshotEnd = true + } else if (isSubsetEndMessage(message)) { + // subset-end marks the end of an injected subset snapshot - treat like up-to-date for commit + hasSubsetEnd = true } else if (isUpToDateMessage(message)) { hasUpToDate = true } else if (isMustRefetchMessage(message)) { @@ -1101,13 +1113,13 @@ function createElectricSync>( // Reset flags so we continue accumulating changes until next up-to-date hasUpToDate = false - hasSnapshotEnd = false + hasSubsetEnd = false hasReceivedUpToDate = false // Reset for progressive mode (isBufferingInitialSync will reflect this) bufferedMessages.length = 0 // Clear buffered messages } } - if (hasUpToDate || hasSnapshotEnd) { + if (hasUpToDate || hasSubsetEnd) { // PROGRESSIVE MODE: Atomic swap on first up-to-date if (isBufferingInitialSync() && hasUpToDate) { debug( @@ -1155,19 +1167,14 @@ function createElectricSync>( ) } else { // Normal mode or on-demand: commit transaction if one was started - // In eager mode, only commit on snapshot-end if we've already received - // the first up-to-date, because the snapshot-end in the log could be from - // a significant period before the stream is actually up to date - const shouldCommit = - hasUpToDate || syncMode === `on-demand` || hasReceivedUpToDate - - if (transactionStarted && shouldCommit) { + // Both up-to-date and subset-end trigger a commit + if (transactionStarted) { commit() transactionStarted = false } } - if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { + if (hasUpToDate || (hasSubsetEnd && syncMode === `on-demand`)) { // Mark the collection as ready now that sync is up to date wrappedMarkReady(isBufferingInitialSync()) } From d8e175e015d0dd7eed964194a9aeaba3ab8e036e Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 11 Dec 2025 15:09:54 +0100 Subject: [PATCH 4/6] Update unit tests to include subset-end tests --- .../tests/electric.test.ts | 62 +++++++++++++------ 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index be2031bf6..87d5567e8 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -2820,9 +2820,9 @@ describe(`Electric Integration`, () => { expect(testCollection.status).toBe(`ready`) }) - it(`should commit on snapshot-end in eager mode AFTER first up-to-date`, () => { + it(`should commit on subset-end in eager mode`, () => { const config = { - id: `eager-snapshot-end-test`, + id: `eager-subset-end-test`, shapeOptions: { url: `http://test-url`, params: { table: `test_table` }, @@ -2834,7 +2834,8 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // First send up-to-date (with initial data) to establish the connection + // Send data followed by subset-end (marks end of injected subset snapshot) + // subset-end should trigger a commit subscriber([ { key: `1`, @@ -2842,20 +2843,36 @@ describe(`Electric Integration`, () => { headers: { operation: `insert` }, }, { - headers: { control: `up-to-date` }, + headers: { control: `subset-end` }, }, ]) // Data should be committed and collection ready expect(testCollection.has(1)).toBe(true) + expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` }) expect(testCollection.status).toBe(`ready`) + }) + + it(`should NOT commit on snapshot-end (only tracks metadata)`, () => { + const config = { + id: `eager-snapshot-end-no-commit-test`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + syncMode: `eager` as const, + getKey: (item: Row) => item.id as number, + startSync: true, + } - // Now send more data followed by snapshot-end (simulating incremental snapshot) - // After the first up-to-date, snapshot-end SHOULD commit + const testCollection = createCollection(electricCollectionOptions(config)) + + // Send data followed by snapshot-end + // snapshot-end should NOT trigger a commit - only up-to-date or subset-end do subscriber([ { - key: `2`, - value: { id: 2, name: `Second User` }, + key: `1`, + value: { id: 1, name: `Test User` }, headers: { operation: `insert` }, }, { @@ -2868,15 +2885,25 @@ describe(`Electric Integration`, () => { }, ]) - // Data should be committed since we've already received up-to-date - expect(testCollection.has(2)).toBe(true) - expect(testCollection.get(2)).toEqual({ id: 2, name: `Second User` }) + // Data should NOT be committed yet (snapshot-end doesn't trigger commit) + expect(testCollection.has(1)).toBe(false) + expect(testCollection.status).toBe(`loading`) + + // Now send up-to-date to commit + subscriber([ + { + headers: { control: `up-to-date` }, + }, + ]) + + // Now data should be committed + expect(testCollection.has(1)).toBe(true) expect(testCollection.status).toBe(`ready`) }) - it(`should commit and mark ready on snapshot-end in on-demand mode`, () => { + it(`should commit and mark ready on subset-end in on-demand mode`, () => { const config = { - id: `on-demand-snapshot-end-test`, + id: `on-demand-subset-end-test`, shapeOptions: { url: `http://test-url`, params: { table: `test_table` }, @@ -2888,7 +2915,7 @@ describe(`Electric Integration`, () => { const testCollection = createCollection(electricCollectionOptions(config)) - // Send data followed by snapshot-end (but no up-to-date) + // Send data followed by subset-end (marks end of injected subset snapshot) subscriber([ { key: `1`, @@ -2896,12 +2923,7 @@ describe(`Electric Integration`, () => { headers: { operation: `insert` }, }, { - headers: { - control: `snapshot-end`, - xmin: `100`, - xmax: `110`, - xip_list: [], - }, + headers: { control: `subset-end` }, }, ]) From c9f964362939c038ffaea97a42db25015c0d6d24 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 11 Dec 2025 15:12:40 +0100 Subject: [PATCH 5/6] Changeset --- .changeset/breezy-heads-relax.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/breezy-heads-relax.md diff --git a/.changeset/breezy-heads-relax.md b/.changeset/breezy-heads-relax.md new file mode 100644 index 000000000..e21e8df65 --- /dev/null +++ b/.changeset/breezy-heads-relax.md @@ -0,0 +1,5 @@ +--- +'@tanstack/electric-db-collection': patch +--- + +Adds support for the new subset-end message introduced in Electric. From e96d6ed1c7f5b19671852f3a0507cd1f702c0197 Mon Sep 17 00:00:00 2001 From: Kevin De Porre Date: Thu, 11 Dec 2025 15:20:31 +0100 Subject: [PATCH 6/6] Unify the hasUpToDate and hasSubsetEnd variables into a commitPoint variable --- .../electric-db-collection/src/electric.ts | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 6ad40a318..4598768fe 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1006,8 +1006,8 @@ function createElectricSync>( }) unsubscribeStream = stream.subscribe((messages: Array>) => { - let hasUpToDate = false - let hasSubsetEnd = false + // Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap + let commitPoint: `up-to-date` | `subset-end` | null = null // Clear the current batch buffer at the START of processing a new batch // This preserves messages from the previous batch until new ones arrive, @@ -1089,11 +1089,14 @@ function createElectricSync>( if (!isBufferingInitialSync()) { newSnapshots.push(parseSnapshotMessage(message)) } - } else if (isSubsetEndMessage(message)) { - // subset-end marks the end of an injected subset snapshot - treat like up-to-date for commit - hasSubsetEnd = true } else if (isUpToDateMessage(message)) { - hasUpToDate = true + // up-to-date takes precedence - also triggers progressive mode atomic swap + commitPoint = `up-to-date` + } else if (isSubsetEndMessage(message)) { + // subset-end triggers commit but not progressive mode atomic swap + if (commitPoint !== `up-to-date`) { + commitPoint = `subset-end` + } } else if (isMustRefetchMessage(message)) { debug( `${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`, @@ -1112,16 +1115,15 @@ function createElectricSync>( loadSubsetDedupe?.reset() // Reset flags so we continue accumulating changes until next up-to-date - hasUpToDate = false - hasSubsetEnd = false + commitPoint = null hasReceivedUpToDate = false // Reset for progressive mode (isBufferingInitialSync will reflect this) bufferedMessages.length = 0 // Clear buffered messages } } - if (hasUpToDate || hasSubsetEnd) { - // PROGRESSIVE MODE: Atomic swap on first up-to-date - if (isBufferingInitialSync() && hasUpToDate) { + if (commitPoint !== null) { + // PROGRESSIVE MODE: Atomic swap on first up-to-date (not subset-end) + if (isBufferingInitialSync() && commitPoint === `up-to-date`) { debug( `${collectionId ? `[${collectionId}] ` : ``}Progressive mode: Performing atomic swap with ${bufferedMessages.length} buffered messages`, ) @@ -1173,14 +1175,10 @@ function createElectricSync>( transactionStarted = false } } - - if (hasUpToDate || (hasSubsetEnd && syncMode === `on-demand`)) { - // Mark the collection as ready now that sync is up to date - wrappedMarkReady(isBufferingInitialSync()) - } + wrappedMarkReady(isBufferingInitialSync()) // Track that we've received the first up-to-date for progressive mode - if (hasUpToDate) { + if (commitPoint === `up-to-date`) { hasReceivedUpToDate = true } @@ -1211,12 +1209,11 @@ function createElectricSync>( return seen }) - // Resolve all matched pending matches on up-to-date or snapshot-end in on-demand mode + // Resolve all matched pending matches on up-to-date or subset-end // Set batchCommitted BEFORE resolving to avoid timing window where late awaitMatch // calls could register as "matched" after resolver pass already ran - if (hasUpToDate || (hasSnapshotEnd && syncMode === `on-demand`)) { - batchCommitted.setState(() => true) - } + batchCommitted.setState(() => true) + resolveMatchedPendingMatches() } })