From fe1d40023147625aba4c568facd8a70247cdfb8f Mon Sep 17 00:00:00 2001 From: balasaravanan Date: Thu, 8 May 2025 00:52:21 +0530 Subject: [PATCH 1/6] add: opfs file manager --- benchmarking/src/app/app.tsx | 17 +++ benchmarking/src/app/constants.ts | 8 +- .../src/app/dbm-context/opfs-dbm-context.tsx | 60 ++++++++++ .../src/app/file-loader/file-loader.tsx | 16 ++- benchmarking/src/app/hooks/dbm-context.tsx | 1 + benchmarking/src/app/utils.ts | 4 +- meerkat-dbm/src/dbm/dbm.ts | 8 +- meerkat-dbm/src/file-manager/index.ts | 1 + .../file-manager/opfs/opfs-file-manager.ts | 111 ++++++++++++++++++ 9 files changed, 209 insertions(+), 17 deletions(-) create mode 100644 benchmarking/src/app/dbm-context/opfs-dbm-context.tsx create mode 100644 meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts diff --git a/benchmarking/src/app/app.tsx b/benchmarking/src/app/app.tsx index 35f39d2f..26980421 100644 --- a/benchmarking/src/app/app.tsx +++ b/benchmarking/src/app/app.tsx @@ -2,6 +2,7 @@ import { Link, Route, BrowserRouter as Router, Routes } from 'react-router-dom'; import { IndexedDBMProvider } from './dbm-context/indexed-dbm-context'; import { MemoryDBMProvider } from './dbm-context/memory-dbm-context'; import { NativeDBMProvider } from './dbm-context/native-dbm-context'; +import { OPFSDBMProvider } from './dbm-context/opfs-dbm-context'; import { ParallelIndexedDBMProvider } from './dbm-context/parallel-indexed-dbm-context'; import { ParallelMemoryDBMProvider } from './dbm-context/parallel-memory-dbm-context'; import { RawDBMProvider } from './dbm-context/raw-dbm-context'; @@ -23,6 +24,9 @@ export function App() {
  • IndexedDB DuckDB
  • +
  • + OPFS DuckDB +
  • Parallel Memory DuckDB
  • @@ -75,6 +79,19 @@ export function App() { } /> + +

    OPFS DuckDB

    + + + + + + + } + /> = 1.0005812645 LIMIT 100', - 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json GROUP BY order_count', - 'SELECT * FROM taxi_json ORDER BY seconds_in_bucket LIMIT 100', + // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json', + // 'SELECT * FROM taxi_json WHERE price >= 1.0005812645 LIMIT 100', + // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json GROUP BY order_count', + // 'SELECT * FROM taxi_json ORDER BY seconds_in_bucket LIMIT 100', ]; diff --git a/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx b/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx new file mode 100644 index 00000000..e5bf66da --- /dev/null +++ b/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx @@ -0,0 +1,60 @@ +import { DBM, FileManagerType, OPFSFileManager } from '@devrev/meerkat-dbm'; +import log from 'loglevel'; +import React, { useState } from 'react'; +import { DBMContext } from '../hooks/dbm-context'; +import { useClassicEffect } from '../hooks/use-classic-effect'; +import { InstanceManager } from './instance-manager'; +import { useAsyncDuckDB } from './use-async-duckdb'; + +export const OPFSDBMProvider = ({ children }: { children: JSX.Element }) => { + const fileManagerRef = React.useRef(null); + const [dbm, setdbm] = useState(null); + const instanceManagerRef = React.useRef( + new InstanceManager() + ); + + const dbState = useAsyncDuckDB(); + + useClassicEffect(() => { + if (!dbState) { + return; + } + fileManagerRef.current = new OPFSFileManager({ + instanceManager: instanceManagerRef.current, + fetchTableFileBuffers: async (table) => { + return []; + }, + logger: log, + onEvent: (event) => { + console.info(event); + }, + }); + + log.setLevel('DEBUG'); + const dbm = new DBM({ + instanceManager: instanceManagerRef.current, + fileManager: fileManagerRef.current, + logger: log, + onEvent: (event) => { + console.info(event); + }, + }); + setdbm(dbm); + }, [dbState]); + + if (!dbm || !fileManagerRef.current) { + return
    Loading...
    ; + } + + return ( + + {children} + + ); +}; diff --git a/benchmarking/src/app/file-loader/file-loader.tsx b/benchmarking/src/app/file-loader/file-loader.tsx index 837647b2..c607aff3 100644 --- a/benchmarking/src/app/file-loader/file-loader.tsx +++ b/benchmarking/src/app/file-loader/file-loader.tsx @@ -1,6 +1,6 @@ import axios from 'axios'; import { useState } from 'react'; -import TAXI_JSON_DATA from '../../../public/data-sets/taxi.json'; + import { useDBM } from '../hooks/dbm-context'; import { useClassicEffect } from '../hooks/use-classic-effect'; import { generateViewQuery } from '../utils'; @@ -25,16 +25,14 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { buffer: fileBufferView, }); - await fileManager.registerJSON({ - json: TAXI_JSON_DATA, - tableName: 'taxi_json', - fileName: 'taxi_json.parquet', - }); - // Create views for raw and memory file manager after registering the files - if (fileManagerType === 'raw' || fileManagerType === 'memory') { + if ( + fileManagerType === 'raw' || + fileManagerType === 'memory' || + fileManagerType === 'opfs' + ) { await dbm.query(generateViewQuery('taxi', ['taxi.parquet'])); - await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'])); + // await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'])); } setIsFileLoader(true); diff --git a/benchmarking/src/app/hooks/dbm-context.tsx b/benchmarking/src/app/hooks/dbm-context.tsx index e6f7a38c..16536bce 100644 --- a/benchmarking/src/app/hooks/dbm-context.tsx +++ b/benchmarking/src/app/hooks/dbm-context.tsx @@ -13,6 +13,7 @@ export type DBMContextType = { | 'raw' | 'memory' | 'indexdb' + | 'opfs' | 'native' | 'parallel-memory' | 'parallel-indexdb'; diff --git a/benchmarking/src/app/utils.ts b/benchmarking/src/app/utils.ts index 0e2521c9..ab9f936f 100644 --- a/benchmarking/src/app/utils.ts +++ b/benchmarking/src/app/utils.ts @@ -1,5 +1,3 @@ export const generateViewQuery = (tableName: string, files: string[]) => { - return `CREATE VIEW IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( - "','" - )}']);`; + return `CREATE TABLE ${tableName} AS SELECT * FROM read_parquet('${files[0]}');`; }; diff --git a/meerkat-dbm/src/dbm/dbm.ts b/meerkat-dbm/src/dbm/dbm.ts index 0dd9c81f..f0466424 100644 --- a/meerkat-dbm/src/dbm/dbm.ts +++ b/meerkat-dbm/src/dbm/dbm.ts @@ -1,4 +1,4 @@ -import { AsyncDuckDBConnection } from '@duckdb/duckdb-wasm'; +import { AsyncDuckDBConnection, DuckDBAccessMode } from '@duckdb/duckdb-wasm'; import { Table } from 'apache-arrow/table'; import { v4 as uuidv4 } from 'uuid'; import { FileManagerType } from '../file-manager'; @@ -102,6 +102,12 @@ export class DBM extends TableLockManager { private async _getConnection() { if (!this.connection) { const db = await this.instanceManager.getDB(); + + await db.open({ + path: 'opfs://test.db', + accessMode: DuckDBAccessMode.READ_WRITE, + }); + this.connection = await db.connect(); if (this.onCreateConnection) { await this.onCreateConnection(this.connection); diff --git a/meerkat-dbm/src/file-manager/index.ts b/meerkat-dbm/src/file-manager/index.ts index dac55eef..50794a57 100644 --- a/meerkat-dbm/src/file-manager/index.ts +++ b/meerkat-dbm/src/file-manager/index.ts @@ -6,3 +6,4 @@ export * from './memory/memory-file-manager'; export * from './memory/parallel-memory-file-manager'; export * from './memory/runner-memory-file-manager'; export * from './native/native-file-manager'; +export * from './opfs/opfs-file-manager'; diff --git a/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts b/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts new file mode 100644 index 00000000..55b6d139 --- /dev/null +++ b/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts @@ -0,0 +1,111 @@ +import { InstanceManagerType } from '../../dbm/instance-manager'; +import { TableConfig } from '../../dbm/types'; +import { DBMEvent, DBMLogger } from '../../logger'; +import { Table, TableWiseFiles } from '../../types'; +import { getBufferFromJSON } from '../../utils'; +import { + FileBufferStore, + FileJsonStore, + FileManagerConstructorOptions, + FileManagerType, +} from '../file-manager-type'; + +export class OPFSFileManager implements FileManagerType { + fetchTableFileBuffers: (tableName: string) => Promise; + instanceManager: InstanceManagerType; + + private logger?: DBMLogger; + private onEvent?: (event: DBMEvent) => void; + + constructor({ + fetchTableFileBuffers, + instanceManager, + logger, + onEvent, + }: FileManagerConstructorOptions) { + this.fetchTableFileBuffers = fetchTableFileBuffers; + this.instanceManager = instanceManager; + this.logger = logger; + this.onEvent = onEvent; + } + + async bulkRegisterFileBuffer(props: FileBufferStore[]): Promise { + const promiseArr = props.map((fileBuffer) => + this.registerFileBuffer(fileBuffer) + ); + console.info('bulkRegisterFileBuffer', promiseArr); + const output = await Promise.all(promiseArr); + console.info('bulkRegisterFileBuffer done', output); + console.info('bulkRegisterFileBuffer done', promiseArr); + } + + async registerFileBuffer(props: FileBufferStore): Promise { + console.info('registerFileBuffer', props); + const db = await this.instanceManager.getDB(); + return db.registerFileBuffer(props.fileName, props.buffer); + } + + async bulkRegisterJSON(jsonData: FileJsonStore[]): Promise { + const promiseArr = jsonData.map((fileBuffer) => + this.registerJSON(fileBuffer) + ); + + await Promise.all(promiseArr); + } + + async registerJSON(jsonData: FileJsonStore): Promise { + const { json, tableName, ...fileData } = jsonData; + + /** + * Convert JSON to buffer + */ + const bufferData = await getBufferFromJSON({ + instanceManager: this.instanceManager, + json, + tableName, + logger: this.logger, + onEvent: this.onEvent, + metadata: jsonData.metadata, + }); + + /** + * Register buffer in DB + */ + await this.registerFileBuffer({ + buffer: bufferData, + tableName, + ...fileData, + }); + } + + async mountFileBufferByTables(tables: TableConfig[]): Promise { + // not needed for opfs file manager + } + + async getFilesNameForTables( + tables: TableConfig[] + ): Promise { + // not needed for opfs file manager + return []; + } + + async getTableData(table: TableConfig): Promise { + // not needed for opfs file manager + return; + } + + async setTableMetadata(table: string, metadata: object): Promise { + // not needed for opfs file manager + } + + async dropFilesByTableName( + tableName: string, + fileNames: string[] + ): Promise { + // not needed for opfs file manager + } + + async onDBShutdownHandler() { + // not needed for opfs file manager + } +} From 0c6f0b73745baf34a00b852d7fc0d26d6610b670 Mon Sep 17 00:00:00 2001 From: vpbs2 Date: Wed, 1 Oct 2025 17:12:31 +0530 Subject: [PATCH 2/6] update dbm --- .../src/app/dbm-context/instance-manager.ts | 6 + .../src/app/file-loader/file-loader.tsx | 23 ++- .../query-benchmarking/query-benchmarking.tsx | 3 +- benchmarking/src/app/utils.ts | 5 +- meerkat-dbm/src/dbm/dbm.ts | 7 +- .../src/file-manager/file-registerer.ts | 2 +- .../file-manager/opfs/opfs-file-manager.ts | 149 ++++++++++-------- 7 files changed, 119 insertions(+), 76 deletions(-) diff --git a/benchmarking/src/app/dbm-context/instance-manager.ts b/benchmarking/src/app/dbm-context/instance-manager.ts index 5e810e03..1e960b95 100644 --- a/benchmarking/src/app/dbm-context/instance-manager.ts +++ b/benchmarking/src/app/dbm-context/instance-manager.ts @@ -22,6 +22,12 @@ export class InstanceManager implements InstanceManagerType { }; const db = new duckdb.AsyncDuckDB(logger, worker); await db.instantiate(bundle.mainModule, bundle.pthreadWorker); + + // await db.open({ + // path: 'opfs://meerkat.db', + // accessMode: DuckDBAccessMode.READ_ONLY, + // }); + URL.revokeObjectURL(worker_url); return db; } diff --git a/benchmarking/src/app/file-loader/file-loader.tsx b/benchmarking/src/app/file-loader/file-loader.tsx index c607aff3..a2428fd2 100644 --- a/benchmarking/src/app/file-loader/file-loader.tsx +++ b/benchmarking/src/app/file-loader/file-loader.tsx @@ -6,6 +6,20 @@ import { useClassicEffect } from '../hooks/use-classic-effect'; import { generateViewQuery } from '../utils'; import { TAXI_FILE_URL } from './constants'; +// Write a file directly to OPFS root +async function writeInOPFS(fileName: string, buffer: Uint8Array) { + try { + const root = await navigator.storage.getDirectory(); + const fileHandle = await root.getFileHandle(fileName, { create: true }); + const writable = await fileHandle.createWritable(); + await writable.write(buffer); + await writable.close(); + console.log('Successfully wrote file to OPFS root:', fileName); + } catch (err) { + console.error('Error writing to OPFS root:', err); + } +} + export const FileLoader = ({ children }: { children: JSX.Element }) => { const { fileManager, fileManagerType, dbm } = useDBM(); const [isFileLoader, setIsFileLoader] = useState(false); @@ -18,10 +32,15 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { const fileBuffer = file.data; const fileBufferView = new Uint8Array(fileBuffer); + const fileName = 'taxi.parquet'; + + // Always write to OPFS at the root + await writeInOPFS(fileName, fileBufferView); + // Register file buffer with the file manager await fileManager.registerFileBuffer({ tableName: 'taxi', - fileName: 'taxi.parquet', + fileName: 'opfs://taxi.parquet', buffer: fileBufferView, }); @@ -31,7 +50,7 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { fileManagerType === 'memory' || fileManagerType === 'opfs' ) { - await dbm.query(generateViewQuery('taxi', ['taxi.parquet'])); + await dbm.query(generateViewQuery('taxi', ['opfs://taxi.parquet'])); // await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'])); } diff --git a/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx b/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx index b23da20d..40b8276e 100644 --- a/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx +++ b/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx @@ -48,7 +48,8 @@ export const QueryBenchmarking = () => { tables: [{ name: 'taxi' }, { name: 'taxi_json' }], options: { ...(fileManagerType !== 'parallel-indexdb' && - fileManagerType !== 'parallel-memory' && { + fileManagerType !== 'parallel-memory' && + fileManagerType !== 'opfs' && { preQuery: preQuery, }), }, diff --git a/benchmarking/src/app/utils.ts b/benchmarking/src/app/utils.ts index ab9f936f..4d17a4f9 100644 --- a/benchmarking/src/app/utils.ts +++ b/benchmarking/src/app/utils.ts @@ -1,3 +1,6 @@ export const generateViewQuery = (tableName: string, files: string[]) => { - return `CREATE TABLE ${tableName} AS SELECT * FROM read_parquet('${files[0]}');`; + console.log(files, 'files'); + return `CREATE VIEW IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( + "','" + )}']);`; }; diff --git a/meerkat-dbm/src/dbm/dbm.ts b/meerkat-dbm/src/dbm/dbm.ts index f0466424..12eb3207 100644 --- a/meerkat-dbm/src/dbm/dbm.ts +++ b/meerkat-dbm/src/dbm/dbm.ts @@ -1,4 +1,4 @@ -import { AsyncDuckDBConnection, DuckDBAccessMode } from '@duckdb/duckdb-wasm'; +import { AsyncDuckDBConnection } from '@duckdb/duckdb-wasm'; import { Table } from 'apache-arrow/table'; import { v4 as uuidv4 } from 'uuid'; import { FileManagerType } from '../file-manager'; @@ -103,11 +103,6 @@ export class DBM extends TableLockManager { if (!this.connection) { const db = await this.instanceManager.getDB(); - await db.open({ - path: 'opfs://test.db', - accessMode: DuckDBAccessMode.READ_WRITE, - }); - this.connection = await db.connect(); if (this.onCreateConnection) { await this.onCreateConnection(this.connection); diff --git a/meerkat-dbm/src/file-manager/file-registerer.ts b/meerkat-dbm/src/file-manager/file-registerer.ts index 99c8a93c..1f16f9bb 100644 --- a/meerkat-dbm/src/file-manager/file-registerer.ts +++ b/meerkat-dbm/src/file-manager/file-registerer.ts @@ -40,7 +40,7 @@ export class FileRegisterer implements FileRegistererType { }); return this.instanceManager .getDB() - .then((db) => db.registerFileBuffer(fileName, buffer)); + .then((db) => db.registerOPFSFileName(fileName)); }; registerEmptyFileBuffer: AsyncDuckDB['registerEmptyFileBuffer'] = async ( diff --git a/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts b/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts index 55b6d139..d9bd0eb1 100644 --- a/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts +++ b/meerkat-dbm/src/file-manager/opfs/opfs-file-manager.ts @@ -1,111 +1,130 @@ -import { InstanceManagerType } from '../../dbm/instance-manager'; import { TableConfig } from '../../dbm/types'; -import { DBMEvent, DBMLogger } from '../../logger'; -import { Table, TableWiseFiles } from '../../types'; -import { getBufferFromJSON } from '../../utils'; +import { mergeFileStoreIntoTable } from '../../utils'; import { FileBufferStore, - FileJsonStore, FileManagerConstructorOptions, FileManagerType, } from '../file-manager-type'; +import { FileRegisterer } from '../file-registerer'; +import { BaseIndexedDBFileManager } from '../indexed-db/base-indexed-db-file-manager'; -export class OPFSFileManager implements FileManagerType { - fetchTableFileBuffers: (tableName: string) => Promise; - instanceManager: InstanceManagerType; +export class OPFSFileManager + extends BaseIndexedDBFileManager + implements FileManagerType +{ + private fileRegisterer: FileRegisterer; + private configurationOptions: FileManagerConstructorOptions['options']; - private logger?: DBMLogger; - private onEvent?: (event: DBMEvent) => void; + fetchTableFileBuffers: (tableName: string) => Promise; constructor({ fetchTableFileBuffers, instanceManager, + options, logger, onEvent, }: FileManagerConstructorOptions) { - this.fetchTableFileBuffers = fetchTableFileBuffers; - this.instanceManager = instanceManager; - this.logger = logger; - this.onEvent = onEvent; - } + super({ instanceManager, fetchTableFileBuffers, logger, onEvent }); - async bulkRegisterFileBuffer(props: FileBufferStore[]): Promise { - const promiseArr = props.map((fileBuffer) => - this.registerFileBuffer(fileBuffer) - ); - console.info('bulkRegisterFileBuffer', promiseArr); - const output = await Promise.all(promiseArr); - console.info('bulkRegisterFileBuffer done', output); - console.info('bulkRegisterFileBuffer done', promiseArr); + this.fetchTableFileBuffers = fetchTableFileBuffers; + this.fileRegisterer = new FileRegisterer({ instanceManager }); + this.configurationOptions = options; } - async registerFileBuffer(props: FileBufferStore): Promise { - console.info('registerFileBuffer', props); + async bulkRegisterFileBuffer(fileBuffers: FileBufferStore[]): Promise { const db = await this.instanceManager.getDB(); - return db.registerFileBuffer(props.fileName, props.buffer); - } - async bulkRegisterJSON(jsonData: FileJsonStore[]): Promise { - const promiseArr = jsonData.map((fileBuffer) => - this.registerJSON(fileBuffer) + const tableNames = Array.from( + new Set(fileBuffers.map((fileBuffer) => fileBuffer.tableName)) ); - await Promise.all(promiseArr); - } + const currentTableData = await this.indexedDB.tablesKey.toArray(); - async registerJSON(jsonData: FileJsonStore): Promise { - const { json, tableName, ...fileData } = jsonData; + const updatedTableMap = mergeFileStoreIntoTable( + fileBuffers, + currentTableData + ); /** - * Convert JSON to buffer + * Extracts the tables and files data from the tablesMap and fileBuffers + * in format that can be stored in IndexedDB */ - const bufferData = await getBufferFromJSON({ - instanceManager: this.instanceManager, - json, - tableName, - logger: this.logger, - onEvent: this.onEvent, - metadata: jsonData.metadata, + const updatedTableData = tableNames.map((tableName) => { + return { tableName, files: updatedTableMap.get(tableName)?.files ?? [] }; }); - /** - * Register buffer in DB - */ - await this.registerFileBuffer({ - buffer: bufferData, - tableName, - ...fileData, + const newFilesData = fileBuffers.map((fileBuffer) => { + return { buffer: fileBuffer.buffer, fileName: fileBuffer.fileName }; }); - } - async mountFileBufferByTables(tables: TableConfig[]): Promise { - // not needed for opfs file manager + // Update the tables and files table in IndexedDB + await this.indexedDB + .transaction( + 'rw', + this.indexedDB.tablesKey, + this.indexedDB.files, + async () => { + await this.indexedDB.tablesKey.bulkPut(updatedTableData); + + await Promise.all( + newFilesData.map((file) => + this.fileRegisterer.registerFileBuffer(file.fileName, file.buffer) + ) + ); + } + ) + .catch((error) => { + console.error(error); + }); } - async getFilesNameForTables( - tables: TableConfig[] - ): Promise { - // not needed for opfs file manager - return []; - } + async registerFileBuffer(fileBuffer: FileBufferStore): Promise { + const { buffer, fileName, tableName } = fileBuffer; + + const currentTableData = await this.indexedDB.tablesKey.toArray(); + + const updatedTableMap = mergeFileStoreIntoTable( + [fileBuffer], + currentTableData + ); - async getTableData(table: TableConfig): Promise
    { - // not needed for opfs file manager - return; + // Update the tables and files table in IndexedDB + await this.indexedDB + .transaction( + 'rw', + this.indexedDB.tablesKey, + this.indexedDB.files, + async () => { + await this.indexedDB.tablesKey.put({ + tableName: fileBuffer.tableName, + files: updatedTableMap.get(tableName)?.files ?? [], + }); + + await this.fileRegisterer.registerFileBuffer(fileName, buffer); + } + ) + .catch((error) => { + console.error(error); + }); } - async setTableMetadata(table: string, metadata: object): Promise { - // not needed for opfs file manager + async mountFileBufferByTables(tables: TableConfig[]): Promise { + const tableData = await this.getFilesNameForTables(tables); + + /** + * Check if the file registered size is not more than the limit + * If it is more than the limit, then remove the files which are not needed while mounting this the tables + */ } async dropFilesByTableName( tableName: string, fileNames: string[] ): Promise { - // not needed for opfs file manager + const tableData = await this.indexedDB.tablesKey.get(tableName); } async onDBShutdownHandler() { - // not needed for opfs file manager + this.fileRegisterer.flushFileCache(); } } From 0a72dc1384040dd7c86bcd49dc186423e06099af Mon Sep 17 00:00:00 2001 From: vpbs2 Date: Fri, 3 Oct 2025 17:14:20 +0530 Subject: [PATCH 3/6] update opfs --- benchmarking/src/app/constants.ts | 68 +++++++++---------- .../src/app/dbm-context/instance-manager.ts | 14 ++-- .../src/app/file-loader/file-loader.tsx | 30 +++----- benchmarking/src/app/utils.ts | 2 +- .../src/file-manager/file-registerer.ts | 2 +- 5 files changed, 53 insertions(+), 63 deletions(-) diff --git a/benchmarking/src/app/constants.ts b/benchmarking/src/app/constants.ts index 0b8fefe1..60f0e3fe 100644 --- a/benchmarking/src/app/constants.ts +++ b/benchmarking/src/app/constants.ts @@ -59,42 +59,42 @@ export const TEST_QUERIES = [ group_by_query.hvfhs_license_num = full_query.hvfhs_license_num LIMIT 1 `, - 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', - "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", - 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', - 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', - ` - WITH group_by_query AS ( - SELECT - hvfhs_license_num, - COUNT(*) - FROM - taxi - GROUP BY - hvfhs_license_num - ), + // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', + // "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", + // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', + // 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', + // ` + // WITH group_by_query AS ( + // SELECT + // hvfhs_license_num, + // COUNT(*) + // FROM + // taxi + // GROUP BY + // hvfhs_license_num + // ), - full_query AS ( - SELECT - * - FROM - taxi - ) + // full_query AS ( + // SELECT + // * + // FROM + // taxi + // ) - SELECT - COUNT(*) - FROM - group_by_query - LEFT JOIN - full_query - ON - group_by_query.hvfhs_license_num = full_query.hvfhs_license_num - LIMIT 1 - `, - 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', - "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", - 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', - 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', + // SELECT + // COUNT(*) + // FROM + // group_by_query + // LEFT JOIN + // full_query + // ON + // group_by_query.hvfhs_license_num = full_query.hvfhs_license_num + // LIMIT 1 + // `, + // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', + // "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", + // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', + // 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json', // 'SELECT * FROM taxi_json WHERE price >= 1.0005812645 LIMIT 100', // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json GROUP BY order_count', diff --git a/benchmarking/src/app/dbm-context/instance-manager.ts b/benchmarking/src/app/dbm-context/instance-manager.ts index 1e960b95..60c7ae82 100644 --- a/benchmarking/src/app/dbm-context/instance-manager.ts +++ b/benchmarking/src/app/dbm-context/instance-manager.ts @@ -1,6 +1,10 @@ import { InstanceManagerType } from '@devrev/meerkat-dbm'; import * as duckdb from '@duckdb/duckdb-wasm'; -import { AsyncDuckDB, LogEntryVariant } from '@duckdb/duckdb-wasm'; +import { + AsyncDuckDB, + DuckDBAccessMode, + LogEntryVariant, +} from '@duckdb/duckdb-wasm'; const JSDELIVR_BUNDLES = duckdb.getJsDelivrBundles(); export class InstanceManager implements InstanceManagerType { @@ -23,10 +27,10 @@ export class InstanceManager implements InstanceManagerType { const db = new duckdb.AsyncDuckDB(logger, worker); await db.instantiate(bundle.mainModule, bundle.pthreadWorker); - // await db.open({ - // path: 'opfs://meerkat.db', - // accessMode: DuckDBAccessMode.READ_ONLY, - // }); + await db.open({ + path: 'opfs://meerkat.db', + accessMode: DuckDBAccessMode.READ_WRITE, + }); URL.revokeObjectURL(worker_url); return db; diff --git a/benchmarking/src/app/file-loader/file-loader.tsx b/benchmarking/src/app/file-loader/file-loader.tsx index a2428fd2..7ab6eec0 100644 --- a/benchmarking/src/app/file-loader/file-loader.tsx +++ b/benchmarking/src/app/file-loader/file-loader.tsx @@ -1,25 +1,10 @@ import axios from 'axios'; import { useState } from 'react'; - import { useDBM } from '../hooks/dbm-context'; import { useClassicEffect } from '../hooks/use-classic-effect'; import { generateViewQuery } from '../utils'; import { TAXI_FILE_URL } from './constants'; -// Write a file directly to OPFS root -async function writeInOPFS(fileName: string, buffer: Uint8Array) { - try { - const root = await navigator.storage.getDirectory(); - const fileHandle = await root.getFileHandle(fileName, { create: true }); - const writable = await fileHandle.createWritable(); - await writable.write(buffer); - await writable.close(); - console.log('Successfully wrote file to OPFS root:', fileName); - } catch (err) { - console.error('Error writing to OPFS root:', err); - } -} - export const FileLoader = ({ children }: { children: JSX.Element }) => { const { fileManager, fileManagerType, dbm } = useDBM(); const [isFileLoader, setIsFileLoader] = useState(false); @@ -32,25 +17,26 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { const fileBuffer = file.data; const fileBufferView = new Uint8Array(fileBuffer); - const fileName = 'taxi.parquet'; - // Always write to OPFS at the root - await writeInOPFS(fileName, fileBufferView); - - // Register file buffer with the file manager await fileManager.registerFileBuffer({ tableName: 'taxi', - fileName: 'opfs://taxi.parquet', + fileName: 'taxi.parquet', buffer: fileBufferView, }); + // await fileManager.registerJSON({ + // json: TAXI_JSON_DATA, + // tableName: 'taxi_json', + // fileName: 'taxi_json.parquet', + // }); + // Create views for raw and memory file manager after registering the files if ( fileManagerType === 'raw' || fileManagerType === 'memory' || fileManagerType === 'opfs' ) { - await dbm.query(generateViewQuery('taxi', ['opfs://taxi.parquet'])); + await dbm.query(generateViewQuery('taxi', ['taxi.parquet'])); // await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'])); } diff --git a/benchmarking/src/app/utils.ts b/benchmarking/src/app/utils.ts index 4d17a4f9..7a374713 100644 --- a/benchmarking/src/app/utils.ts +++ b/benchmarking/src/app/utils.ts @@ -1,6 +1,6 @@ export const generateViewQuery = (tableName: string, files: string[]) => { console.log(files, 'files'); - return `CREATE VIEW IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( + return `CREATE TABLE IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( "','" )}']);`; }; diff --git a/meerkat-dbm/src/file-manager/file-registerer.ts b/meerkat-dbm/src/file-manager/file-registerer.ts index 1f16f9bb..99c8a93c 100644 --- a/meerkat-dbm/src/file-manager/file-registerer.ts +++ b/meerkat-dbm/src/file-manager/file-registerer.ts @@ -40,7 +40,7 @@ export class FileRegisterer implements FileRegistererType { }); return this.instanceManager .getDB() - .then((db) => db.registerOPFSFileName(fileName)); + .then((db) => db.registerFileBuffer(fileName, buffer)); }; registerEmptyFileBuffer: AsyncDuckDB['registerEmptyFileBuffer'] = async ( From 1dc88818fe8d6df619aa585122fbb698180af1ef Mon Sep 17 00:00:00 2001 From: vpbs2 Date: Fri, 3 Oct 2025 18:13:12 +0530 Subject: [PATCH 4/6] updte --- .../src/app/dbm-context/instance-manager.ts | 21 +++++++++++++++---- .../src/app/dbm-context/opfs-dbm-context.tsx | 3 ++- benchmarking/src/app/utils.ts | 2 +- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/benchmarking/src/app/dbm-context/instance-manager.ts b/benchmarking/src/app/dbm-context/instance-manager.ts index 60c7ae82..b2835db5 100644 --- a/benchmarking/src/app/dbm-context/instance-manager.ts +++ b/benchmarking/src/app/dbm-context/instance-manager.ts @@ -7,9 +7,19 @@ import { } from '@duckdb/duckdb-wasm'; const JSDELIVR_BUNDLES = duckdb.getJsDelivrBundles(); +export interface InstanceManagerOptions { + path?: string; +} + export class InstanceManager implements InstanceManagerType { private db: AsyncDuckDB | null = null; + private path?: string; + + constructor(options: InstanceManagerOptions = {}) { + this.path = options.path; + } + private async initDB() { const bundle = await duckdb.selectBundle(JSDELIVR_BUNDLES); @@ -27,10 +37,13 @@ export class InstanceManager implements InstanceManagerType { const db = new duckdb.AsyncDuckDB(logger, worker); await db.instantiate(bundle.mainModule, bundle.pthreadWorker); - await db.open({ - path: 'opfs://meerkat.db', - accessMode: DuckDBAccessMode.READ_WRITE, - }); + // Open with configured path if provided + if (this.path) { + await db.open({ + path: this.path, + accessMode: DuckDBAccessMode.READ_WRITE, + }); + } URL.revokeObjectURL(worker_url); return db; diff --git a/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx b/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx index e5bf66da..cc06fcc4 100644 --- a/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx +++ b/benchmarking/src/app/dbm-context/opfs-dbm-context.tsx @@ -9,8 +9,9 @@ import { useAsyncDuckDB } from './use-async-duckdb'; export const OPFSDBMProvider = ({ children }: { children: JSX.Element }) => { const fileManagerRef = React.useRef(null); const [dbm, setdbm] = useState(null); + const instanceManagerRef = React.useRef( - new InstanceManager() + new InstanceManager({ path: 'opfs://meerkat.db' }) ); const dbState = useAsyncDuckDB(); diff --git a/benchmarking/src/app/utils.ts b/benchmarking/src/app/utils.ts index 7a374713..4d17a4f9 100644 --- a/benchmarking/src/app/utils.ts +++ b/benchmarking/src/app/utils.ts @@ -1,6 +1,6 @@ export const generateViewQuery = (tableName: string, files: string[]) => { console.log(files, 'files'); - return `CREATE TABLE IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( + return `CREATE VIEW IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( "','" )}']);`; }; From 9c9c0de33e62c17a449679c10aebbc5e6c5cdcc0 Mon Sep 17 00:00:00 2001 From: vpbs2 Date: Fri, 3 Oct 2025 18:24:57 +0530 Subject: [PATCH 5/6] update --- benchmarking/src/app/constants.ts | 68 +++++++++---------- .../src/app/file-loader/file-loader.tsx | 6 +- .../query-benchmarking/query-benchmarking.tsx | 5 +- benchmarking/src/app/utils.ts | 11 ++- 4 files changed, 50 insertions(+), 40 deletions(-) diff --git a/benchmarking/src/app/constants.ts b/benchmarking/src/app/constants.ts index 60f0e3fe..0b8fefe1 100644 --- a/benchmarking/src/app/constants.ts +++ b/benchmarking/src/app/constants.ts @@ -59,42 +59,42 @@ export const TEST_QUERIES = [ group_by_query.hvfhs_license_num = full_query.hvfhs_license_num LIMIT 1 `, - // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', - // "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", - // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', - // 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', - // ` - // WITH group_by_query AS ( - // SELECT - // hvfhs_license_num, - // COUNT(*) - // FROM - // taxi - // GROUP BY - // hvfhs_license_num - // ), + 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', + "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", + 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', + 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', + ` + WITH group_by_query AS ( + SELECT + hvfhs_license_num, + COUNT(*) + FROM + taxi + GROUP BY + hvfhs_license_num + ), - // full_query AS ( - // SELECT - // * - // FROM - // taxi - // ) + full_query AS ( + SELECT + * + FROM + taxi + ) - // SELECT - // COUNT(*) - // FROM - // group_by_query - // LEFT JOIN - // full_query - // ON - // group_by_query.hvfhs_license_num = full_query.hvfhs_license_num - // LIMIT 1 - // `, - // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', - // "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", - // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', - // 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', + SELECT + COUNT(*) + FROM + group_by_query + LEFT JOIN + full_query + ON + group_by_query.hvfhs_license_num = full_query.hvfhs_license_num + LIMIT 1 + `, + 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi', + "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", + 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', + 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json', // 'SELECT * FROM taxi_json WHERE price >= 1.0005812645 LIMIT 100', // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json GROUP BY order_count', diff --git a/benchmarking/src/app/file-loader/file-loader.tsx b/benchmarking/src/app/file-loader/file-loader.tsx index 7ab6eec0..d6f0da5c 100644 --- a/benchmarking/src/app/file-loader/file-loader.tsx +++ b/benchmarking/src/app/file-loader/file-loader.tsx @@ -36,8 +36,10 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { fileManagerType === 'memory' || fileManagerType === 'opfs' ) { - await dbm.query(generateViewQuery('taxi', ['taxi.parquet'])); - // await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'])); + const useTable = fileManagerType === 'opfs'; + + await dbm.query(generateViewQuery('taxi', ['taxi.parquet'], useTable)); + // await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'], useTable)); } setIsFileLoader(true); diff --git a/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx b/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx index 40b8276e..2ef34e6b 100644 --- a/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx +++ b/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx @@ -27,7 +27,10 @@ export const QueryBenchmarking = () => { filePaths = table.files.map((file) => file.fileName); } - await dbm.query(generateViewQuery(table.tableName, filePaths)); + const useTable = fileManagerType === 'opfs'; + await dbm.query( + generateViewQuery(table.tableName, filePaths, useTable) + ); } }, [fileManagerType, dbm] diff --git a/benchmarking/src/app/utils.ts b/benchmarking/src/app/utils.ts index 4d17a4f9..1223a04a 100644 --- a/benchmarking/src/app/utils.ts +++ b/benchmarking/src/app/utils.ts @@ -1,6 +1,11 @@ -export const generateViewQuery = (tableName: string, files: string[]) => { - console.log(files, 'files'); - return `CREATE VIEW IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( +export const generateViewQuery = ( + tableName: string, + files: string[], + useTable = false +) => { + const targetType = useTable ? 'TABLE' : 'VIEW'; + + return `CREATE ${targetType} IF NOT EXISTS ${tableName} AS SELECT * FROM read_parquet(['${files.join( "','" )}']);`; }; From ffbb06f1dbb662909cb4cb873019e3851041c730 Mon Sep 17 00:00:00 2001 From: vpbs2 Date: Sun, 5 Oct 2025 18:37:24 +0530 Subject: [PATCH 6/6] update --- benchmarking/src/app/constants.ts | 8 ++++---- benchmarking/src/app/file-loader/file-loader.tsx | 15 +++++++++------ .../app/query-benchmarking/query-benchmarking.tsx | 3 +-- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/benchmarking/src/app/constants.ts b/benchmarking/src/app/constants.ts index 0b8fefe1..d32603d0 100644 --- a/benchmarking/src/app/constants.ts +++ b/benchmarking/src/app/constants.ts @@ -95,8 +95,8 @@ export const TEST_QUERIES = [ "SELECT * FROM taxi WHERE originating_base_num='B03404' LIMIT 100", 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi GROUP BY hvfhs_license_num', 'SELECT * FROM taxi ORDER BY bcf LIMIT 100', - // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json', - // 'SELECT * FROM taxi_json WHERE price >= 1.0005812645 LIMIT 100', - // 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json GROUP BY order_count', - // 'SELECT * FROM taxi_json ORDER BY seconds_in_bucket LIMIT 100', + 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json', + 'SELECT * FROM taxi_json WHERE price >= 1.0005812645 LIMIT 100', + 'SELECT CAST(COUNT(*) as VARCHAR) as total_count FROM taxi_json GROUP BY order_count', + 'SELECT * FROM taxi_json ORDER BY seconds_in_bucket LIMIT 100', ]; diff --git a/benchmarking/src/app/file-loader/file-loader.tsx b/benchmarking/src/app/file-loader/file-loader.tsx index d6f0da5c..8fa5146b 100644 --- a/benchmarking/src/app/file-loader/file-loader.tsx +++ b/benchmarking/src/app/file-loader/file-loader.tsx @@ -1,5 +1,6 @@ import axios from 'axios'; import { useState } from 'react'; +import TAXI_JSON_DATA from '../../../public/data-sets/taxi.json'; import { useDBM } from '../hooks/dbm-context'; import { useClassicEffect } from '../hooks/use-classic-effect'; import { generateViewQuery } from '../utils'; @@ -24,11 +25,11 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { buffer: fileBufferView, }); - // await fileManager.registerJSON({ - // json: TAXI_JSON_DATA, - // tableName: 'taxi_json', - // fileName: 'taxi_json.parquet', - // }); + await fileManager.registerJSON({ + json: TAXI_JSON_DATA, + tableName: 'taxi_json', + fileName: 'taxi_json.parquet', + }); // Create views for raw and memory file manager after registering the files if ( @@ -39,7 +40,9 @@ export const FileLoader = ({ children }: { children: JSX.Element }) => { const useTable = fileManagerType === 'opfs'; await dbm.query(generateViewQuery('taxi', ['taxi.parquet'], useTable)); - // await dbm.query(generateViewQuery('taxi_json', ['taxi_json.parquet'], useTable)); + await dbm.query( + generateViewQuery('taxi_json', ['taxi_json.parquet'], useTable) + ); } setIsFileLoader(true); diff --git a/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx b/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx index 2ef34e6b..b1c8a1c7 100644 --- a/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx +++ b/benchmarking/src/app/query-benchmarking/query-benchmarking.tsx @@ -51,8 +51,7 @@ export const QueryBenchmarking = () => { tables: [{ name: 'taxi' }, { name: 'taxi_json' }], options: { ...(fileManagerType !== 'parallel-indexdb' && - fileManagerType !== 'parallel-memory' && - fileManagerType !== 'opfs' && { + fileManagerType !== 'parallel-memory' && { preQuery: preQuery, }), },