From 3f5cbd210bcf886f3777d35ae0828a514a43c17c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fahreddin=20=C3=96zcan?= <88107904+fahreddinozcan@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:17:14 +0300 Subject: [PATCH] namespace.resumableQuery (#56) --- src/commands/client/namespace/index.test.ts | 68 +++++++++++++++++++++ src/commands/client/namespace/index.ts | 37 +++++++++++ 2 files changed, 105 insertions(+) diff --git a/src/commands/client/namespace/index.test.ts b/src/commands/client/namespace/index.test.ts index 8f57359..b9af183 100644 --- a/src/commands/client/namespace/index.test.ts +++ b/src/commands/client/namespace/index.test.ts @@ -105,4 +105,72 @@ describe("NAMESPACE", () => { expect(fetchData[0]?.metadata?.upstash).toBe("test-1-updated"); }); + + describe("RESUMABLE QUERY", () => { + afterAll(async () => await resetIndexes()); + + const index = new Index({ + url: process.env.UPSTASH_VECTOR_REST_URL!, + token: process.env.UPSTASH_VECTOR_REST_TOKEN!, + }); + + test("should fetch results in batches from namespace", async () => { + const namespace = index.namespace("test-namespace-resumable"); + + await namespace.upsert([ + { id: 1, vector: range(0, 384) }, + { id: 2, vector: range(0, 384) }, + { id: 3, vector: range(0, 384) }, + ]); + + await awaitUntilIndexed(index); + + const { result, fetchNext, stop } = await namespace.resumableQuery({ + maxIdle: 3600, + topK: 2, + vector: range(0, 384), + includeMetadata: true, + }); + + expect(result).toBeDefined(); + expect(result.length).toBe(2); + + const nextBatch = await fetchNext(1); + expect(nextBatch).toBeDefined(); + expect(nextBatch.length).toBe(1); + + await stop(); + }, 10_000); + + test("should throw error after stopping query", async () => { + const namespace = index.namespace("test-namespace-resumable-stop"); + + await namespace.upsert({ + id: "test", + vector: range(0, 384), + }); + + await awaitUntilIndexed(index); + + const query = await namespace.resumableQuery({ + maxIdle: 3600, + topK: 10, + vector: range(0, 384), + }); + + expect(query.result).toBeDefined(); + await query.stop(); + + try { + await query.fetchNext(5); + expect(false).toBe(true); // Force failure if no error thrown + } catch (error) { + if (error instanceof Error) + expect(error.message).toBe( + "The resumable query has already been stopped. Please start another resumable query." + ); + } + }); + }); }); + diff --git a/src/commands/client/namespace/index.ts b/src/commands/client/namespace/index.ts index 1e507f0..cf1af2b 100644 --- a/src/commands/client/namespace/index.ts +++ b/src/commands/client/namespace/index.ts @@ -7,6 +7,7 @@ import { UpdateCommand, UpsertCommand, } from "@commands/client"; +import { ResumableQuery, type ResumableQueryPayload } from "../resumable-query"; import type { Dict } from "@commands/client/types"; import type { Requester } from "@http"; import type { CommandArgs } from "../../../vector"; @@ -141,6 +142,42 @@ export class Namespace { query = (args: CommandArgs) => new QueryCommand(args, { namespace: this.namespace }).exec(this.client); + + /** + * Initializes a resumable query operation on the vector database. + * This method allows for querying large result sets in multiple chunks or implementing pagination. + * + * @template TMetadata + * @param {ResumableQueryPayload} args - The arguments for the resumable query. + * @param {number} args.maxIdle - The maximum idle time in seconds before the query session expires. + * @param {number} args.topK - The number of top results to return in each fetch operation. + * @param {number[]} args.vector - The query vector used for similarity search. + * @param {boolean} [args.includeMetadata] - Whether to include metadata in the query results. + * @param {boolean} [args.includeVectors] - Whether to include vectors in the query results. + * @param {Object} [options] - Additional options for the query. + * @returns {Promise>} A promise that resolves to a ResumableQuery object. + * @example + * const { result, fetchNext, stop } = await index.namespace("ns").resumableQuery({ + * maxIdle: 3600, + * topK: 50, + * vector: [0.1, 0.2, 0.3, ...], + * includeMetadata: true, + * includeVectors: true + * }, { namespace: 'my-namespace' }); + * + * const firstBatch = await fetchNext(10); + * const secondBatch = await fetchNext(10); + * await stop(); // End the query session + */ + resumableQuery = async ( + args: ResumableQueryPayload, + ) => { + const resumableQuery = new ResumableQuery(args, this.client, this.namespace); + const initialQuery = await resumableQuery.start(); + const { fetchNext, stop } = resumableQuery; + return { fetchNext, stop, result: initialQuery.scores }; + }; + /** * Deletes a specific item or items from the index namespace by their ID(s). * *