diff --git a/dev/index.html b/dev/index.html index f9ba1878..f14505b7 100644 --- a/dev/index.html +++ b/dev/index.html @@ -160,7 +160,7 @@ } function setIndex() { - vg.coordinator().dataCubeIndexer.enabled(indexToggle.checked); + vg.coordinator().dataCubeIndexer.enabled = indexToggle.checked; } function reload() { diff --git a/docs/api/core/coordinator.md b/docs/api/core/coordinator.md index 5685d4b7..79857944 100644 --- a/docs/api/core/coordinator.md +++ b/docs/api/core/coordinator.md @@ -20,7 +20,7 @@ Create a new Mosaic Coordinator to manage all database communication for clients * _logger_: The logger to use, defaults to `console`. * _cache_: Boolean flag to enable/disable query caching (default `true`). * _consolidate_ Boolean flag to enable/disable query consolidation (default `true`). -* _indexes_: Data cube indexer options object. The _enabled_ flag (default `true`) determines if data cube indexes should be used when possible. The _temp_ flag (default `true`) controls if temporary tables should be created for data cube indexes. +* _indexes_: Data cube indexer options object. The _enabled_ flag (default `true`) determines if data cube indexes should be used when possible. The _schema_ option (default `'mosaic'`) indicates the database schema in which data cube index tables should be created. ## databaseConnector diff --git a/packages/core/src/Coordinator.js b/packages/core/src/Coordinator.js index 88406a7e..9a050b31 100644 --- a/packages/core/src/Coordinator.js +++ b/packages/core/src/Coordinator.js @@ -1,7 +1,9 @@ import { socketConnector } from './connectors/socket.js'; import { DataCubeIndexer } from './DataCubeIndexer.js'; +import { MosaicClient } from './MosaicClient.js'; import { QueryManager, Priority } from './QueryManager.js'; import { queryFieldInfo } from './util/field-info.js'; +import { QueryResult } from './util/query-result.js'; import { voidLogger } from './util/void-logger.js'; /** @@ -24,6 +26,10 @@ export function coordinator(instance) { return _instance; } +/** + * @typedef {import('@uwdata/mosaic-sql').Query | string} QueryType + */ + /** * A Mosaic Coordinator manages all database communication for clients and * handles selection updates. The Coordinator also performs optimizations @@ -34,7 +40,8 @@ export function coordinator(instance) { * @param {*} [options.manager] The query manager to use. * @param {boolean} [options.cache=true] Boolean flag to enable/disable query caching. * @param {boolean} [options.consolidate=true] Boolean flag to enable/disable query consolidation. - * @param {object} [options.indexes] Data cube indexer options. + * @param {import('./DataCubeIndexer.js').DataCubeIndexerOptions} [options.indexes] + * Data cube indexer options. */ export class Coordinator { constructor(db = socketConnector(), { @@ -48,10 +55,10 @@ export class Coordinator { this.manager = manager; this.manager.cache(cache); this.manager.consolidate(consolidate); - this.dataCubeIndexer = new DataCubeIndexer(this, indexes); - this.logger(logger); this.databaseConnector(db); + this.logger(logger); this.clear(); + this.dataCubeIndexer = new DataCubeIndexer(this, indexes); } /** @@ -98,7 +105,7 @@ export class Coordinator { /** * Cancel previosuly submitted query requests. These queries will be * canceled if they are queued but have not yet been submitted. - * @param {import('./util/query-result.js').QueryResult[]} requests An array + * @param {QueryResult[]} requests An array * of query result objects, such as those returned by the `query` method. */ cancel(requests) { @@ -107,29 +114,30 @@ export class Coordinator { /** * Issue a query for which no result (return value) is needed. - * @param {import('@uwdata/mosaic-sql').Query | string} query The query. + * @param {QueryType | QueryType[]} query The query or an array of queries. + * Each query should be either a Query builder object or a SQL string. * @param {object} [options] An options object. * @param {number} [options.priority] The query priority, defaults to * `Priority.Normal`. - * @returns {import('./util/query-result.js').QueryResult} A query result + * @returns {QueryResult} A query result * promise. */ exec(query, { priority = Priority.Normal } = {}) { - query = Array.isArray(query) ? query.join(';\n') : query; + query = Array.isArray(query) ? query.filter(x => x).join(';\n') : query; return this.manager.request({ type: 'exec', query }, priority); } /** * Issue a query to the backing database. The submitted query may be * consolidate with other queries and its results may be cached. - * @param {import('@uwdata/mosaic-sql').Query | string} query The query. + * @param {QueryType} query The query as either a Query builder object + * or a SQL string. * @param {object} [options] An options object. * @param {'arrow' | 'json'} [options.type] The query result format type. * @param {boolean} [options.cache=true] If true, cache the query result. * @param {number} [options.priority] The query priority, defaults to * `Priority.Normal`. - * @returns {import('./util/query-result.js').QueryResult} A query result - * promise. + * @returns {QueryResult} A query result promise. */ query(query, { type = 'arrow', @@ -143,11 +151,11 @@ export class Coordinator { /** * Issue a query to prefetch data for later use. The query result is cached * for efficient future access. - * @param {import('@uwdata/mosaic-sql').Query | string} query The query. + * @param {QueryType} query The query as either a Query builder object + * or a SQL string. * @param {object} [options] An options object. * @param {'arrow' | 'json'} [options.type] The query result format type. - * @returns {import('./util/query-result.js').QueryResult} A query result - * promise. + * @returns {QueryResult} A query result promise. */ prefetch(query, options = {}) { return this.query(query, { ...options, cache: true, priority: Priority.Low }); @@ -159,7 +167,7 @@ export class Coordinator { * @param {string} name The name of the bundle. * @param {[string | {sql: string}, {alias: string}]} queries The queries to save into the bundle. * @param {number} priority Request priority. - * @returns + * @returns {QueryResult} A query result promise. */ createBundle(name, queries, priority = Priority.Low) { const options = { name, queries: queries.map(q => typeof q == 'string' ? {sql: q} : q) }; @@ -170,7 +178,7 @@ export class Coordinator { * Load a bundle into the cache. * @param {string} name The name of the bundle. * @param {number} priority Request priority. - * @returns + * @returns {QueryResult} A query result promise. */ loadBundle(name, priority = Priority.High) { const options = { name }; @@ -182,8 +190,8 @@ export class Coordinator { /** * Update client data by submitting the given query and returning the * data (or error) to the client. - * @param {import('./MosaicClient.js').MosaicClient} client A Mosaic client. - * @param {import('@uwdata/mosaic-sql').Query | string} query The data query. + * @param {MosaicClient} client A Mosaic client. + * @param {QueryType} query The data query. * @param {number} [priority] The query priority. * @returns {Promise} A Promise that resolves upon completion of the update. */ @@ -201,10 +209,8 @@ export class Coordinator { * Issue a query request for a client. If the query is null or undefined, * the client is simply updated. Otherwise `updateClient` is called. As a * side effect, this method clears the current data cube indexer state. - * @param {import('./MosaicClient.js').MosaicClient} client The client - * to update. - * @param {import('@uwdata/mosaic-sql').Query | string | null} [query] - * The query to issue. + * @param {MosaicClient} client The client to update. + * @param {QueryType | null} [query] The query to issue. */ requestQuery(client, query) { this.dataCubeIndexer.clear(); @@ -215,8 +221,7 @@ export class Coordinator { /** * Connect a client to the coordinator. - * @param {import('./MosaicClient.js').MosaicClient} client The Mosaic - * client to connect. + * @param {MosaicClient} client The Mosaic client to connect. */ async connect(client) { const { clients } = this; @@ -247,8 +252,7 @@ export class Coordinator { /** * Disconnect a client from the coordinator. - * @param {import('./MosaicClient.js').MosaicClient} client The Mosaic - * client to disconnect. + * @param {MosaicClient} client The Mosaic client to disconnect. */ disconnect(client) { const { clients, filterGroups } = this; @@ -267,8 +271,8 @@ export class Coordinator { * Connect a selection-client pair to the coordinator to process updates. * @param {Coordinator} mc The Mosaic coordinator. * @param {import('./Selection.js').Selection} selection A selection. - * @param {import('./MosaicClient.js').MosaicClient} client A Mosiac - * client that is filtered by the given selection. + * @param {MosaicClient} client A Mosiac client that is filtered by the + * given selection. */ function connectSelection(mc, selection, client) { if (!selection) return; diff --git a/packages/core/src/DataCubeIndexer.js b/packages/core/src/DataCubeIndexer.js index 46fe34dd..c83086ba 100644 --- a/packages/core/src/DataCubeIndexer.js +++ b/packages/core/src/DataCubeIndexer.js @@ -6,55 +6,112 @@ import { fnv_hash } from './util/hash.js'; const Skip = { skip: true, result: null }; +/** + * @typedef {object} DataCubeIndexerOptions + * @property {string} [schema] Database schema (namespace) in which to write + * data cube index tables (default 'mosaic'). + * @property {boolean} [options.enabled=true] Flag to enable or disable the + * indexer. This setting can later be updated via the `enabled` method. + */ + /** * Build and query optimized indices ("data cubes") for fast computation of * groupby aggregate queries over compatible client queries and selections. * A data cube contains pre-aggregated data for a Mosaic client, subdivided * by possible query values from an active selection clause. These cubes are * realized as as database tables that can be queried for rapid updates. + * * Compatible client queries must consist of only groupby dimensions and * supported aggregate functions. Compatible selections must contain an active * clause that exposes metadata for an interval or point value predicate. + * + * Data cube index tables are written to a dedicated schema (namespace) that + * can be set using the *schema* constructor option. This schema acts as a + * persistent cache, and index tables may be used across sessions. The + * `dropIndexTables` method issues a query to remove *all* tables within + * this schema. This may be needed if the original tables have updated data, + * but should be used with care. */ export class DataCubeIndexer { /** * Create a new data cube index table manager. * @param {import('./Coordinator.js').Coordinator} coordinator A Mosaic coordinator. - * @param {object} [options] Indexer options. - * @param {boolean} [options.enabled=true] Flag to enable/disable indexer. - * @param {boolean} [options.temp=true] Flag to indicate if generated data - * cube index tables should be temporary tables. + * @param {DataCubeIndexerOptions} [options] Data cube indexer options. */ constructor(coordinator, { - enabled = true, - temp = true + schema = 'mosaic', + enabled = true } = {}) { /** @type {Map} */ this.indexes = new Map(); this.active = null; - this.temp = temp; this.mc = coordinator; + this._schema = schema; this._enabled = enabled; } /** - * Set the enabled state of this indexer. If false, any cached state is + * Set the enabled state of this indexer. If false, any local state is * cleared and subsequent index calls will return null until re-enabled. - * @param {boolean} state The enabled state. + * This method has no effect on any index tables already in the database. + * @param {boolean} [state] The enabled state to set. */ - enabled(state) { - if (state === undefined) { - return this._enabled; - } else if (this._enabled !== state) { + set enabled(state) { + if (this._enabled !== state) { if (!state) this.clear(); this._enabled = state; } } + /** + * Get the enabled state of this indexer. + * @returns {boolean} The current enabled state. + */ + get enabled() { + return this._enabled; + } + + /** + * Set the database schema used by this indexer. Upon changes, any local + * state is cleared. This method does _not_ drop any existing data cube + * tables, use `dropIndexTables` before changing the schema to also remove + * existing index tables in the database. + * @param {string} [schema] The schema name to set. + */ + set schema(schema) { + if (this._schema !== schema) { + this.clear(); + this._schema = schema; + } + } + + /** + * Get the database schema used by this indexer. + * @returns {string} The current schema name. + */ + get schema() { + return this._schema; + } + + /** + * Issues a query through the coordinator to drop the current index table + * schema. *All* tables in the schema will be removed and local state is + * cleared. Call this method if the underlying base tables have been updated, + * causing derived index tables to become stale and inaccurate. Use this + * method with care! Once dropped, the schema will be repopulated by future + * data cube indexer requests. + * @returns A query result promise. + */ + dropIndexTables() { + this.clear(); + return this.mc.exec(`DROP SCHEMA IF EXISTS "${this.schema}" CASCADE`); + } + /** * Clear the cache of data cube index table entries for the current active * selection clause. This method does _not_ drop any existing data cube - * tables. + * tables. Use `dropIndexTables` to remove existing index tables from the + * database. */ clear() { this.indexes.clear(); @@ -77,9 +134,9 @@ export class DataCubeIndexer { */ index(client, selection, activeClause) { // if not enabled, do nothing - if (!this._enabled) return null; + if (!this.enabled) return null; - const { indexes, mc, temp } = this; + const { indexes, mc, schema } = this; const { source } = activeClause; // if there is no clause source to track, do nothing @@ -125,8 +182,11 @@ export class DataCubeIndexer { } else { // generate data cube index table const filter = selection.remove(source).predicate(client); - info = dataCubeInfo(client.query(filter), active, indexCols); - info.result = mc.exec(create(info.table, info.create, { temp })); + info = dataCubeInfo(client.query(filter), active, indexCols, schema); + info.result = mc.exec([ + `CREATE SCHEMA IF NOT EXISTS ${schema}`, + create(info.table, info.create, { temp: false }) + ]); info.result.catch(e => mc.logger().error(e)); } @@ -223,7 +283,7 @@ function binInterval(scale, pixelSize, bin) { * @param {*} indexCols Data cube index column definitions. * @returns {DataCubeInfo} */ -function dataCubeInfo(clientQuery, active, indexCols) { +function dataCubeInfo(clientQuery, active, indexCols, schema) { const { dims, aggr, aux } = indexCols; const { columns } = active; @@ -246,7 +306,7 @@ function dataCubeInfo(clientQuery, active, indexCols) { // generate creation query string and hash id const create = query.toString(); const id = (fnv_hash(create) >>> 0).toString(16); - const table = `cube_index_${id}`; + const table = `${schema}.cube_${id}`; // generate data cube select query const select = Query @@ -255,7 +315,7 @@ function dataCubeInfo(clientQuery, active, indexCols) { .groupby(dims) .orderby(order); - return new DataCubeInfo({ table, create, active, select }); + return new DataCubeInfo({ id, table, create, active, select }); } /** diff --git a/packages/widget/mosaic_widget/__init__.py b/packages/widget/mosaic_widget/__init__.py index 0100cc40..104dc682 100644 --- a/packages/widget/mosaic_widget/__init__.py +++ b/packages/widget/mosaic_widget/__init__.py @@ -37,14 +37,13 @@ class MosaicWidget(anywidget.AnyWidget): # The current params indexed by name params = traitlets.Dict({}).tag(sync=True) - # Whether data cube indexes should be created as temp tables - temp_indexes = traitlets.Bool().tag(sync=True) + # Where data cube indexes should be created + data_cube_schema = traitlets.Unicode().tag(sync=True) def __init__( self, spec: dict | None = None, con=None, - temp_indexes=True, data=None, *args, **kwargs, @@ -55,8 +54,6 @@ def __init__( spec (dict, optional): The initial Mosaic specification. Defaults to {}. con (connection, optional): A DuckDB connection. Defaults to duckdb.connect(). - temp_indexes (bool, optional): Whether data cube indexes should be - created as temp tables tables. Defaults to True. data (dict, optional): Pandas DataFrames to add to DuckDB. The keys are used as the names of the tables. Defaults to {}. """ @@ -70,7 +67,6 @@ def __init__( super().__init__(*args, **kwargs) self.spec = spec self.con = con - self.temp_indexes = temp_indexes for name, df in data.items(): self.con.register(name, df) self.on_msg(self._handle_custom_msg) diff --git a/packages/widget/src/index.js b/packages/widget/src/index.js index e9a1acc4..0a7b0ba7 100644 --- a/packages/widget/src/index.js +++ b/packages/widget/src/index.js @@ -7,24 +7,24 @@ import { v4 as uuidv4 } from 'uuid'; * @typedef {Record} Params * * @typedef Model - * @prop {import('@uwdata/mosaic-spec').Spec} spec the current specification - * @prop {boolean} temp_indexes whether data cube indexes should be created as temp tables - * @prop {Params} params the current params + * @property {import('@uwdata/mosaic-spec').Spec} spec + * The current Mosaic specification. + * @property {string} data_cube_schema The database schema in which to store + * data cube index tables (default 'mosaic'). + * @property {Params} params The current params. */ export default { /** @type {import('anywidget/types').Initialize} */ - // eslint-disable-next-line no-unused-vars - initialize(view) {}, + initialize(view) { + view.model.set('data_cube_schema', coordinator().dataCubeIndexer.schema); + }, /** @type {import('anywidget/types').Render} */ render(view) { view.el.classList.add('mosaic-widget'); - const getSpec = () => view.model.get('spec'); - - const getTempIndexes = () => view.model.get('temp_indexes'); - + const getDataCubeSchema = () => view.model.get('data_cube_schema'); const logger = coordinator().logger(); /** @type Map, startTime: number, resolve: (value: any) => void, reject: (reason?: any) => void}> */ @@ -90,10 +90,10 @@ export default { view.model.on('change:spec', () => updateSpec()); function configureCoordinator() { - coordinator().dataCubeIndexer.temp = getTempIndexes(); + coordinator().dataCubeIndexer.schema = getDataCubeSchema(); } - view.model.on('change:temp_indexes', () => configureCoordinator()); + view.model.on('change:data_cube_schema', () => configureCoordinator()); view.model.on('msg:custom', (msg, buffers) => { logger.group(`query ${msg.uuid}`);