From dbc98714d00444694f021b48cda8409900a16777 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 16 Dec 2025 23:34:00 +0000 Subject: [PATCH 1/5] Fix groupBy duplicate insert errors on incremental live updates When processing incremental updates to groupBy queries, the D2 pipeline might emit an insert for an updated aggregate without a corresponding delete in certain edge cases. This caused "already exists" errors in the live query collection. Added a defensive check in applyChanges() that treats inserts as updates when the key already exists, but only for queries without custom getKey (like groupBy). Queries with custom getKey + joins still throw errors as expected to alert users of potential data issues. Added comprehensive test suite for groupBy incremental update scenarios: - Basic incremental updates with same groupBy key - Multiple groups being updated - Sum aggregate with incremental updates - Batch vs incremental processing - Subquery pattern (groupBy used as source for another query) - Rapid sequential inserts - Multiple events in single batch --- .../query/live/collection-config-builder.ts | 27 +- .../tests/query/group-by-incremental.test.ts | 516 ++++++++++++++++++ 2 files changed, 539 insertions(+), 4 deletions(-) create mode 100644 packages/db/tests/query/group-by-incremental.test.ts diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b663fdad5..649b34fcd 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -722,10 +722,29 @@ export class CollectionConfigBuilder< // Simple singular insert. if (inserts && deletes === 0) { - write({ - value, - type: `insert`, - }) + // Defensive check: if the key already exists and we're NOT using a custom getKey, + // treat as update instead. This handles edge cases in groupBy where the D2 pipeline + // might emit an insert for an updated aggregate without a corresponding delete + // (e.g., due to timing or state issues in incremental updates). + // + // We only apply this for queries WITHOUT custom getKey (like groupBy) because: + // - For groupBy: the key is derived from the group expression, and duplicate inserts + // without deletes can happen due to D2 pipeline state issues + // - For custom getKey + joins: duplicates indicate a user error (e.g., using a non-unique + // key for a 1:N relationship), and we should throw an error to alert the user + const hasCustomGetKey = !!this.config.getKey + const keyExists = collection.has(collection.getKeyFromItem(value)) + if (!hasCustomGetKey && keyExists) { + write({ + value, + type: `update`, + }) + } else { + write({ + value, + type: `insert`, + }) + } } else if ( // Insert & update(s) (updates are a delete & insert) inserts > deletes || diff --git a/packages/db/tests/query/group-by-incremental.test.ts b/packages/db/tests/query/group-by-incremental.test.ts new file mode 100644 index 000000000..9c795ab52 --- /dev/null +++ b/packages/db/tests/query/group-by-incremental.test.ts @@ -0,0 +1,516 @@ +import { describe, expect, test } from 'vitest' +import { createLiveQueryCollection } from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { mockSyncCollectionOptionsNoInitialState } from '../utils.js' +import { count, sum } from '../../src/query/builder/functions.js' + +/** + * Tests for groupBy incremental updates + * + * This test file specifically addresses the bug where groupBy works correctly + * during batch processing (preload) but fails with "already exists" errors + * when processing incremental live updates. + * + * Bug report: When multiple events with the same groupBy key but different + * primary keys arrive incrementally, the second event causes a duplicate + * key error in the live query's internal collection. + */ + +type Event = { + id: string + language: string + title?: string +} + +/** + * Helper to create a collection that's ready for testing. + * Handles all the boilerplate setup: preload, begin, commit, markReady. + */ +async function createReadyCollection(opts: { + id: string + getKey: (item: T) => string | number +}) { + const collection = createCollection( + mockSyncCollectionOptionsNoInitialState(opts), + ) + + const preloadPromise = collection.preload() + collection.utils.begin() + collection.utils.commit() + collection.utils.markReady() + await preloadPromise + + return collection +} + +describe(`GroupBy Incremental Updates`, () => { + describe(`Bug: Duplicate insert errors on live updates`, () => { + test(`should update aggregate when second event with same groupBy key arrives`, async () => { + // Create an empty collection that we'll populate incrementally + const eventsCollection = await createReadyCollection({ + id: `events`, + getKey: (event) => event.id, + }) + + // Create a groupBy query that counts events by language + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // Initially empty + expect(languageCounts.size).toBe(0) + + // Insert first event with language="ru" + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event1`, language: `ru`, title: `First Russian Event` }, + }) + eventsCollection.utils.commit() + + // Should have one group with count 1 + expect(languageCounts.size).toBe(1) + const ruGroup1 = languageCounts.get(`ru`) + expect(ruGroup1).toBeDefined() + expect(ruGroup1?.language).toBe(`ru`) + expect(ruGroup1?.count).toBe(1) + + // Insert second event with same language="ru" but different id + // This is where the bug occurs - should UPDATE the aggregate, not try to INSERT + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event2`, language: `ru`, title: `Second Russian Event` }, + }) + eventsCollection.utils.commit() + + // Should still have one group, but with count 2 + expect(languageCounts.size).toBe(1) + const ruGroup2 = languageCounts.get(`ru`) + expect(ruGroup2).toBeDefined() + expect(ruGroup2?.language).toBe(`ru`) + expect(ruGroup2?.count).toBe(2) + }) + + test(`should handle multiple groups being updated incrementally`, async () => { + const eventsCollection = await createReadyCollection({ + id: `events-multi`, + getKey: (event) => event.id, + }) + + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // Insert events for different languages + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `en1`, language: `en` }, + }) + eventsCollection.utils.commit() + + expect(languageCounts.size).toBe(1) + expect(languageCounts.get(`en`)?.count).toBe(1) + + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `ru1`, language: `ru` }, + }) + eventsCollection.utils.commit() + + expect(languageCounts.size).toBe(2) + expect(languageCounts.get(`en`)?.count).toBe(1) + expect(languageCounts.get(`ru`)?.count).toBe(1) + + // Add more to Russian - this is where the bug manifests + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `ru2`, language: `ru` }, + }) + eventsCollection.utils.commit() + + expect(languageCounts.size).toBe(2) + expect(languageCounts.get(`en`)?.count).toBe(1) + expect(languageCounts.get(`ru`)?.count).toBe(2) + + // Add more to English + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `en2`, language: `en` }, + }) + eventsCollection.utils.commit() + + expect(languageCounts.size).toBe(2) + expect(languageCounts.get(`en`)?.count).toBe(2) + expect(languageCounts.get(`ru`)?.count).toBe(2) + }) + + test(`should handle sum aggregate with incremental updates`, async () => { + type Order = { + id: number + customerId: number + amount: number + } + + const ordersCollection = await createReadyCollection({ + id: `orders`, + getKey: (order) => order.id, + }) + + const customerTotals = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ orders: ordersCollection }) + .groupBy(({ orders }) => orders.customerId) + .select(({ orders }) => ({ + customerId: orders.customerId, + total: sum(orders.amount), + orderCount: count(orders.id), + })), + }) + + // Add first order for customer 1 + ordersCollection.utils.begin() + ordersCollection.utils.write({ + type: `insert`, + value: { id: 1, customerId: 1, amount: 100 }, + }) + ordersCollection.utils.commit() + + expect(customerTotals.size).toBe(1) + expect(customerTotals.get(1)?.total).toBe(100) + expect(customerTotals.get(1)?.orderCount).toBe(1) + + // Add second order for same customer - this triggers the bug + ordersCollection.utils.begin() + ordersCollection.utils.write({ + type: `insert`, + value: { id: 2, customerId: 1, amount: 200 }, + }) + ordersCollection.utils.commit() + + expect(customerTotals.size).toBe(1) + expect(customerTotals.get(1)?.total).toBe(300) + expect(customerTotals.get(1)?.orderCount).toBe(2) + + // Add third order + ordersCollection.utils.begin() + ordersCollection.utils.write({ + type: `insert`, + value: { id: 3, customerId: 1, amount: 150 }, + }) + ordersCollection.utils.commit() + + expect(customerTotals.size).toBe(1) + expect(customerTotals.get(1)?.total).toBe(450) + expect(customerTotals.get(1)?.orderCount).toBe(3) + }) + + test(`batch processing works correctly (baseline)`, () => { + // This test verifies that batch processing works - establishing the baseline + type Event = { + id: string + language: string + } + + // Create collection with initial data (batch processing) + const eventsCollection = createCollection({ + id: `events-batch`, + getKey: (event: Event) => event.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: `event1`, language: `ru` }, + }) + write({ + type: `insert`, + value: { id: `event2`, language: `ru` }, + }) + write({ + type: `insert`, + value: { id: `event3`, language: `en` }, + }) + commit() + markReady() + }, + }, + startSync: true, + }) + + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // Batch processing should work correctly + expect(languageCounts.size).toBe(2) + expect(languageCounts.get(`ru`)?.count).toBe(2) + expect(languageCounts.get(`en`)?.count).toBe(1) + }) + + test(`mixed batch and incremental updates`, async () => { + type EventType = { + id: string + language: string + } + + const eventsCollection = createCollection( + mockSyncCollectionOptionsNoInitialState({ + id: `events-mixed`, + getKey: (event) => event.id, + }), + ) + + // Setup and batch insert initial data + const preloadPromise = eventsCollection.preload() + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event1`, language: `ru` }, + }) + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event2`, language: `ru` }, + }) + eventsCollection.utils.commit() + eventsCollection.utils.markReady() + await preloadPromise + + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // After batch, should have count 2 + expect(languageCounts.size).toBe(1) + expect(languageCounts.get(`ru`)?.count).toBe(2) + + // Now add incrementally - this is where the bug occurs + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event3`, language: `ru` }, + }) + eventsCollection.utils.commit() + + expect(languageCounts.size).toBe(1) + expect(languageCounts.get(`ru`)?.count).toBe(3) + }) + + test(`groupBy with subquery (matching bug report pattern)`, async () => { + // This test mimics the exact pattern from the bug report: + // A groupBy result is used as a source for another query with orderBy/limit + type WikiEvent = { + id: string + language: string + } + + const eventsCollection = await createReadyCollection({ + id: `events-subquery`, + getKey: (event) => event.id, + }) + + // Create the groupBy query that counts events by language + // This is used as a subquery + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // Create the outer query that orders by count and limits + const topLanguages = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ stats: languageCounts }) + .orderBy(({ stats }) => stats.count, `desc`) + .limit(5), + }) + + // Initially empty + expect(topLanguages.size).toBe(0) + + // Insert first event with language="ru" + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event1`, language: `ru` }, + }) + eventsCollection.utils.commit() + + // Should have one language with count 1 + expect(topLanguages.size).toBe(1) + const firstResult = [...topLanguages.values()][0] + expect(firstResult?.language).toBe(`ru`) + expect(firstResult?.count).toBe(1) + + // Insert second event with same language="ru" but different id + // This is where the bug would occur + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event2`, language: `ru` }, + }) + eventsCollection.utils.commit() + + // Should still have one language, but with count 2 + expect(topLanguages.size).toBe(1) + const secondResult = [...topLanguages.values()][0] + expect(secondResult?.language).toBe(`ru`) + expect(secondResult?.count).toBe(2) + + // Add more events to different languages + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event3`, language: `en` }, + }) + eventsCollection.utils.commit() + + expect(topLanguages.size).toBe(2) + + // Add another Russian event + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event4`, language: `ru` }, + }) + eventsCollection.utils.commit() + + // Russian should now have count 3 + const results = [...topLanguages.values()] + const ruResult = results.find((r) => r.language === `ru`) + const enResult = results.find((r) => r.language === `en`) + expect(ruResult?.count).toBe(3) + expect(enResult?.count).toBe(1) + }) + + test(`groupBy with rapid sequential inserts`, async () => { + // Test rapid sequential inserts that might trigger race conditions + const eventsCollection = await createReadyCollection({ + id: `events-rapid`, + getKey: (event) => event.id, + }) + + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // Rapidly insert multiple events with the same language + for (let i = 0; i < 10; i++) { + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event-${i}`, language: `ru` }, + }) + eventsCollection.utils.commit() + } + + // Should have accumulated all counts + expect(languageCounts.size).toBe(1) + expect(languageCounts.get(`ru`)?.count).toBe(10) + }) + + test(`groupBy with multiple events in single batch`, async () => { + // Test inserting multiple events with same groupBy key in a single batch + const eventsCollection = await createReadyCollection({ + id: `events-batch-same`, + getKey: (event) => event.id, + }) + + const languageCounts = createLiveQueryCollection({ + startSync: true, + query: (q) => + q + .from({ events: eventsCollection }) + .groupBy(({ events }) => events.language) + .select(({ events }) => ({ + language: events.language, + count: count(events.id), + })), + }) + + // Insert multiple events in a single batch + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event1`, language: `ru` }, + }) + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event2`, language: `ru` }, + }) + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event3`, language: `ru` }, + }) + eventsCollection.utils.commit() + + // Should have one group with count 3 + expect(languageCounts.size).toBe(1) + expect(languageCounts.get(`ru`)?.count).toBe(3) + + // Then add more incrementally + eventsCollection.utils.begin() + eventsCollection.utils.write({ + type: `insert`, + value: { id: `event4`, language: `ru` }, + }) + eventsCollection.utils.commit() + + expect(languageCounts.get(`ru`)?.count).toBe(4) + }) + }) +}) From 1fee992c168252ae5c8023675d13035cc42f95b8 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 16 Dec 2025 23:41:18 +0000 Subject: [PATCH 2/5] Fix workspace dependency version mismatch for query-db-collection Update @tanstack/query-db-collection from ^1.0.7 to ^1.0.8 in: - examples/react/todo - examples/solid/todo This fixes the sherif check that requires consistent dependency versions across the workspace. --- examples/react/todo/package.json | 2 +- examples/solid/todo/package.json | 2 +- pnpm-lock.yaml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/react/todo/package.json b/examples/react/todo/package.json index 3dc6432fa..4f738205f 100644 --- a/examples/react/todo/package.json +++ b/examples/react/todo/package.json @@ -5,7 +5,7 @@ "dependencies": { "@tanstack/electric-db-collection": "^0.2.12", "@tanstack/query-core": "^5.90.12", - "@tanstack/query-db-collection": "^1.0.7", + "@tanstack/query-db-collection": "^1.0.8", "@tanstack/react-db": "^0.1.56", "@tanstack/react-router": "^1.140.0", "@tanstack/react-start": "^1.140.0", diff --git a/examples/solid/todo/package.json b/examples/solid/todo/package.json index 438b61a11..7301e813a 100644 --- a/examples/solid/todo/package.json +++ b/examples/solid/todo/package.json @@ -5,7 +5,7 @@ "dependencies": { "@tanstack/electric-db-collection": "^0.2.12", "@tanstack/query-core": "^5.90.12", - "@tanstack/query-db-collection": "^1.0.7", + "@tanstack/query-db-collection": "^1.0.8", "@tanstack/solid-db": "^0.1.54", "@tanstack/solid-router": "^1.140.0", "@tanstack/solid-start": "^1.140.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 109e22297..fdfce9385 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -433,7 +433,7 @@ importers: specifier: ^5.90.12 version: 5.90.12 '@tanstack/query-db-collection': - specifier: ^1.0.7 + specifier: ^1.0.8 version: link:../../../packages/query-db-collection '@tanstack/react-db': specifier: ^0.1.56 @@ -554,7 +554,7 @@ importers: specifier: ^5.90.12 version: 5.90.12 '@tanstack/query-db-collection': - specifier: ^1.0.7 + specifier: ^1.0.8 version: link:../../../packages/query-db-collection '@tanstack/solid-db': specifier: ^0.1.54 From 18395ae839388b333f2a5ca6efdd99ec1223516a Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Dec 2025 00:23:48 +0000 Subject: [PATCH 3/5] Add sync layer fallback for groupBy duplicate insert errors The previous fix in applyChanges was insufficient because the actual error was thrown in the sync layer. This adds a fallback that converts inserts to updates for live queries without custom getKey (like groupBy) when the key already exists, preventing duplicate key errors during incremental updates. --- packages/db/src/collection/sync.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index de2fbb5e1..b4ce2d0b5 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -122,6 +122,10 @@ export class CollectionSyncManager< !isTruncateTransaction ) { const existingValue = this.state.syncedData.get(key) + const utils = this.config + .utils as Partial | undefined + const internal = utils?.[LIVE_QUERY_INTERNAL] + if ( existingValue !== undefined && deepEquals(existingValue, messageWithoutKey.value) @@ -130,10 +134,12 @@ export class CollectionSyncManager< // Treat it as an update so we preserve optimistic intent without // throwing a duplicate-key error during reconciliation. messageType = `update` + } else if (internal && !internal.hasCustomGetKey) { + // For live queries without custom getKey (like groupBy), the D2 pipeline + // might emit an insert for an updated aggregate without a corresponding + // delete in certain edge cases. Convert to update to avoid duplicate key errors. + messageType = `update` } else { - const utils = this.config - .utils as Partial - const internal = utils[LIVE_QUERY_INTERNAL] throw new DuplicateKeySyncError(key, this.id, { hasCustomGetKey: internal?.hasCustomGetKey ?? false, hasJoins: internal?.hasJoins ?? false, From 79f1766020a9b1b647de7a94966dbc64375895b9 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 17 Dec 2025 00:25:04 +0000 Subject: [PATCH 4/5] ci: apply automated fixes --- packages/db/src/collection/sync.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/db/src/collection/sync.ts b/packages/db/src/collection/sync.ts index b4ce2d0b5..1d71837dc 100644 --- a/packages/db/src/collection/sync.ts +++ b/packages/db/src/collection/sync.ts @@ -122,8 +122,9 @@ export class CollectionSyncManager< !isTruncateTransaction ) { const existingValue = this.state.syncedData.get(key) - const utils = this.config - .utils as Partial | undefined + const utils = this.config.utils as + | Partial + | undefined const internal = utils?.[LIVE_QUERY_INTERNAL] if ( From ce63da049c3aac2cedb4446c05b2909b66c378f7 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 17 Dec 2025 00:42:01 +0000 Subject: [PATCH 5/5] Add failing test for groupBy duplicate insert bug Added a test that directly exercises the sync layer fix by: 1. Setting up a collection with utils[LIVE_QUERY_INTERNAL].hasCustomGetKey = false (simulating a live query without custom getKey, like groupBy) 2. Inserting a document with a specific key 3. Attempting to insert another document with the same key (without delete) Without the fix, this would throw "Cannot insert document with key ... already exists". With the fix, the insert is converted to an update. Also added a test to verify regular collections still throw errors on duplicate inserts. --- .../tests/query/group-by-incremental.test.ts | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/packages/db/tests/query/group-by-incremental.test.ts b/packages/db/tests/query/group-by-incremental.test.ts index 9c795ab52..e3efdb7fc 100644 --- a/packages/db/tests/query/group-by-incremental.test.ts +++ b/packages/db/tests/query/group-by-incremental.test.ts @@ -44,6 +44,137 @@ async function createReadyCollection(opts: { } describe(`GroupBy Incremental Updates`, () => { + describe(`Sync layer duplicate insert handling`, () => { + test(`sync layer should convert insert to update for live query without custom getKey when key exists`, async () => { + // This test directly exercises the sync layer fix by simulating the scenario + // where the D2 pipeline emits only an insert (without delete) for an existing key. + // This happens in certain edge cases with groupBy aggregates. + // + // The fix checks for utils[LIVE_QUERY_INTERNAL].hasCustomGetKey to determine + // if we should convert duplicate inserts to updates. + + type GroupResult = { + language: string + count: number + } + + // Import the internal symbol used by live queries + const { LIVE_QUERY_INTERNAL } = await import( + `../../src/query/live/internal.js` + ) + + // Create a collection that mimics a live query collection structure + // with hasCustomGetKey: false (like groupBy queries) + const liveQueryCollection = createCollection({ + id: `live-query-sync-test`, + getKey: (item) => item.language, + sync: { + sync: ({ begin, write, commit, markReady }) => { + // First batch: insert initial aggregate + begin() + write({ + type: `insert`, + value: { language: `ru`, count: 1 }, + }) + commit() + markReady() + + // Later: simulate D2 emitting only an insert for updated aggregate + // (without the corresponding delete for the old value) + // This is the edge case that causes the bug + setTimeout(() => { + begin() + // This insert should be converted to update by the sync layer + // because the key "ru" already exists AND hasCustomGetKey is false + write({ + type: `insert`, + value: { language: `ru`, count: 2 }, + }) + commit() + }, 10) + }, + }, + startSync: true, + // This is the key part: set up utils with LIVE_QUERY_INTERNAL + // to indicate this is a live query without custom getKey + utils: { + [LIVE_QUERY_INTERNAL]: { + hasCustomGetKey: false, + hasJoins: false, + getBuilder: () => null, + }, + } as any, + }) + + await liveQueryCollection.preload() + + // Initial state + expect(liveQueryCollection.size).toBe(1) + expect(liveQueryCollection.get(`ru`)?.count).toBe(1) + + // Wait for the second write + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After the "insert" that should be converted to update + // Without the fix, this would throw: "Cannot insert document with key 'ru' ... already exists" + expect(liveQueryCollection.size).toBe(1) + expect(liveQueryCollection.get(`ru`)?.count).toBe(2) + }) + + test(`sync layer should throw error for regular collection with duplicate insert`, async () => { + // Regular collections (without LIVE_QUERY_INTERNAL) should still throw + // an error when trying to insert a duplicate key + + type Item = { + id: string + value: number + } + + const collection = createCollection({ + id: `regular-collection-test`, + getKey: (item) => item.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + begin() + write({ + type: `insert`, + value: { id: `item1`, value: 1 }, + }) + commit() + markReady() + + // This should throw because it's a regular collection + setTimeout(() => { + begin() + try { + write({ + type: `insert`, + value: { id: `item1`, value: 2 }, + }) + commit() + } catch { + // Expected - error should be thrown + } + }, 10) + }, + }, + startSync: true, + }) + + await collection.preload() + + expect(collection.size).toBe(1) + expect(collection.get(`item1`)?.value).toBe(1) + + // Wait for the second write attempt + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Value should NOT be updated because the insert should have been rejected + expect(collection.size).toBe(1) + expect(collection.get(`item1`)?.value).toBe(1) + }) + }) + describe(`Bug: Duplicate insert errors on live updates`, () => { test(`should update aggregate when second event with same groupBy key arrives`, async () => { // Create an empty collection that we'll populate incrementally