Skip to content

Commit

Permalink
Add allow_stream_result to startPendingQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-- committed Oct 28, 2024
1 parent a9a38bb commit c3ef4ec
Show file tree
Hide file tree
Showing 11 changed files with 35 additions and 21 deletions.
2 changes: 1 addition & 1 deletion lib/include/duckdb/web/webdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class WebDB {
/// Run a query and return the materialized query result
arrow::Result<std::shared_ptr<arrow::Buffer>> RunQuery(std::string_view text);
/// Execute a query as pending query and return the stream schema when finished
arrow::Result<std::shared_ptr<arrow::Buffer>> PendingQuery(std::string_view text);
arrow::Result<std::shared_ptr<arrow::Buffer>> PendingQuery(std::string_view text, bool allow_stream_result);
/// Poll a pending query and return the schema when finished
arrow::Result<std::shared_ptr<arrow::Buffer>> PollPendingQuery();
/// Cancel a pending query
Expand Down
5 changes: 3 additions & 2 deletions lib/src/webdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,11 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::RunQuery(std::s
}
}

arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::PendingQuery(std::string_view text) {
arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::PendingQuery(std::string_view text,
bool allow_stream_result) {
try {
// Send the query
auto result = connection_.PendingQuery(std::string{text});
auto result = connection_.PendingQuery(std::string{text}, allow_stream_result);
if (result->HasError()) return arrow::Status{arrow::StatusCode::ExecutionError, std::move(result->GetError())};
current_pending_query_result_ = std::move(result);
current_pending_query_was_canceled_ = false;
Expand Down
5 changes: 3 additions & 2 deletions lib/src/webdb_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,10 @@ void duckdb_web_query_run_buffer(WASMResponse* packed, ConnectionHdl connHdl, co
WASMResponseBuffer::Get().Store(*packed, std::move(r));
}
/// Start a pending query
void duckdb_web_pending_query_start(WASMResponse* packed, ConnectionHdl connHdl, const char* script) {
void duckdb_web_pending_query_start(WASMResponse* packed, ConnectionHdl connHdl, const char* script,
bool allow_stream_result) {
auto c = reinterpret_cast<WebDB::Connection*>(connHdl);
auto r = c->PendingQuery(script);
auto r = c->PendingQuery(script, allow_stream_result);
WASMResponseBuffer::Get().Store(*packed, std::move(r));
}
/// Poll a pending query
Expand Down
13 changes: 9 additions & 4 deletions packages/duckdb-wasm/src/bindings/bindings_base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
/** Send a query and return the full result */
public runQuery(conn: number, text: string): Uint8Array {
const BUF = TEXT_ENCODER.encode(text);
const bufferPtr = this.mod._malloc(BUF.length );
const bufferOfs = this.mod.HEAPU8.subarray(bufferPtr, bufferPtr + BUF.length );
const bufferPtr = this.mod._malloc(BUF.length);
const bufferOfs = this.mod.HEAPU8.subarray(bufferPtr, bufferPtr + BUF.length);
bufferOfs.set(BUF);
const [s, d, n] = callSRet(this.mod, 'duckdb_web_query_run_buffer', ['number', 'number', 'number'], [conn, bufferPtr, BUF.length]);
if (s !== StatusCode.SUCCESS) {
Expand All @@ -182,8 +182,13 @@ export abstract class DuckDBBindingsBase implements DuckDBBindings {
* On null, the query has to be executed using `pollPendingQuery` until that returns != null.
* Results can then be fetched using `fetchQueryResults`
*/
public startPendingQuery(conn: number, text: string): Uint8Array | null {
const [s, d, n] = callSRet(this.mod, 'duckdb_web_pending_query_start', ['number', 'string'], [conn, text]);
public startPendingQuery(conn: number, text: string, allowStreamResult: boolean = false): Uint8Array | null {
const [s, d, n] = callSRet(
this.mod,
'duckdb_web_pending_query_start',
['number', 'string', 'boolean'],
[conn, text, allowStreamResult],
);
if (s !== StatusCode.SUCCESS) {
throw new Error(readString(this.mod, d, n));
}
Expand Down
2 changes: 1 addition & 1 deletion packages/duckdb-wasm/src/bindings/bindings_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface DuckDBBindings {
connect(): DuckDBConnection;
disconnect(conn: number): void;
runQuery(conn: number, text: string): Uint8Array;
startPendingQuery(conn: number, text: string): Uint8Array | null;
startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Uint8Array | null;
pollPendingQuery(conn: number): Uint8Array | null;
cancelPendingQuery(conn: number): boolean;
fetchQueryResults(conn: number): Uint8Array;
Expand Down
5 changes: 3 additions & 2 deletions packages/duckdb-wasm/src/bindings/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ export class DuckDBConnection {
/** Send a query */
public async send<T extends { [key: string]: arrow.DataType } = any>(
text: string,
allowStreamResult: boolean = false,
): Promise<arrow.RecordBatchStreamReader<T>> {
let header = this._bindings.startPendingQuery(this._conn, text);
let header = this._bindings.startPendingQuery(this._conn, text, allowStreamResult);
while (header == null) {
header = await new Promise((resolve, reject) => {
try {
Expand Down Expand Up @@ -79,7 +80,7 @@ export class DuckDBConnection {

/** Insert an arrow table */
public insertArrowTable(table: arrow.Table, options: ArrowInsertOptions): void {
const buffer = arrow.tableToIPC(table, 'stream');
const buffer = arrow.tableToIPC(table, 'stream');
this.insertArrowFromIPCStream(buffer, options);
}
/** Insert an arrow table from an ipc stream */
Expand Down
13 changes: 9 additions & 4 deletions packages/duckdb-wasm/src/parallel/async_bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,16 @@ export class AsyncDuckDB implements AsyncDuckDBBindings {
}

/** Start a pending query */
public async startPendingQuery(conn: ConnectionID, text: string): Promise<Uint8Array | null> {
const task = new WorkerTask<WorkerRequestType.START_PENDING_QUERY, [ConnectionID, string], Uint8Array | null>(
public async startPendingQuery(
conn: ConnectionID,
text: string,
allowStreamResult: boolean = false,
): Promise<Uint8Array | null> {
const task = new WorkerTask<
WorkerRequestType.START_PENDING_QUERY,
[conn, text],
);
[ConnectionID, string, boolean],
Uint8Array | null
>(WorkerRequestType.START_PENDING_QUERY, [conn, text, allowStreamResult]);
return await this.postTask(task);
}
/** Poll a pending query */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export interface AsyncDuckDBBindings {

disconnect(conn: number): Promise<void>;
runQuery(conn: number, text: string): Promise<Uint8Array>;
startPendingQuery(conn: number, text: string): Promise<Uint8Array | null>;
startPendingQuery(conn: number, text: string, allowStreamResult: boolean): Promise<Uint8Array | null>;
pollPendingQuery(conn: number): Promise<Uint8Array | null>;
cancelPendingQuery(conn: number): Promise<boolean>;
fetchQueryResults(conn: number): Promise<Uint8Array>;
Expand Down
3 changes: 2 additions & 1 deletion packages/duckdb-wasm/src/parallel/async_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class AsyncDuckDBConnection {
/** Send a query */
public async send<T extends { [key: string]: arrow.DataType } = any>(
text: string,
allowStreamResult: boolean = false,
): Promise<arrow.AsyncRecordBatchStreamReader<T>> {
this._bindings.logger.log({
timestamp: new Date(),
Expand All @@ -59,7 +60,7 @@ export class AsyncDuckDBConnection {
event: LogEvent.RUN,
value: text,
});
let header = await this._bindings.startPendingQuery(this._conn, text);
let header = await this._bindings.startPendingQuery(this._conn, text, allowStreamResult);
while (header == null) {
header = await this._bindings.pollPendingQuery(this._conn);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/duckdb-wasm/src/parallel/worker_dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ export abstract class AsyncDuckDBDispatcher implements Logger {
break;
}
case WorkerRequestType.START_PENDING_QUERY: {
const result = this._bindings.startPendingQuery(request.data[0], request.data[1]);
const result = this._bindings.startPendingQuery(request.data[0], request.data[1], request.data[2]);
const transfer = [];
if (result) {
transfer.push(result.buffer);
Expand Down
4 changes: 2 additions & 2 deletions packages/duckdb-wasm/src/parallel/worker_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ export type WorkerRequestVariant =
| WorkerRequest<WorkerRequestType.RUN_PREPARED, [number, number, any[]]>
| WorkerRequest<WorkerRequestType.RUN_QUERY, [number, string]>
| WorkerRequest<WorkerRequestType.SEND_PREPARED, [number, number, any[]]>
| WorkerRequest<WorkerRequestType.START_PENDING_QUERY, [number, string]>
| WorkerRequest<WorkerRequestType.START_PENDING_QUERY, [number, string, boolean]>
| WorkerRequest<WorkerRequestType.TOKENIZE, string>;

export type WorkerResponseVariant =
Expand Down Expand Up @@ -198,7 +198,7 @@ export type WorkerTaskVariant =
| WorkerTask<WorkerRequestType.RUN_PREPARED, [number, number, any[]], Uint8Array>
| WorkerTask<WorkerRequestType.RUN_QUERY, [ConnectionID, string], Uint8Array>
| WorkerTask<WorkerRequestType.SEND_PREPARED, [number, number, any[]], Uint8Array>
| WorkerTask<WorkerRequestType.START_PENDING_QUERY, [ConnectionID, string], Uint8Array | null>
| WorkerTask<WorkerRequestType.START_PENDING_QUERY, [ConnectionID, string, boolean], Uint8Array | null>
| WorkerTask<WorkerRequestType.POLL_PENDING_QUERY, ConnectionID, Uint8Array | null>
| WorkerTask<WorkerRequestType.CANCEL_PENDING_QUERY, ConnectionID, boolean>
| WorkerTask<WorkerRequestType.TOKENIZE, string, ScriptTokens>;

0 comments on commit c3ef4ec

Please sign in to comment.