diff --git a/.changeset/silent-trains-tell.md b/.changeset/silent-trains-tell.md new file mode 100644 index 000000000..414dd8c89 --- /dev/null +++ b/.changeset/silent-trains-tell.md @@ -0,0 +1,5 @@ +--- +"@tanstack/query-db-collection": patch +--- + +Handle pushed-down predicates diff --git a/packages/db/src/query/subset-dedupe.ts b/packages/db/src/query/subset-dedupe.ts index a7c6d7c6a..3f24154ac 100644 --- a/packages/db/src/query/subset-dedupe.ts +++ b/packages/db/src/query/subset-dedupe.ts @@ -239,10 +239,5 @@ export class DeduplicatedLoadSubset { * would reflect the mutated values rather than what was actually loaded. */ export function cloneOptions(options: LoadSubsetOptions): LoadSubsetOptions { - return { - where: options.where, - orderBy: options.orderBy, - limit: options.limit, - // Note: We don't clone subscription as it's not part of predicate matching - } + return { ...options } } diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index bbfd6db56..7310b0b38 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -1,4 +1,5 @@ -import { QueryObserver } from "@tanstack/query-core" +import { QueryObserver, hashKey } from "@tanstack/query-core" +import { DeduplicatedLoadSubset } from "@tanstack/db" import { GetKeyRequiredError, QueryClientRequiredError, @@ -6,23 +7,24 @@ import { QueryKeyRequiredError, } from "./errors" import { createWriteUtils } from "./manual-sync" -import type { - QueryClient, - QueryFunctionContext, - QueryKey, - QueryObserverOptions, - QueryObserverResult, -} from "@tanstack/query-core" import type { BaseCollectionConfig, ChangeMessage, CollectionConfig, DeleteMutationFnParams, InsertMutationFnParams, + LoadSubsetOptions, SyncConfig, UpdateMutationFnParams, UtilsRecord, } from "@tanstack/db" +import type { + QueryClient, + QueryFunctionContext, + QueryKey, + QueryObserverOptions, + QueryObserverResult, +} from "@tanstack/query-core" import type { StandardSchemaV1 } from "@standard-schema/spec" // Re-export for external use @@ -42,6 +44,8 @@ type InferSchemaInput = T extends StandardSchemaV1 : Record : Record +type TQueryKeyBuilder = (opts: LoadSubsetOptions) => TQueryKey + /** * Configuration options for creating a Query Collection * @template T - The explicit type of items stored in the collection @@ -63,7 +67,7 @@ export interface QueryCollectionConfig< TQueryData = Awaited>, > extends BaseCollectionConfig { /** The query key used by TanStack Query to identify this query */ - queryKey: TQueryKey + queryKey: TQueryKey | TQueryKeyBuilder /** Function that fetches data from the server. Must return the complete collection state */ queryFn: TQueryFn extends ( context: QueryFunctionContext @@ -397,6 +401,9 @@ export function queryCollectionOptions( ...baseCollectionConfig } = config + // Default to eager sync mode if not provided + const syncMode = baseCollectionConfig.syncMode ?? `eager` + // Validate required parameters // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition @@ -424,181 +431,415 @@ export function queryCollectionOptions( let errorCount = 0 /** The timestamp for when the query most recently returned the status as "error" */ let lastErrorUpdatedAt = 0 - /** Reference to the QueryObserver for imperative refetch */ - let queryObserver: QueryObserver, any, Array, Array, any> - const internalSync: SyncConfig[`sync`] = (params) => { - const { begin, write, commit, markReady, collection } = params + // hashedQueryKey → queryKey + const hashToQueryKey = new Map() - const observerOptions: QueryObserverOptions< - Array, - any, - Array, - Array, - any - > = { - queryKey: queryKey, - queryFn: queryFn, - structuralSharing: true, - notifyOnChangeProps: `all`, - // Only include options that are explicitly defined to allow QueryClient defaultOptions to be used - ...(meta !== undefined && { meta }), - ...(enabled !== undefined && { enabled }), - ...(refetchInterval !== undefined && { refetchInterval }), - ...(retry !== undefined && { retry }), - ...(retryDelay !== undefined && { retryDelay }), - ...(staleTime !== undefined && { staleTime }), - } + // queryKey → Set + const queryToRows = new Map>() - const localObserver = new QueryObserver< - Array, - any, - Array, - Array, - any - >(queryClient, observerOptions) + // RowKey → Set + const rowToQueries = new Map>() - // Store reference for imperative refetch - queryObserver = localObserver + // queryKey → QueryObserver - map of query observers that we did not yet susbcribe to + const observers = new Map< + string, + QueryObserver, any, Array, Array, any> + >() - let isSubscribed = false - let actualUnsubscribeFn: (() => void) | null = null + // queryKey → QueryObserver's unsubscribe function + const unsubscribes = new Map void>() - type UpdateHandler = Parameters[0] - const handleQueryResult: UpdateHandler = (result) => { - if (result.isSuccess) { - // Clear error state - lastError = undefined - errorCount = 0 + // Helper function to add a row to the internal state + const addRow = (rowKey: string | number, hashedQueryKey: string) => { + const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() + rowToQueriesSet.add(hashedQueryKey) + rowToQueries.set(rowKey, rowToQueriesSet) + + const queryToRowsSet = queryToRows.get(hashedQueryKey) || new Set() + queryToRowsSet.add(rowKey) + queryToRows.set(hashedQueryKey, queryToRowsSet) + } - const rawData = result.data - const newItemsArray = select ? select(rawData) : rawData + // Helper function to remove a row from the internal state + const removeRow = (rowKey: string | number, hashedQuerKey: string) => { + const rowToQueriesSet = rowToQueries.get(rowKey) || new Set() + rowToQueriesSet.delete(hashedQuerKey) + rowToQueries.set(rowKey, rowToQueriesSet) - if ( - !Array.isArray(newItemsArray) || - newItemsArray.some((item) => typeof item !== `object`) - ) { - const errorMessage = select - ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` - : `@tanstack/query-db-collection: queryFn must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + const queryToRowsSet = queryToRows.get(hashedQuerKey) || new Set() + queryToRowsSet.delete(rowKey) + queryToRows.set(hashedQuerKey, queryToRowsSet) - console.error(errorMessage) - return - } + return rowToQueriesSet.size === 0 + } - const currentSyncedItems: Map = new Map( - collection._state.syncedData.entries() - ) - const newItemsMap = new Map() - newItemsArray.forEach((item) => { - const key = getKey(item) - newItemsMap.set(key, item) - }) + const internalSync: SyncConfig[`sync`] = (params) => { + const { begin, write, commit, markReady, collection } = params - begin() - - // Helper function for shallow equality check of objects - const shallowEqual = ( - obj1: Record, - obj2: Record - ): boolean => { - // Get all keys from both objects - const keys1 = Object.keys(obj1) - const keys2 = Object.keys(obj2) - - // If number of keys is different, objects are not equal - if (keys1.length !== keys2.length) return false - - // Check if all keys in obj1 have the same values in obj2 - return keys1.every((key) => { - // Skip comparing functions and complex objects deeply - if (typeof obj1[key] === `function`) return true - return obj1[key] === obj2[key] + // Track whether sync has been started + let syncStarted = false + + const createQueryFromOpts = ( + opts: LoadSubsetOptions, + queryFunction: typeof queryFn = queryFn + ): true | Promise => { + // Push the predicates down to the queryKey and queryFn + const key = typeof queryKey === `function` ? queryKey(opts) : queryKey + const hashedQueryKey = hashKey(key) + const extendedMeta = { ...meta, loadSubsetOptions: opts } + + if (observers.has(hashedQueryKey)) { + // We already have a query for this queryKey + // Get the current result and return based on its state + const observer = observers.get(hashedQueryKey)! + const currentResult = observer.getCurrentResult() + + if (currentResult.isSuccess) { + // Data is already available, return true synchronously + return true + } else if (currentResult.isError) { + // Error already occurred, reject immediately + return Promise.reject(currentResult.error) + } else { + // Query is still loading, wait for the first result + return new Promise((resolve, reject) => { + const unsubscribe = observer.subscribe((result) => { + if (result.isSuccess) { + unsubscribe() + resolve() + } else if (result.isError) { + unsubscribe() + reject(result.error) + } + }) }) } + } - currentSyncedItems.forEach((oldItem, key) => { - const newItem = newItemsMap.get(key) - if (!newItem) { - write({ type: `delete`, value: oldItem }) - } else if ( - !shallowEqual( - oldItem as Record, - newItem as Record - ) - ) { - // Only update if there are actual differences in the properties - write({ type: `update`, value: newItem }) + const observerOptions: QueryObserverOptions< + Array, + any, + Array, + Array, + any + > = { + queryKey: key, + queryFn: queryFunction, + meta: extendedMeta, + structuralSharing: true, + notifyOnChangeProps: `all`, + + // Only include options that are explicitly defined to allow QueryClient defaultOptions to be used + ...(enabled !== undefined && { enabled }), + ...(refetchInterval !== undefined && { refetchInterval }), + ...(retry !== undefined && { retry }), + ...(retryDelay !== undefined && { retryDelay }), + ...(staleTime !== undefined && { staleTime }), + } + + const localObserver = new QueryObserver< + Array, + any, + Array, + Array, + any + >(queryClient, observerOptions) + + hashToQueryKey.set(hashedQueryKey, key) + observers.set(hashedQueryKey, localObserver) + + // Create a promise that resolves when the query result is first available + const readyPromise = new Promise((resolve, reject) => { + const unsubscribe = localObserver.subscribe((result) => { + if (result.isSuccess) { + unsubscribe() + resolve() + } else if (result.isError) { + unsubscribe() + reject(result.error) } }) + }) + + // If sync has started or there are subscribers to the collection, subscribe to the query straight away + // This creates the main subscription that handles data updates + if (syncStarted || collection.subscriberCount > 0) { + subscribeToQuery(localObserver, hashedQueryKey) + } + + // Tell tanstack query to GC the query when the subscription is unsubscribed + // The subscription is unsubscribed when the live query is GCed. + const subscription = opts.subscription + subscription?.once(`unsubscribed`, () => { + queryClient.removeQueries({ queryKey: key, exact: true }) + }) + + return readyPromise + } + + type UpdateHandler = Parameters[0] + + const makeQueryResultHandler = (queryKey: QueryKey) => { + const hashedQueryKey = hashKey(queryKey) + const handleQueryResult: UpdateHandler = (result) => { + if (result.isSuccess) { + // Clear error state + lastError = undefined + errorCount = 0 - newItemsMap.forEach((newItem, key) => { - if (!currentSyncedItems.has(key)) { - write({ type: `insert`, value: newItem }) + const rawData = result.data + const newItemsArray = select ? select(rawData) : rawData + + if ( + !Array.isArray(newItemsArray) || + newItemsArray.some((item) => typeof item !== `object`) + ) { + const errorMessage = select + ? `@tanstack/query-db-collection: select() must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + : `@tanstack/query-db-collection: queryFn must return an array of objects. Got: ${typeof newItemsArray} for queryKey ${JSON.stringify(queryKey)}` + + console.error(errorMessage) + return } - }) - commit() + const currentSyncedItems: Map = new Map( + collection._state.syncedData.entries() + ) + const newItemsMap = new Map() + newItemsArray.forEach((item) => { + const key = getKey(item) + newItemsMap.set(key, item) + }) - // Mark collection as ready after first successful query result - markReady() - } else if (result.isError) { - if (result.errorUpdatedAt !== lastErrorUpdatedAt) { - lastError = result.error - errorCount++ - lastErrorUpdatedAt = result.errorUpdatedAt - } + begin() + + // Helper function for shallow equality check of objects + const shallowEqual = ( + obj1: Record, + obj2: Record + ): boolean => { + // Get all keys from both objects + const keys1 = Object.keys(obj1) + const keys2 = Object.keys(obj2) + + // If number of keys is different, objects are not equal + if (keys1.length !== keys2.length) return false + + // Check if all keys in obj1 have the same values in obj2 + return keys1.every((key) => { + // Skip comparing functions and complex objects deeply + if (typeof obj1[key] === `function`) return true + return obj1[key] === obj2[key] + }) + } + + currentSyncedItems.forEach((oldItem, key) => { + const newItem = newItemsMap.get(key) + if (!newItem) { + const needToRemove = removeRow(key, hashedQueryKey) // returns true if the row is no longer referenced by any queries + if (needToRemove) { + write({ type: `delete`, value: oldItem }) + } + } else if ( + !shallowEqual( + oldItem as Record, + newItem as Record + ) + ) { + // Only update if there are actual differences in the properties + write({ type: `update`, value: newItem }) + } + }) + + newItemsMap.forEach((newItem, key) => { + addRow(key, hashedQueryKey) + if (!currentSyncedItems.has(key)) { + write({ type: `insert`, value: newItem }) + } + }) - console.error( - `[QueryCollection] Error observing query ${String(queryKey)}:`, - result.error - ) + commit() - // Mark collection as ready even on error to avoid blocking apps - markReady() + // Mark collection as ready after first successful query result + markReady() + } else if (result.isError) { + if (result.errorUpdatedAt !== lastErrorUpdatedAt) { + lastError = result.error + errorCount++ + lastErrorUpdatedAt = result.errorUpdatedAt + } + + console.error( + `[QueryCollection] Error observing query ${String(queryKey)}:`, + result.error + ) + + // Mark collection as ready even on error to avoid blocking apps + markReady() + } } + return handleQueryResult } - const subscribeToQuery = () => { - if (!isSubscribed) { - actualUnsubscribeFn = localObserver.subscribe(handleQueryResult) - isSubscribed = true + // This function is called when a loadSubset call is deduplicated + // meaning that we have all the data locally available to answer the query + // so we execute the query locally + const createLocalQuery = (opts: LoadSubsetOptions) => { + const queryFn = ({ meta }: QueryFunctionContext) => { + const inserts = collection.currentStateAsChanges( + meta!.loadSubsetOptions as LoadSubsetOptions + )! + const data = inserts.map(({ value }) => value) + return Promise.resolve(data) } + + createQueryFromOpts(opts, queryFn) } - const unsubscribeFromQuery = () => { - if (isSubscribed && actualUnsubscribeFn) { - actualUnsubscribeFn() - actualUnsubscribeFn = null - isSubscribed = false + const isSubscribed = (hashedQueryKey: string) => { + return unsubscribes.has(hashedQueryKey) + } + + const subscribeToQuery = ( + observer: QueryObserver, any, Array, Array, any>, + hashedQueryKey: string + ) => { + if (!isSubscribed(hashedQueryKey)) { + const queryKey = hashToQueryKey.get(hashedQueryKey)! + const handleQueryResult = makeQueryResultHandler(queryKey) + const unsubscribeFn = observer.subscribe(handleQueryResult) + unsubscribes.set(hashedQueryKey, unsubscribeFn) } } - // Always subscribe when sync starts (this could be from preload(), startSync config, or first subscriber) - // We'll dynamically unsubscribe/resubscribe based on subscriber count to maintain staleTime behavior - subscribeToQuery() + const subscribeToQueries = () => { + observers.forEach(subscribeToQuery) + } + + const unsubscribeFromQueries = () => { + unsubscribes.forEach((unsubscribeFn) => { + unsubscribeFn() + }) + unsubscribes.clear() + } + + // Mark that sync has started + syncStarted = true // Set up event listener for subscriber changes const unsubscribeFromCollectionEvents = collection.on( `subscribers:change`, ({ subscriberCount }) => { if (subscriberCount > 0) { - subscribeToQuery() + subscribeToQueries() } else if (subscriberCount === 0) { - unsubscribeFromQuery() + unsubscribeFromQueries() } } ) - // Ensure we process any existing query data (QueryObserver doesn't invoke its callback automatically with initial - // state) - handleQueryResult(localObserver.getCurrentResult()) + // If syncMode is eager, create the initial query without any predicates + if (syncMode === `eager`) { + // Catch any errors to prevent unhandled rejections + const initialResult = createQueryFromOpts({}) + if (initialResult instanceof Promise) { + initialResult.catch(() => { + // Errors are already handled by the query result handler + }) + } + } else { + // In on-demand mode, mark ready immediately since there's no initial query + markReady() + } - return async () => { + // Always subscribe when sync starts (this could be from preload(), startSync config, or first subscriber) + // We'll dynamically unsubscribe/resubscribe based on subscriber count to maintain staleTime behavior + subscribeToQueries() + + // Ensure we process any existing query data (QueryObserver doesn't invoke its callback automatically with initial state) + observers.forEach((observer, hashedQueryKey) => { + const queryKey = hashToQueryKey.get(hashedQueryKey)! + const handleQueryResult = makeQueryResultHandler(queryKey) + handleQueryResult(observer.getCurrentResult()) + }) + + // Subscribe to the query client's cache to handle queries that are GCed by tanstack query + const unsubscribeQueryCache = queryClient + .getQueryCache() + .subscribe((event) => { + const hashedKey = event.query.queryHash + if (event.type === `removed`) { + cleanupQuery(hashedKey) + } + }) + + function cleanupQuery(hashedQueryKey: string) { + // Unsubscribe from the query's observer + unsubscribes.get(hashedQueryKey)?.() + + // Get all the rows that are in the result of this query + const rowKeys = queryToRows.get(hashedQueryKey) ?? new Set() + + // Remove the query from these rows + rowKeys.forEach((rowKey) => { + const queries = rowToQueries.get(rowKey) // set of queries that reference this row + if (queries && queries.size > 0) { + queries.delete(hashedQueryKey) + if (queries.size === 0) { + // Reference count dropped to 0, we can GC the row + rowToQueries.delete(rowKey) + + if (collection.has(rowKey)) { + begin() + write({ type: `delete`, value: collection.get(rowKey) }) + commit() + } + } + } + }) + + // Remove the query from the internal state + unsubscribes.delete(hashedQueryKey) + observers.delete(hashedQueryKey) + queryToRows.delete(hashedQueryKey) + hashToQueryKey.delete(hashedQueryKey) + } + + const cleanup = async () => { unsubscribeFromCollectionEvents() - unsubscribeFromQuery() - await queryClient.cancelQueries({ queryKey }) - queryClient.removeQueries({ queryKey }) + unsubscribeFromQueries() + + const queryKeys = [...hashToQueryKey.values()] + + hashToQueryKey.clear() + queryToRows.clear() + rowToQueries.clear() + observers.clear() + unsubscribeQueryCache() + + await Promise.all( + queryKeys.map(async (queryKey) => { + await queryClient.cancelQueries({ queryKey }) + queryClient.removeQueries({ queryKey }) + }) + ) + } + + // Create deduplicated loadSubset wrapper for non-eager modes + // This prevents redundant snapshot requests when multiple concurrent + // live queries request overlapping or subset predicates + const loadSubsetDedupe = + syncMode === `eager` + ? undefined + : new DeduplicatedLoadSubset({ + loadSubset: createQueryFromOpts, + onDeduplicate: createLocalQuery, + }) + + return { + loadSubset: loadSubsetDedupe?.loadSubset, + cleanup, } } @@ -621,15 +862,15 @@ export function queryCollectionOptions( * @returns Promise that resolves when the refetch is complete, with QueryObserverResult */ const refetch: RefetchFn = async (opts) => { - // Observer is created when sync starts. If never synced, nothing to refetch. - // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition - if (!queryObserver) { - return - } - // Return the QueryObserverResult for users to inspect - return queryObserver.refetch({ - throwOnError: opts?.throwOnError, + const queryKeys = [...hashToQueryKey.values()] + const refetchPromises = queryKeys.map((queryKey) => { + const queryObserver = observers.get(hashKey(queryKey))! + return queryObserver.refetch({ + throwOnError: opts?.throwOnError, + }) }) + + await Promise.all(refetchPromises) } // Create write context for manual write operations @@ -713,6 +954,7 @@ export function queryCollectionOptions( return { ...baseCollectionConfig, getKey, + syncMode, sync: { sync: enhancedInternalSync }, onInsert: wrappedOnInsert, onUpdate: wrappedOnUpdate, diff --git a/packages/query-db-collection/tests/query.test.ts b/packages/query-db-collection/tests/query.test.ts index b3a6dd712..c05771f5a 100644 --- a/packages/query-db-collection/tests/query.test.ts +++ b/packages/query-db-collection/tests/query.test.ts @@ -1,6 +1,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest" import { QueryClient } from "@tanstack/query-core" -import { createCollection } from "@tanstack/db" +import { + createCollection, + createLiveQueryCollection, + eq, + or, +} from "@tanstack/db" import { queryCollectionOptions } from "../src/query" import type { QueryFunctionContext } from "@tanstack/query-core" import type { @@ -18,6 +23,12 @@ interface TestItem { value?: number } +interface CategorisedItem { + id: string + name: string + category: string +} + const getKey = (item: TestItem) => item.id // Helper to advance timers and allow microtasks to flush @@ -431,7 +442,9 @@ describe(`QueryCollection`, () => { }) // Verify queryFn was called with the correct context, including the meta object - expect(queryFn).toHaveBeenCalledWith(expect.objectContaining({ meta })) + expect(queryFn).toHaveBeenCalledWith( + expect.objectContaining({ meta: { ...meta, loadSubsetOptions: {} } }) + ) }) describe(`Select method testing`, () => { @@ -2606,7 +2619,6 @@ describe(`QueryCollection`, () => { expect(collection.status).toBe(`ready`) expect(collection.size).toBe(items.length) }) - it(`should allow writeDelete in onDelete handler to write to synced store`, async () => { const queryKey = [`writeDelete-in-onDelete-test`] const items: Array = [ @@ -2650,6 +2662,54 @@ describe(`QueryCollection`, () => { expect(collection.has(`1`)).toBe(false) expect(collection.size).toBe(1) }) + + it(`should transition to ready immediately in on-demand mode without loading data`, async () => { + const queryKey = [`preload-on-demand-test`] + const items: Array = [ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + ] + + const queryFn = vi.fn().mockResolvedValue(items) + + const config: QueryCollectionConfig = { + id: `preload-on-demand-test`, + queryClient, + queryKey, + queryFn, + getKey, + syncMode: `on-demand`, // No initial query in on-demand mode + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Collection should be idle initially + expect(collection.status).toBe(`idle`) + expect(queryFn).not.toHaveBeenCalled() + expect(collection.size).toBe(0) + + // Preload should resolve immediately without calling queryFn + // since there's no initial query in on-demand mode + await collection.preload() + + // After preload, collection should be ready + // but queryFn should NOT have been called and collection should still be empty + expect(collection.status).toBe(`ready`) + expect(queryFn).not.toHaveBeenCalled() + expect(collection.size).toBe(0) + + // Now if we call loadSubset, it should actually load data + await collection._sync.loadSubset({}) + + await vi.waitFor(() => { + expect(collection.size).toBe(items.length) + }) + + expect(queryFn).toHaveBeenCalledTimes(1) + expect(collection.get(`1`)).toEqual(items[0]) + expect(collection.get(`2`)).toEqual(items[1]) + }) }) describe(`QueryClient defaultOptions`, () => { @@ -2788,4 +2848,660 @@ describe(`QueryCollection`, () => { customQueryClient.clear() }) }) + + describe(`Query Garbage Collection`, () => { + const isCategory = (category: `A` | `B` | `C`, where: any) => { + return ( + where && + where.type === `func` && + where.name === `eq` && + where.args[0].path[0] === `category` && + where.args[1].value === category + ) + } + + it(`should delete all rows when a single query is garbage collected`, async () => { + const queryKey = [`single-query-gc-test`] + const items: Array = [ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + ] + + const queryFn = vi.fn().mockResolvedValue(items) + + const config: QueryCollectionConfig = { + id: `single-query-gc-test`, + queryClient, + queryKey, + queryFn, + getKey, + startSync: true, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Wait for initial data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) + expect(collection.get(`1`)).toEqual(items[0]) + expect(collection.get(`2`)).toEqual(items[1]) + expect(collection.get(`3`)).toEqual(items[2]) + }) + + // Verify all items are in the collection + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // Simulate query garbage collection by removing the query from the cache + await collection.cleanup() + + // Verify all items are removed + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + expect(collection.has(`3`)).toBe(false) + }) + + it(`should only delete non-shared rows when one of multiple overlapping queries is GCed`, async () => { + const baseQueryKey = [`overlapping-query-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const loadSubsetOptions = meta?.loadSubsetOptions ?? {} + const { where } = loadSubsetOptions + + console.log(`In queryFn:\n`, JSON.stringify(where, null, 2)) + + // Query 1: items 1, 2, 3 (where: { category: 'A' }) + if (isCategory(`A`, where)) { + console.log(`Is category A`) + return Promise.resolve([ + { id: `1`, name: `Item 1` }, + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + ]) + } + + // Query 2: items 2, 3, 4 (where: { category: 'B' }) + if (isCategory(`B`, where)) { + return Promise.resolve([ + { id: `2`, name: `Item 2` }, + { id: `3`, name: `Item 3` }, + { id: `4`, name: `Item 4` }, + ]) + } + + // Query 3: items 3, 4, 5 (where: { category: 'C' }) + if (isCategory(`C`, where)) { + return Promise.resolve([ + { id: `3`, name: `Item 3` }, + { id: `4`, name: `Item 4` }, + { id: `5`, name: `Item 5` }, + ]) + } + return Promise.resolve([]) + }) + + const queryKey = (ctx: any) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + } + + const config: QueryCollectionConfig< + TestItem & { category: `A` | `B` | `C` } + > = { + id: `overlapping-test`, + queryClient, + queryKey, + queryFn, + getKey, + startSync: true, + syncMode: `on-demand`, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Collection should start empty with on-demand sync mode + expect(collection.size).toBe(0) + + // Load query 1 with no predicates (items 1, 2, 3) + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query1.preload() + + // Wait for query 1 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) + }) + + // Add query 2 with different predicates (items 2, 3, 4) + // We abuse the `where` clause being typed as `any` to pass a category + // but in real usage this would be some Intermediate Representation of the where clause + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `B`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query2.preload() + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(4) // Should have items 1, 2, 3, 4 + }) + + // Add query 3 with different predicates + const query3 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `C`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query3.preload() + + // Wait for query 3 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(5) // Should have items 1, 2, 3, 4, 5 + }) + + // Verify all items are present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + expect(collection.has(`4`)).toBe(true) + expect(collection.has(`5`)).toBe(true) + + // GC query 1 (no predicates) - should only remove item 1 (unique to query 1) + // Items 2 and 3 should remain because they're shared with other queries + await query1.cleanup() + + expect(collection.size).toBe(4) // Should have items 2, 3, 4, 5 + + // Verify item 1 is removed (it was only in query 1) + expect(collection.has(`1`)).toBe(false) + + // Verify shared items are still present + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + expect(collection.has(`4`)).toBe(true) + expect(collection.has(`5`)).toBe(true) + + // GC query 2 (where: { category: 'B' }) - should remove item 2 + // Items 3 and 4 should remain because they are shared with query 3 + await query2.cleanup() + + expect(collection.size).toBe(3) // Should have items 3, 4, 5 + + // Verify item 2 is removed (it was only in query 2) + expect(collection.has(`2`)).toBe(false) + + // Verify items 3 and 4 are still present (shared with query 3) + expect(collection.has(`3`)).toBe(true) + expect(collection.has(`4`)).toBe(true) + expect(collection.has(`5`)).toBe(true) + + // GC query 3 (where: { category: 'C' }) - should remove all remaining items + await query3.cleanup() + + expect(collection.size).toBe(0) + + // Verify all items are now removed + expect(collection.has(`3`)).toBe(false) + expect(collection.has(`4`)).toBe(false) + expect(collection.has(`5`)).toBe(false) + }) + + it(`should handle GC of queries with identical data`, async () => { + const baseQueryKey = [`identical-query-test`] + + // Mock queryFn to return the same data for all queries + const queryFn = vi.fn().mockImplementation(() => { + // All queries return the same data regardless of predicates + return Promise.resolve([ + { id: `1`, name: `Item 1`, category: `A` }, + { id: `2`, name: `Item 2`, category: `A` }, + { id: `3`, name: `Item 3`, category: `A` }, + ]) + }) + + const config: QueryCollectionConfig = { + id: `identical-test`, + queryClient, + queryKey: (ctx) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + }, + queryFn, + getKey, + startSync: true, + syncMode: `on-demand`, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Collection should start empty with on-demand sync mode + expect(collection.size).toBe(0) + + // Load query 1 with no predicates (items 1, 2, 3) + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query1.preload() + + // Wait for query 1 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) + }) + + // Add query 2 with different predicates (but returns same data) + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query2.preload() + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Same data, no new items + }) + + // Add query 3 with different predicates (but returns same data) + const query3 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => + or(eq(item.category, `A`), eq(item.category, `B`)) + ) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query3.preload() + + // Wait for query 3 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Same data, no new items + }) + + // GC query 1 - should not remove any items (all items are shared with other queries) + await query1.cleanup() + + expect(collection.size).toBe(3) // Items still present due to other queries + + // All items should still be present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // GC query 2 - should still not remove any items (all items are shared with query 3) + await query2.cleanup() + + expect(collection.size).toBe(3) // Items still present due to query 3 + + // All items should still be present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // GC query 3 - should remove all items (no more queries reference them) + await query3.cleanup() + + expect(collection.size).toBe(0) + + // All items should now be removed + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + expect(collection.has(`3`)).toBe(false) + }) + + it(`should handle GC of empty queries gracefully`, async () => { + const baseQueryKey = [`empty-query-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const loadSubsetOptions = meta?.loadSubsetOptions || {} + const { where } = loadSubsetOptions + + // Query 2: some items (where: { category: 'B' }) + if (isCategory(`B`, where)) { + return Promise.resolve([ + { id: `1`, name: `Item 1`, category: `B` }, + { id: `2`, name: `Item 2`, category: `B` }, + ]) + } + + return Promise.resolve([]) + }) + + const config: QueryCollectionConfig = + { + id: `empty-test`, + queryClient, + queryKey: (ctx) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + }, + queryFn, + getKey, + startSync: true, + syncMode: `on-demand`, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Collection should start empty with on-demand sync mode + expect(collection.size).toBe(0) + + // Load query 1 (returns empty array) + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + + await query1.preload() + + // Wait for query 1 data to load (still empty) + await vi.waitFor(() => { + expect(collection.size).toBe(0) // Empty query + }) + + // Add query 2 with different predicates (items 1, 2) + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `B`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query2.preload() + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(2) // Should have items 1, 2 + }) + + // Verify items are present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + + // GC empty query 1 - should not affect the collection + await query1.cleanup() + + // Collection should still have items from query 2 + expect(collection.size).toBe(2) + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + + // GC non-empty query 2 - should remove its items + await query2.cleanup() + + await vi.waitFor(() => { + expect(collection.size).toBe(0) + }) + + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + }) + + it(`should handle concurrent GC of multiple queries`, async () => { + const baseQueryKey = [`concurrent-query-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const loadSubsetOptions = meta?.loadSubsetOptions || {} + const { where } = loadSubsetOptions + + // Query 1: items 1, 2 (no predicates) + if (isCategory(`C`, where)) { + return Promise.resolve([ + { id: `1`, name: `Item 1`, category: `C` }, + { id: `2`, name: `Item 2`, category: `C` }, + ]) + } + + // Query 2: items 2, 3 (where: { type: 'A' }) + if (isCategory(`A`, where)) { + return Promise.resolve([ + { id: `2`, name: `Item 2`, category: `A` }, + { id: `3`, name: `Item 3`, category: `A` }, + ]) + } + + // Query 3: items 3, 4 (where: { type: 'B' }) + if (isCategory(`B`, where)) { + return Promise.resolve([ + { id: `3`, name: `Item 3`, category: `B` }, + { id: `4`, name: `Item 4`, category: `B` }, + ]) + } + + return Promise.resolve([]) + }) + + const config: QueryCollectionConfig< + TestItem & { category: `A` | `B` | `C` } + > = { + id: `concurrent-test`, + queryClient, + queryKey: (ctx) => { + if (ctx.where) { + return [...baseQueryKey, ctx.where] + } + return baseQueryKey + }, + queryFn, + getKey, + startSync: true, + syncMode: `on-demand`, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Collection should start empty with on-demand sync mode + expect(collection.size).toBe(0) + + // Load query 1 with no predicates (items 1, 2) + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `C`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query1.preload() + + // Wait for query 1 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(2) + }) + + // Add query 2 with different predicates (items 2, 3) + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query2.preload() + + // Wait for query 2 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) // Should have items 1, 2, 3 + }) + + // Add query 3 with different predicates + const query3 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `B`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query3.preload() + + // Wait for query 3 data to load + await vi.waitFor(() => { + expect(collection.size).toBe(4) // Should have items 1, 2, 3, 4 + }) + + // GC all queries concurrently + const queries = [query1, query2, query3] + const proms = queries.map((query) => query.cleanup()) + await Promise.all(proms) + + // Collection should be empty after all queries are GCed + expect(collection.size).toBe(0) + + // Verify all items are removed + expect(collection.has(`1`)).toBe(false) + expect(collection.has(`2`)).toBe(false) + expect(collection.has(`3`)).toBe(false) + expect(collection.has(`4`)).toBe(false) + }) + + it(`should deduplicate queries and handle GC correctly when queries are ordered and have a LIMIT`, async () => { + const baseQueryKey = [`deduplication-gc-test`] + + // Mock queryFn to return different data based on predicates + const queryFn = vi.fn().mockImplementation((context) => { + const { meta } = context + const loadSubsetOptions = meta?.loadSubsetOptions ?? {} + const { where, limit } = loadSubsetOptions + + // Query 1: all items with category A (no limit) + if (isCategory(`A`, where) && !limit) { + return Promise.resolve([ + { id: `1`, name: `Item 1`, category: `A` }, + { id: `2`, name: `Item 2`, category: `A` }, + { id: `3`, name: `Item 3`, category: `A` }, + ]) + } + + return Promise.resolve([]) + }) + + const config: QueryCollectionConfig = { + id: `deduplication-test`, + queryClient, + queryKey: (ctx) => { + const key = [...baseQueryKey] + if (ctx.where) { + key.push(`where`, JSON.stringify(ctx.where)) + } + if (ctx.limit) { + key.push(`limit`, ctx.limit.toString()) + } + if (ctx.orderBy) { + key.push(`orderBy`, JSON.stringify(ctx.orderBy)) + } + return key + }, + queryFn, + getKey, + startSync: true, + syncMode: `on-demand`, + } + + const options = queryCollectionOptions(config) + const collection = createCollection(options) + + // Collection should start empty with on-demand sync mode + expect(collection.size).toBe(0) + + // Execute first query: load all rows that belong to category A (returns 3 rows) + const query1 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query1.preload() + + // Wait for first query data to load + await vi.waitFor(() => { + expect(collection.size).toBe(3) + expect(queryFn).toHaveBeenCalledTimes(1) + }) + + // Verify all 3 items are present + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // Execute second query: load rows with category A, limit 2, ordered by ID + // This should be deduplicated since we already have all category A data + // So it will load the data from the local collection + const query2 = createLiveQueryCollection({ + query: (q) => + q + .from({ item: collection }) + .where(({ item }) => eq(item.category, `A`)) + .orderBy(({ item }) => item.id, `asc`) + .limit(2) + .select(({ item }) => ({ id: item.id, name: item.name })), + }) + await query2.preload() + + await flushPromises() + + // Second query should still only have been called once + // since query2 is deduplicated so it is executed against the local collection + // and not via queryFn + expect(queryFn).toHaveBeenCalledTimes(1) + + // Collection should still have all 3 items (deduplication doesn't remove data) + expect(collection.size).toBe(3) + expect(collection.has(`1`)).toBe(true) + expect(collection.has(`2`)).toBe(true) + expect(collection.has(`3`)).toBe(true) + + // GC the first query (all category A without limit) + await query1.cleanup() + + expect(collection.size).toBe(2) // Should only have items 1 and 2 because they are still referenced by query 2 + + // Verify that only row 3 is removed (it was only referenced by query 1) + expect(collection.has(`1`)).toBe(true) // Still present (referenced by query 2) + expect(collection.has(`2`)).toBe(true) // Still present (referenced by query 2) + expect(collection.has(`3`)).toBe(false) // Removed (only referenced by query 1) + + // GC the second query (category A with limit 2) + await query2.cleanup() + + // Wait for final GC to process + expect(collection.size).toBe(0) + }) + }) })