Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming responses from rust server #462

Draft
wants to merge 72 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
6132b13
scaffolding for rust server
domoritz Mar 20, 2024
e13550d
update deps
domoritz Jun 12, 2024
aaf3044
basic duckdb server
domoritz Jul 6, 2024
370f3a7
error handling
domoritz Jul 6, 2024
1b59ea7
abstract database
domoritz Jul 6, 2024
49aec67
make it work
domoritz Jul 6, 2024
d431610
http/2
domoritz Jul 6, 2024
1348411
rust server script
domoritz Jul 6, 2024
a959954
bundles
domoritz Jul 7, 2024
f18320a
Merge branch 'main' into dom/rust
domoritz Jul 7, 2024
da983a2
option for https
domoritz Jul 7, 2024
884f139
support bundle handlings in connnectors
domoritz Jul 7, 2024
af2d675
simplify server
domoritz Jul 7, 2024
ed4bec9
priority queue as class
domoritz Jul 7, 2024
db1c70c
add bundle name to python
domoritz Jul 7, 2024
463930b
bundle handling
domoritz Jul 7, 2024
7214c07
Merge branch 'main' into dom/rust
domoritz Jul 7, 2024
b1e7cc9
fix
domoritz Jul 7, 2024
e73d430
update deps
domoritz Jul 7, 2024
5ba673b
don't collect vectors, return real json
domoritz Jul 7, 2024
6f5a05a
fix bundle and caching
domoritz Jul 7, 2024
c7b5591
implement persists
domoritz Jul 7, 2024
996a096
cache in bundle creating, simplify bundle loading
domoritz Jul 7, 2024
76c9701
rust checks
domoritz Jul 7, 2024
500262b
rust tests
domoritz Jul 7, 2024
e10dd8c
Merge branch 'main' into dom/rust
domoritz Jul 7, 2024
d1394e4
update test names
domoritz Jul 7, 2024
e4c17b6
tests
domoritz Jul 7, 2024
ffbbf2f
format
domoritz Jul 7, 2024
edc5f27
clippy
domoritz Jul 7, 2024
32a708e
Add compression and tracing
domoritz Jul 7, 2024
ca70b30
Https and https
domoritz Jul 8, 2024
431e166
refactor and add ws support
domoritz Jul 12, 2024
94f543e
clean up
domoritz Jul 12, 2024
764e4fd
update and add socket
domoritz Jul 12, 2024
4442015
update deps and fix lint issues
domoritz Jul 12, 2024
52a8a2a
format
domoritz Jul 12, 2024
0921a27
simplify
domoritz Jul 13, 2024
b40b3af
move arrow up
domoritz Jul 14, 2024
b3af286
avoid clone in state
domoritz Jul 14, 2024
a042ebf
avoid another arc
domoritz Jul 14, 2024
3b1cd5c
Use a connection pool rather than a lock on the connection
domoritz Jul 14, 2024
3583ec5
streaming arrow
domoritz Jul 15, 2024
e22544a
debug handler
domoritz Jul 15, 2024
322c219
style
domoritz Jul 18, 2024
2c74243
Merge branch 'main' into dom/rust
domoritz Jul 18, 2024
277b239
rename server and add more docs
domoritz Jul 22, 2024
e18bbe5
minor refactoring
domoritz Jul 22, 2024
a33d43b
test arrow response
domoritz Jul 22, 2024
40a0c4c
add badge
domoritz Jul 22, 2024
3342f60
Release instructions
domoritz Jul 22, 2024
9318522
update installation instructions
domoritz Jul 22, 2024
4f7c391
ignore data
domoritz Jul 22, 2024
627b113
license
domoritz Jul 22, 2024
9eed9e5
Use ? instead of unwrap in main
domoritz Jul 22, 2024
8b69c2a
Support also just http
domoritz Jul 23, 2024
9435702
more tests, fix cache key
domoritz Jul 23, 2024
fcb4220
test bundling
domoritz Jul 23, 2024
9cc7acd
style
domoritz Jul 23, 2024
696f557
clean up
domoritz Jul 23, 2024
b163348
bigger pool
domoritz Jul 23, 2024
b77d791
benchmarks
domoritz Jul 23, 2024
d12f412
update criterion
domoritz Jul 25, 2024
9d108d7
update deps
domoritz Jul 28, 2024
d78cfd1
Merge branch 'main' into dom/rust
domoritz Aug 2, 2024
20d76fb
chore: update deps
domoritz Aug 2, 2024
5307edf
update deps
domoritz Aug 14, 2024
dd086f5
clean up
domoritz Aug 14, 2024
ffc034a
Stream Arrow RecordBatches using mpsc channel and tokio_streams (#481)
jonmmease Aug 14, 2024
e218ac3
Merge branch 'dom/rust' into dom/rust-stream
domoritz Aug 14, 2024
f23763c
clean up
domoritz Aug 14, 2024
711f0c3
fix warnings
domoritz Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ updates:
directory: "/packages/duckdb-server"
schedule:
interval: monthly
- package-ecosystem: cargo
directory: "/packages/duckdb-server-rust"
schedule:
interval: monthly
35 changes: 34 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- run: npm run test

python:
name: Test on Python
name: Test in Python

runs-on: ubuntu-latest

Expand Down Expand Up @@ -64,3 +64,36 @@ jobs:
hatch build
hatch fmt --check
hatch run test:cov

rust:
name: Test in Rust

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
components: clippy, rust-src, rustfmt

- name: Check formatting
run: |
cd packages/duckdb-server-rust
cargo fmt -- --check
- name: Check
run: |
cd packages/duckdb-server-rust
cargo check
- name: Clippy
run: |
cd packages/duckdb-server-rust
cargo clippy --all -- -D warnings
- name: Build
run: |
cd packages/duckdb-server-rust
cargo build --verbose
- name: Test
run: |
cd packages/duckdb-server-rust
cargo test --verbose
22 changes: 13 additions & 9 deletions dev/bundle.html
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<select id="connectors">
<option value="socket" selected>Socket</option>
<option value="rest">REST</option>
<option value="rest_https">REST (HTTPS)</option>
</select>
</span>
</header>
Expand Down Expand Up @@ -47,28 +48,31 @@

async function create() {
try {
await coordinator().createBundle('test', queries);
output.innerText = "Created bundle"
await coordinator.createBundle('test', queries);
output.innerText = "Created bundle";
} catch(err) {
output.innerText = `Error: ${err}`
output.innerText = `Error: ${err}`;
console.log(err.stack);
}
}

async function load() {
try {
await coordinator().loadBundle('test');
output.innerText = "Loaded bundle"
await coordinator.loadBundle('test');
output.innerText = "Loaded bundle";
} catch(err) {
output.innerText = `Error: ${err}`
output.innerText = `Error: ${err}`;
console.log(err.stack);
}
}

async function query() {
try {
const result = await coordinator().query(queries[1], {cache: false});
output.innerText = `Result = ${result}`
const result = await coordinator.query(queries[1], {cache: false});
output.innerText = `Result = ${result}`;
} catch(err) {
output.innerText = `Error: ${err}`
output.innerText = `Error: ${err}`;
console.log(err.stack);
}
}
</script>
Expand Down
1 change: 1 addition & 0 deletions dev/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<option value="wasm" selected>WASM</option>
<option value="socket">Socket</option>
<option value="rest">REST</option>
<option value="rest_https">REST (HTTPS)</option>
</select>
</span>

Expand Down
11 changes: 7 additions & 4 deletions dev/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ export function clear() {

let wasm;

export async function setDatabaseConnector(type, options) {
export async function setDatabaseConnector(type) {
let connector;
switch (type) {
case 'socket':
connector = socketConnector(options);
connector = socketConnector();
break;
case 'rest':
connector = restConnector(options);
connector = restConnector();
break;
case 'rest_https':
connector = restConnector('https://localhost:3000/');
break;
case 'wasm':
connector = wasm || (wasm = wasmConnector(options));
connector = wasm || (wasm = wasmConnector());
break;
default:
throw new Error(`Unrecognized connector type: ${type}`);
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"lint": "lerna run lint",
"test": "lerna run test",
"server": "cd packages/duckdb-server && hatch run serve",
"server:rust": "cd packages/duckdb-server-rust && cargo run",
"server:node": "nodemon packages/duckdb/bin/run-server.js",
"dev": "vite",
"release": "npm run test && npm run lint && lerna publish && npm run release:python",
Expand Down
17 changes: 16 additions & 1 deletion packages/core/src/Coordinator.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export class Coordinator {
consolidate = true,
indexes = {}
} = {}) {
/** @type {QueryManager} */
this.manager = manager;
this.manager.cache(cache);
this.manager.consolidate(consolidate);
Expand Down Expand Up @@ -152,11 +153,25 @@ export class Coordinator {
return this.query(query, { ...options, cache: true, priority: Priority.Low });
}

/**
* Create a bundle of queries that can be loaded into the cache.
*
* @param {string} name The name of the bundle.
* @param {[string | {sql: string}, {alias: string}]} queries The queries to save into the bundle.
* @param {number} priority Request priority.
* @returns
*/
createBundle(name, queries, priority = Priority.Low) {
const options = { name, queries };
const options = { name, queries: queries.map(q => typeof q == 'string' ? {sql: q} : q) };
return this.manager.request({ type: 'create-bundle', options }, priority);
}

/**
* Load a bundle into the cache.
* @param {string} name The name of the bundle.
* @param {number} priority Request priority.
* @returns
*/
loadBundle(name, priority = Priority.High) {
const options = { name };
return this.manager.request({ type: 'load-bundle', options }, priority);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/QueryManager.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { consolidator } from './QueryConsolidator.js';
import { lruCache, voidCache } from './util/cache.js';
import { priorityQueue } from './util/priority-queue.js';
import { PriorityQueue } from './util/priority-queue.js';
import { QueryResult } from './util/query-result.js';

export const Priority = { High: 0, Normal: 1, Low: 2 };

export class QueryManager {
constructor() {
this.queue = priorityQueue(3);
this.queue = new PriorityQueue(3);
this.db = null;
this.clientCache = null;
this._logger = null;
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/connectors/rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export function restConnector(uri = 'http://localhost:3000/') {
/**
* Query the DuckDB server.
* @param {object} query
* @param {'exec' | 'arrow' | 'json'} [query.type] The query type: 'exec', 'arrow', or 'json'.
* @param {'exec' | 'arrow' | 'json' | 'create-bundle' | 'load-bundle'} [query.type] The query type.
* @param {string} query.sql A SQL query string.
* @returns the query result
*/
Expand All @@ -19,9 +19,9 @@ export function restConnector(uri = 'http://localhost:3000/') {
body: JSON.stringify(query)
});

return query.type === 'exec' ? req
return query.type === 'json' ? (await req).json()
: query.type === 'arrow' ? tableFromIPC(req)
: (await req).json();
: req;
}
};
}
2 changes: 1 addition & 1 deletion packages/core/src/connectors/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export function socketConnector(uri = 'ws://localhost:3000/') {
/**
* Query the DuckDB server.
* @param {object} query
* @param {'exec' | 'arrow' | 'json'} [query.type] The query type: 'exec', 'arrow', or 'json'.
* @param {'exec' | 'arrow' | 'json' | 'create-bundle' | 'load-bundle'} [query.type] The query type.
* @param {string} query.sql A SQL query string.
* @returns the query result
*/
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/connectors/wasm.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ export function wasmConnector(options = {}) {
/**
* Query the DuckDB-WASM instance.
* @param {object} query
* @param {'exec' | 'arrow' | 'json'} [query.type] The query type: 'exec', 'arrow', or 'json'.
* @param {'exec' | 'arrow' | 'json' | 'create-bundle' | 'load-bundle'} [query.type] The query type.
* @param {string} query.sql A SQL query string.
* @returns the query result
*/
query: async query => {
const { type, sql } = query;
const con = await getConnection();
const result = await con.query(sql);
return type === 'exec' ? undefined
return type === 'json' ? result.toArray()
: type === 'arrow' ? result
: result.toArray();
: undefined;
}
};
}
Expand Down
Loading