diff --git a/.changeset/fix-deleted-items-not-disappearing-from-live-queries.md b/.changeset/fix-deleted-items-not-disappearing-from-live-queries.md new file mode 100644 index 000000000..fba20cbd5 --- /dev/null +++ b/.changeset/fix-deleted-items-not-disappearing-from-live-queries.md @@ -0,0 +1,11 @@ +--- +'@tanstack/db': patch +--- + +fix: deleted items not disappearing from live queries with `.limit()` + +Fixed a bug where deleting an item from a live query with `.orderBy()` and `.limit()` would not remove it from the query results. The `subscribeChanges` callback would never fire with a delete event. + +The issue was caused by duplicate inserts reaching the D2 pipeline, which corrupted the multiplicity tracking used by `TopKWithFractionalIndexOperator`. A delete would decrement multiplicity from 2 to 1 instead of 1 to 0, so the item remained visible. + +Fixed by ensuring `sentKeys` is updated before callbacks execute (preventing race conditions) and filtering duplicate inserts in `filterAndFlipChanges`. diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index 873c906eb..a69400375 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -109,6 +109,10 @@ export class CollectionChangesManager< if (options.includeInitialState) { subscription.requestSnapshot({ trackLoadSubsetPromise: false }) + } else if (options.includeInitialState === false) { + // When explicitly set to false (not just undefined), mark all state as "seen" + // so that all future changes (including deletes) pass through unfiltered. + subscription.markAllStateAsSeen() } // Add to batched listeners diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 44981d460..6573fccb7 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -52,6 +52,11 @@ export class CollectionSubscription { private loadedInitialState = false + // Flag to skip filtering in filterAndFlipChanges. + // This is separate from loadedInitialState because we want to allow + // requestSnapshot to still work even when filtering is skipped. + private skipFiltering = false + // Flag to indicate that we have sent at least 1 snapshot. // While `snapshotSent` is false we filter out all changes from subscription to the collection. private snapshotSent = false @@ -244,6 +249,13 @@ export class CollectionSubscription (change) => !this.sentKeys.has(change.key), ) + // Add keys to sentKeys BEFORE calling callback to prevent race condition. + // If a change event arrives while the callback is executing, it will see + // the keys already in sentKeys and filter out duplicates correctly. + for (const change of filteredSnapshot) { + this.sentKeys.add(change.key) + } + this.snapshotSent = true this.callback(filteredSnapshot) return true @@ -367,6 +379,13 @@ export class CollectionSubscription // Use the current count as the offset for this load const currentOffset = this.limitedSnapshotRowCount + // Add keys to sentKeys BEFORE calling callback to prevent race condition. + // If a change event arrives while the callback is executing, it will see + // the keys already in sentKeys and filter out duplicates correctly. + for (const change of changes) { + this.sentKeys.add(change.key) + } + this.callback(changes) // Update the row count and last key after sending (for next call's offset/cursor) @@ -441,10 +460,11 @@ export class CollectionSubscription * Filters and flips changes for keys that have not been sent yet. * Deletes are filtered out for keys that have not been sent yet. * Updates are flipped into inserts for keys that have not been sent yet. + * Duplicate inserts are filtered out to prevent D2 multiplicity > 1. */ private filterAndFlipChanges(changes: Array>) { - if (this.loadedInitialState) { - // We loaded the entire initial state + if (this.loadedInitialState || this.skipFiltering) { + // We loaded the entire initial state or filtering is explicitly skipped // so no need to filter or flip changes return changes } @@ -452,7 +472,9 @@ export class CollectionSubscription const newChanges = [] for (const change of changes) { let newChange = change - if (!this.sentKeys.has(change.key)) { + const keyInSentKeys = this.sentKeys.has(change.key) + + if (!keyInSentKeys) { if (change.type === `update`) { newChange = { ...change, type: `insert`, previousValue: undefined } } else if (change.type === `delete`) { @@ -460,6 +482,19 @@ export class CollectionSubscription continue } this.sentKeys.add(change.key) + } else { + // Key was already sent - handle based on change type + if (change.type === `insert`) { + // Filter out duplicate inserts - the key was already inserted. + // This prevents D2 multiplicity from going above 1, which would + // cause deletes to not properly remove items (multiplicity would + // go from 2 to 1 instead of 1 to 0). + continue + } else if (change.type === `delete`) { + // Remove from sentKeys so future inserts for this key are allowed + // (e.g., after truncate + reinsert) + this.sentKeys.delete(change.key) + } } newChanges.push(newChange) } @@ -467,17 +502,32 @@ export class CollectionSubscription } private trackSentKeys(changes: Array>) { - if (this.loadedInitialState) { - // No need to track sent keys if we loaded the entire state. - // Since we sent everything, all keys must have been observed. + if (this.loadedInitialState || this.skipFiltering) { + // No need to track sent keys if we loaded the entire state or filtering is skipped. + // Since filtering won't be applied, all keys are effectively "observed". return } for (const change of changes) { - this.sentKeys.add(change.key) + if (change.type === `delete`) { + // Remove deleted keys from sentKeys so future re-inserts are allowed + this.sentKeys.delete(change.key) + } else { + // For inserts and updates, track the key as sent + this.sentKeys.add(change.key) + } } } + /** + * Mark that the subscription should not filter any changes. + * This is used when includeInitialState is explicitly set to false, + * meaning the caller doesn't want initial state but does want ALL future changes. + */ + markAllStateAsSeen() { + this.skipFiltering = true + } + unsubscribe() { // Unload all subsets that this subscription loaded // We pass the exact same LoadSubsetOptions we used for loadSubset diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 4258302c7..f3f4b1cce 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1193,8 +1193,10 @@ describe(`Collection.subscribeChanges`, () => { f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) f.commit() - // Expect delete, insert with optimistic value, and an empty event from markReady - expect(changeEvents.length).toBe(3) + // Expect delete and insert with optimistic value + // Note: Previously there was a duplicate insert event that was incorrectly + // being sent, causing 3 events. Now duplicates are filtered correctly. + expect(changeEvents.length).toBe(2) expect(changeEvents[0]).toEqual({ type: `delete`, key: 1, diff --git a/packages/db/tests/query/optimistic-delete-with-limit.test.ts b/packages/db/tests/query/optimistic-delete-with-limit.test.ts new file mode 100644 index 000000000..2eec67d7b --- /dev/null +++ b/packages/db/tests/query/optimistic-delete-with-limit.test.ts @@ -0,0 +1,596 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { createCollection } from '../../src/collection/index.js' +import { mockSyncCollectionOptions } from '../utils.js' +import { createLiveQueryCollection } from '../../src/query/live-query-collection.js' +import { like } from '../../src/query/builder/functions.js' +import type { ChangeMessage } from '../../src/types.js' +import type { Collection } from '../../src/collection/index.js' + +type Item = { + id: string + value: number + name: string +} + +const initialData: Array = [ + { id: `1`, value: 100, name: `Item A` }, + { id: `2`, value: 90, name: `Item B` }, + { id: `3`, value: 80, name: `Item C` }, + { id: `4`, value: 70, name: `Item D` }, + { id: `5`, value: 60, name: `Item E` }, +] + +describe(`Optimistic delete with limit`, () => { + let sourceCollection: Collection + + beforeEach(async () => { + sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `items`, + getKey: (item: Item) => item.id, + initialData, + }), + ) + + // Wait for the collection to be ready + await sourceCollection.preload() + }) + + it(`should emit delete event with limit`, async () => { + // Create a live query with orderBy and limit (matching the user's pattern) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(3) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(3) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`]) + + // Subscribe to changes on the live query collection + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: false, + }) + + // Clear any initial calls from subscription setup + changeCallback.mockClear() + + // Optimistically delete item 2 (which is in the visible top 3) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + console.log( + `All changes (with limit):`, + JSON.stringify(allChanges, null, 2), + ) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event without limit (baseline)`, async () => { + // Create a live query WITHOUT limit (for comparison) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(5) + + // Subscribe to changes on the live query collection + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: false, + }) + + // Clear any initial calls from subscription setup + changeCallback.mockClear() + + // Optimistically delete item 2 + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + console.log( + `All changes (without limit):`, + JSON.stringify(allChanges, null, 2), + ) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event with limit and includeInitialState: true`, async () => { + // Create a live query with orderBy and limit (matching the user's exact pattern) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(3) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(3) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`]) + + // Subscribe to changes on the live query collection with includeInitialState: true + // This is what the user is doing + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Optimistically delete item 2 (which is in the visible top 3) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + console.log( + `All changes (with limit, includeInitialState: true):`, + JSON.stringify(allChanges, null, 2), + ) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event with limit and offset`, async () => { + // Create a live query with orderBy, limit AND offset (matching the user's exact pattern) + const pageSize = 2 + const pageIndex = 0 + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - should be items 1 and 2 (highest values) + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes with includeInitialState: true (same as user) + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 2 (which is in the visible page) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + console.log( + `All changes (with limit+offset, includeInitialState: true):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event with where clause, limit and offset (matching user's exact pattern)`, async () => { + // Create a live query that matches the user's pattern: + // query.where(...).orderBy(...).limit(pageSize).offset(pageIndex * pageSize) + const pageSize = 2 + const pageIndex = 0 + const search = `Item` // Simulating their search filter + + const liveQueryCollection = createLiveQueryCollection((q) => { + let query = q.from({ items: sourceCollection }) + // Add a where clause like the user does + query = query.where(({ items }) => like(items.name, `%${search}%`)) + return query + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })) + }) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - should be items 1 and 2 (highest values matching search) + const initialResults = Array.from(liveQueryCollection.values()) + console.log( + `Initial results (where + limit + offset):`, + JSON.stringify(initialResults, null, 2), + ) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes with includeInitialState: true (same as user) + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 2 (which is in the visible page) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + console.log( + `All changes (where + limit + offset, includeInitialState: true):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete and update when deleting from different page (page 1 delete while viewing page 2)`, async () => { + // This test captures the scenario from Marius's environment: + // - Viewing page 2 (items 3 and 4 with offset=2, limit=2) + // - Delete an item from page 1 (item 2) + // - The live query should update because items shift: + // - After delete, sorted order is: 1, 3, 4, 5 + // - Page 2 (offset=2, limit=2) should now show items 4 and 5 + // - So: item 3 should be deleted from result, item 5 should be inserted + + const pageSize = 2 + const pageIndex = 1 // page 2 (0-indexed) + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - page 2 should show items 3 and 4 (offset 2, limit 2) + // Sorted by value desc: 1 (100), 2 (90), 3 (80), 4 (70), 5 (60) + // Page 2 = offset 2 = items 3 and 4 + const initialResults = Array.from(liveQueryCollection.values()) + console.log( + `Initial results (page 2):`, + JSON.stringify(initialResults, null, 2), + ) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`3`, `4`]) + + // Subscribe to changes with includeInitialState: true + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 2 (which is on page 1, NOT in current view) + console.log(`Deleting item 2 (on page 1, not visible on page 2)...`) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After deleting item 2: + // - Sorted order becomes: 1 (100), 3 (80), 4 (70), 5 (60) + // - Page 2 (offset 2, limit 2) should now show items 4 and 5 + // - So: item 3 should be deleted from result, item 5 should be inserted + + // Check the state after delete + const resultsAfterDelete = Array.from(liveQueryCollection.values()) + console.log( + `Results after delete:`, + JSON.stringify(resultsAfterDelete, null, 2), + ) + + // The live query collection should now show items 4 and 5 + expect(resultsAfterDelete).toHaveLength(2) + expect(resultsAfterDelete.map((r) => r.id)).toEqual([`4`, `5`]) + + // Check that we got the expected change events + console.log( + `All changes (page 2 after deleting from page 1):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + + // We should have received change events (delete for item 3, insert for item 5) + expect(changeCallback).toHaveBeenCalled() + + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + + // Should have a delete for item 3 (shifted out of page 2) + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `3`)).toBe( + true, + ) + + // Should have an insert for item 5 (shifted into page 2) + const insertEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `insert`, + ) + expect(insertEvents.some((e: ChangeMessage) => e.key === `5`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should NOT update when deleting item beyond TopK window (no-op case)`, async () => { + // Test scenario: delete an item that's AFTER the TopK window + // - Page 1: items 1 and 2 (offset=0, limit=2) + // - Delete item 5 (which is on page 3) + // - Page 1 should NOT change (items 1 and 2 are still there) + + const pageSize = 2 + const pageIndex = 0 // page 1 + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - page 1 should show items 1 and 2 + const initialResults = Array.from(liveQueryCollection.values()) + console.log( + `Initial results (page 1 for no-op test):`, + JSON.stringify(initialResults, null, 2), + ) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 5 (which is on page 3, beyond the TopK window) + console.log(`Deleting item 5 (on page 3, beyond TopK window)...`) + sourceCollection.delete(`5`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After deleting item 5: + // - Sorted order becomes: 1 (100), 2 (90), 3 (80), 4 (70) + // - Page 1 (offset 0, limit 2) still shows items 1 and 2 + // - No change to page 1 + + // Check the state after delete + const resultsAfterDelete = Array.from(liveQueryCollection.values()) + console.log( + `Results after delete (no-op):`, + JSON.stringify(resultsAfterDelete, null, 2), + ) + + // The live query collection should still show items 1 and 2 + expect(resultsAfterDelete).toHaveLength(2) + expect(resultsAfterDelete.map((r) => r.id)).toEqual([`1`, `2`]) + + // Check that we did NOT receive any change events + console.log( + `Change events (should be empty):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + + // No changes expected since item 5 is outside the window + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + expect(allChanges).toHaveLength(0) + + subscription.unsubscribe() + }) + + it(`should update state correctly after delete with limit`, async () => { + // Create a live query with orderBy and limit + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(3) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + let results = Array.from(liveQueryCollection.values()) + expect(results.map((r) => r.id)).toEqual([`1`, `2`, `3`]) + + // Subscribe to changes + liveQueryCollection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + // Optimistically delete item 2 (which is in the visible top 3) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Check that the state is updated + // Item 2 should be gone, and item 4 should move into the top 3 + results = Array.from(liveQueryCollection.values()) + expect(results.map((r) => r.id)).toEqual([`1`, `3`, `4`]) + }) +})