Skip to content

Commit

Permalink
namespace.resumableQuery (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
fahreddinozcan authored Feb 7, 2025
1 parent 35253f2 commit 3f5cbd2
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
68 changes: 68 additions & 0 deletions src/commands/client/namespace/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,72 @@ describe("NAMESPACE", () => {

expect(fetchData[0]?.metadata?.upstash).toBe("test-1-updated");
});

describe("RESUMABLE QUERY", () => {
afterAll(async () => await resetIndexes());

const index = new Index({
url: process.env.UPSTASH_VECTOR_REST_URL!,
token: process.env.UPSTASH_VECTOR_REST_TOKEN!,
});

test("should fetch results in batches from namespace", async () => {
const namespace = index.namespace("test-namespace-resumable");

await namespace.upsert([
{ id: 1, vector: range(0, 384) },
{ id: 2, vector: range(0, 384) },
{ id: 3, vector: range(0, 384) },
]);

await awaitUntilIndexed(index);

const { result, fetchNext, stop } = await namespace.resumableQuery({
maxIdle: 3600,
topK: 2,
vector: range(0, 384),
includeMetadata: true,
});

expect(result).toBeDefined();
expect(result.length).toBe(2);

const nextBatch = await fetchNext(1);
expect(nextBatch).toBeDefined();
expect(nextBatch.length).toBe(1);

await stop();
}, 10_000);

test("should throw error after stopping query", async () => {
const namespace = index.namespace("test-namespace-resumable-stop");

await namespace.upsert({
id: "test",
vector: range(0, 384),
});

await awaitUntilIndexed(index);

const query = await namespace.resumableQuery({
maxIdle: 3600,
topK: 10,
vector: range(0, 384),
});

expect(query.result).toBeDefined();
await query.stop();

try {
await query.fetchNext(5);
expect(false).toBe(true); // Force failure if no error thrown
} catch (error) {
if (error instanceof Error)
expect(error.message).toBe(
"The resumable query has already been stopped. Please start another resumable query."
);
}
});
});
});

37 changes: 37 additions & 0 deletions src/commands/client/namespace/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
UpdateCommand,
UpsertCommand,
} from "@commands/client";
import { ResumableQuery, type ResumableQueryPayload } from "../resumable-query";
import type { Dict } from "@commands/client/types";
import type { Requester } from "@http";
import type { CommandArgs } from "../../../vector";
Expand Down Expand Up @@ -141,6 +142,42 @@ export class Namespace<TIndexMetadata extends Dict = Dict> {
query = <TMetadata extends Dict = TIndexMetadata>(args: CommandArgs<typeof QueryCommand>) =>
new QueryCommand<TMetadata>(args, { namespace: this.namespace }).exec(this.client);


/**
* Initializes a resumable query operation on the vector database.
* This method allows for querying large result sets in multiple chunks or implementing pagination.
*
* @template TMetadata
* @param {ResumableQueryPayload} args - The arguments for the resumable query.
* @param {number} args.maxIdle - The maximum idle time in seconds before the query session expires.
* @param {number} args.topK - The number of top results to return in each fetch operation.
* @param {number[]} args.vector - The query vector used for similarity search.
* @param {boolean} [args.includeMetadata] - Whether to include metadata in the query results.
* @param {boolean} [args.includeVectors] - Whether to include vectors in the query results.
* @param {Object} [options] - Additional options for the query.
* @returns {Promise<ResumableQuery<TMetadata>>} A promise that resolves to a ResumableQuery object.
* @example
* const { result, fetchNext, stop } = await index.namespace("ns").resumableQuery({
* maxIdle: 3600,
* topK: 50,
* vector: [0.1, 0.2, 0.3, ...],
* includeMetadata: true,
* includeVectors: true
* }, { namespace: 'my-namespace' });
*
* const firstBatch = await fetchNext(10);
* const secondBatch = await fetchNext(10);
* await stop(); // End the query session
*/
resumableQuery = async <TMetadata extends Dict = TIndexMetadata>(
args: ResumableQueryPayload,
) => {
const resumableQuery = new ResumableQuery<TMetadata>(args, this.client, this.namespace);
const initialQuery = await resumableQuery.start();
const { fetchNext, stop } = resumableQuery;
return { fetchNext, stop, result: initialQuery.scores };
};

/**
* Deletes a specific item or items from the index namespace by their ID(s). *
*
Expand Down

0 comments on commit 3f5cbd2

Please sign in to comment.