Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions benchmarking/src/app/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Link, Route, BrowserRouter as Router, Routes } from 'react-router-dom';
import { IndexedDBMProvider } from './dbm-context/indexed-dbm-context';
import { MemoryDBMProvider } from './dbm-context/memory-dbm-context';
import { NativeDBMProvider } from './dbm-context/native-dbm-context';
import { OPFSDBMProvider } from './dbm-context/opfs-dbm-context';
import { ParallelIndexedDBMProvider } from './dbm-context/parallel-indexed-dbm-context';
import { ParallelMemoryDBMProvider } from './dbm-context/parallel-memory-dbm-context';
import { RawDBMProvider } from './dbm-context/raw-dbm-context';
Expand All @@ -23,6 +24,9 @@ export function App() {
<li>
<Link to="/indexed-dbm">IndexedDB DuckDB</Link>
</li>
<li>
<Link to="/opfs-dbm">OPFS DuckDB</Link>
</li>
<li>
<Link to="/parallel-memory-dbm">Parallel Memory DuckDB</Link>
</li>
Expand Down Expand Up @@ -75,6 +79,19 @@ export function App() {
</div>
}
/>
<Route
path="/opfs-dbm"
element={
<div>
<h1>OPFS DuckDB</h1>
<OPFSDBMProvider>
<FileLoader>
<QueryBenchmarking />
</FileLoader>
</OPFSDBMProvider>
</div>
}
/>
<Route
path="/parallel-memory-dbm"
element={
Expand Down
25 changes: 24 additions & 1 deletion benchmarking/src/app/dbm-context/instance-manager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
import { InstanceManagerType } from '@devrev/meerkat-dbm';
import * as duckdb from '@duckdb/duckdb-wasm';
import { AsyncDuckDB, LogEntryVariant } from '@duckdb/duckdb-wasm';
import {
AsyncDuckDB,
DuckDBAccessMode,
LogEntryVariant,
} from '@duckdb/duckdb-wasm';
const JSDELIVR_BUNDLES = duckdb.getJsDelivrBundles();

/**
 * Construction options for `InstanceManager`.
 */
export interface InstanceManagerOptions {
  /**
   * Optional database path. When set, the freshly instantiated DuckDB is
   * opened with this path in `READ_WRITE` access mode; when omitted, no
   * explicit `open` call is made.
   */
  path?: string;
}

export class InstanceManager implements InstanceManagerType {
private db: AsyncDuckDB | null = null;

private path?: string;

constructor(options: InstanceManagerOptions = {}) {
this.path = options.path;
}

private async initDB() {
const bundle = await duckdb.selectBundle(JSDELIVR_BUNDLES);

Expand All @@ -22,6 +36,15 @@ export class InstanceManager implements InstanceManagerType {
};
const db = new duckdb.AsyncDuckDB(logger, worker);
await db.instantiate(bundle.mainModule, bundle.pthreadWorker);

// Open with configured path if provided
if (this.path) {
await db.open({
path: this.path,
accessMode: DuckDBAccessMode.READ_WRITE,
});
}

URL.revokeObjectURL(worker_url);
return db;
}
Expand Down
61 changes: 61 additions & 0 deletions benchmarking/src/app/dbm-context/opfs-dbm-context.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { DBM, FileManagerType, OPFSFileManager } from '@devrev/meerkat-dbm';
import log from 'loglevel';
import React, { useState } from 'react';
import { DBMContext } from '../hooks/dbm-context';
import { useClassicEffect } from '../hooks/use-classic-effect';
import { InstanceManager } from './instance-manager';
import { useAsyncDuckDB } from './use-async-duckdb';

/**
 * Context provider that wires a `DBM` to an `OPFSFileManager`.
 *
 * The instance manager is created with the `opfs://meerkat.db` path. Once
 * `useAsyncDuckDB()` yields a truthy value, the file manager and the DBM are
 * built and published through `DBMContext` with `fileManagerType: 'opfs'`.
 * Until then a "Loading..." placeholder is rendered instead of `children`.
 */
export const OPFSDBMProvider = ({ children }: { children: JSX.Element }) => {
  const fileManagerRef = React.useRef<FileManagerType | null>(null);
  const [dbm, setdbm] = useState<DBM | null>(null);
  const instanceManagerRef = React.useRef<InstanceManager>(
    new InstanceManager({ path: 'opfs://meerkat.db' })
  );
  const dbState = useAsyncDuckDB();

  useClassicEffect(() => {
    if (dbState) {
      const instanceManager = instanceManagerRef.current;

      // Both the file manager and the DBM report their events the same way.
      const logEvent = (event: unknown) => {
        console.info(event);
      };

      const opfsFileManager = new OPFSFileManager({
        instanceManager,
        // The benchmark never fetches remote buffers for a table.
        fetchTableFileBuffers: async (_table) => [],
        logger: log,
        onEvent: logEvent,
      });
      fileManagerRef.current = opfsFileManager;

      // Log level is raised only after the file manager exists, before the DBM.
      log.setLevel('DEBUG');

      setdbm(
        new DBM({
          instanceManager,
          fileManager: opfsFileManager,
          logger: log,
          onEvent: logEvent,
        })
      );
    }
  }, [dbState]);

  const fileManager = fileManagerRef.current;

  if (dbm && fileManager) {
    return (
      <DBMContext.Provider
        value={{
          dbm,
          fileManager,
          fileManagerType: 'opfs',
        }}
      >
        {children}
      </DBMContext.Provider>
    );
  }

  return <div>Loading...</div>;
};
14 changes: 11 additions & 3 deletions benchmarking/src/app/file-loader/file-loader.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => {
});

// Create views for the raw and memory file managers (tables for OPFS) after registering the files
if (fileManagerType === 'raw' || fileManagerType === 'memory') {
await dbm.query(generateViewQuery('taxi', ['taxi.parquet']));
await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet']));
if (
fileManagerType === 'raw' ||
fileManagerType === 'memory' ||
fileManagerType === 'opfs'
) {
const useTable = fileManagerType === 'opfs';

await dbm.query(generateViewQuery('taxi', ['taxi.parquet'], useTable));
await dbm.query(
generateViewQuery('taxi_json', ['taxi_json.parquet'], useTable)
);
}

setIsFileLoader(true);
Expand Down
1 change: 1 addition & 0 deletions benchmarking/src/app/hooks/dbm-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type DBMContextType = {
| 'raw'
| 'memory'
| 'indexdb'
| 'opfs'
| 'native'
| 'parallel-memory'
| 'parallel-indexdb';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ export const QueryBenchmarking = () => {
filePaths = table.files.map((file) => file.fileName);
}

await dbm.query(generateViewQuery(table.tableName, filePaths));
const useTable = fileManagerType === 'opfs';
await dbm.query(
generateViewQuery(table.tableName, filePaths, useTable)
);
}
},
[fileManagerType, dbm]
Expand Down
10 changes: 8 additions & 2 deletions benchmarking/src/app/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
export const generateViewQuery = (tableName: string, files: string[]) => {
return `CREATE VIEW IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join(
/**
 * Builds the statement that exposes a set of parquet files under one name.
 *
 * @param tableName - Name of the view/table to create (interpolated as-is).
 * @param files - Parquet file names passed to `read_parquet`.
 * @param useTable - When true a `TABLE` is created instead of a `VIEW`.
 * @returns A `CREATE VIEW|TABLE IF NOT EXISTS ... AS SELECT * FROM read_parquet([...]);` statement.
 */
export const generateViewQuery = (
  tableName: string,
  files: string[],
  useTable = false
): string => {
  const targetType = useTable ? 'TABLE' : 'VIEW';

  // Double any embedded single quote so a file name such as "o'brien.parquet"
  // stays inside its SQL string literal instead of terminating it early.
  const quotedFiles = files.map((file) => file.replace(/'/g, "''"));

  return `CREATE ${targetType} IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${quotedFiles.join(
    "','"
  )}']);`;
};
1 change: 1 addition & 0 deletions meerkat-dbm/src/dbm/dbm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export class DBM extends TableLockManager {
private async _getConnection() {
if (!this.connection) {
const db = await this.instanceManager.getDB();

this.connection = await db.connect();
if (this.onCreateConnection) {
await this.onCreateConnection(this.connection);
Expand Down
1 change: 1 addition & 0 deletions meerkat-dbm/src/file-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './memory/memory-file-manager';
export * from './memory/parallel-memory-file-manager';
export * from './memory/runner-memory-file-manager';
export * from './native/native-file-manager';
export * from './opfs/opfs-file-manager';
130 changes: 130 additions & 0 deletions meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { TableConfig } from '../../dbm/types';
import { mergeFileStoreIntoTable } from '../../utils';
import {
FileBufferStore,
FileManagerConstructorOptions,
FileManagerType,
} from '../file-manager-type';
import { FileRegisterer } from '../file-registerer';
import { BaseIndexedDBFileManager } from '../indexed-db/base-indexed-db-file-manager';

/**
 * File manager used for the OPFS-backed setup.
 *
 * Table-to-file metadata is written to the `tablesKey` IndexedDB store, while
 * the file buffers themselves are handed to a `FileRegisterer` built on the
 * same instance manager.
 */
export class OPFSFileManager
  extends BaseIndexedDBFileManager
  implements FileManagerType
{
  private fileRegisterer: FileRegisterer;
  private configurationOptions: FileManagerConstructorOptions['options'];

  fetchTableFileBuffers: (tableName: string) => Promise<FileBufferStore[]>;

  constructor({
    fetchTableFileBuffers,
    instanceManager,
    options,
    logger,
    onEvent,
  }: FileManagerConstructorOptions) {
    super({ instanceManager, fetchTableFileBuffers, logger, onEvent });

    this.fetchTableFileBuffers = fetchTableFileBuffers;
    this.fileRegisterer = new FileRegisterer({ instanceManager });
    this.configurationOptions = options;
  }

  /**
   * Records the given buffers against their tables and registers every buffer
   * with the file registerer.
   *
   * Failures while writing metadata or registering buffers are logged with
   * `console.error` and not rethrown (best-effort, as before).
   *
   * @param fileBuffers - Buffers to register; each carries its table name.
   */
  async bulkRegisterFileBuffer(fileBuffers: FileBufferStore[]): Promise<void> {
    // Resolve the database first so an initialisation failure surfaces before
    // any metadata is written.
    await this.instanceManager.getDB();

    const tableNames = Array.from(
      new Set(fileBuffers.map((fileBuffer) => fileBuffer.tableName))
    );

    const currentTableData = await this.indexedDB.tablesKey.toArray();

    const updatedTableMap = mergeFileStoreIntoTable(
      fileBuffers,
      currentTableData
    );

    /**
     * Extracts the table rows (only for the tables touched by this call) in
     * the format that is stored in IndexedDB.
     */
    const updatedTableData = tableNames.map((tableName) => {
      return { tableName, files: updatedTableMap.get(tableName)?.files ?? [] };
    });

    try {
      // A single bulkPut is atomic on its own. The registerer calls are
      // deliberately kept OUT of an IndexedDB transaction scope: awaiting
      // non-IndexedDB async work inside such a scope lets the transaction
      // commit early (assuming `indexedDB` is a Dexie database, as its API
      // usage suggests), which made this method resolve before the buffers
      // were actually registered.
      await this.indexedDB.tablesKey.bulkPut(updatedTableData);

      await Promise.all(
        fileBuffers.map((fileBuffer) =>
          this.fileRegisterer.registerFileBuffer(
            fileBuffer.fileName,
            fileBuffer.buffer
          )
        )
      );
    } catch (error) {
      console.error(error);
    }
  }

  /**
   * Records a single buffer against its table and registers it with the file
   * registerer.
   *
   * Failures while writing metadata or registering the buffer are logged with
   * `console.error` and not rethrown (best-effort, as before).
   *
   * @param fileBuffer - Buffer to register, with its file and table name.
   */
  async registerFileBuffer(fileBuffer: FileBufferStore): Promise<void> {
    const { buffer, fileName, tableName } = fileBuffer;

    const currentTableData = await this.indexedDB.tablesKey.toArray();

    const updatedTableMap = mergeFileStoreIntoTable(
      [fileBuffer],
      currentTableData
    );

    try {
      // Metadata first, then the buffer; registration stays outside any
      // IndexedDB transaction scope for the reason given in
      // bulkRegisterFileBuffer's sibling logic: non-IndexedDB awaits must not
      // run inside the scope.
      await this.indexedDB.tablesKey.put({
        tableName,
        files: updatedTableMap.get(tableName)?.files ?? [],
      });

      await this.fileRegisterer.registerFileBuffer(fileName, buffer);
    } catch (error) {
      console.error(error);
    }
  }

  /**
   * Placeholder: currently only looks up the file names for the given tables
   * and discards the result; nothing is mounted.
   *
   * @param tables - Tables whose files should eventually be mounted.
   */
  async mountFileBufferByTables(tables: TableConfig[]): Promise<void> {
    await this.getFilesNameForTables(tables);

    /**
     * TODO: Check that the registered file size does not exceed the limit.
     * If it does, remove the files that are not needed while mounting these
     * tables.
     */
  }

  /**
   * Placeholder: currently only reads the table's metadata row and discards
   * it; no files are dropped.
   *
   * @param tableName - Table whose files should eventually be dropped.
   * @param fileNames - Names of the files to drop (not used yet).
   */
  async dropFilesByTableName(
    tableName: string,
    fileNames: string[]
  ): Promise<void> {
    // TODO: not implemented — the lookup below is the only effect.
    await this.indexedDB.tablesKey.get(tableName);
  }

  /**
   * Flushes the file registerer's cache when the database shuts down.
   */
  async onDBShutdownHandler(): Promise<void> {
    this.fileRegisterer.flushFileCache();
  }
}
Loading