Skip to content

Commit

Permalink
feat: Create data cube index tables in a schema. (#519)
Browse files Browse the repository at this point in the history
* feat!: Create data cube index tables in a schema.

* rename to data_cube_schema and implement corresponding python part

* docs: Fix docs typo.

---------

Co-authored-by: Dominik Moritz <domoritz@gmail.com>
  • Loading branch information
jheer and domoritz authored Sep 15, 2024
1 parent cf19ab0 commit 09d292c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 67 deletions.
2 changes: 1 addition & 1 deletion dev/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@
}

function setIndex() {
vg.coordinator().dataCubeIndexer.enabled(indexToggle.checked);
vg.coordinator().dataCubeIndexer.enabled = indexToggle.checked;
}

function reload() {
Expand Down
2 changes: 1 addition & 1 deletion docs/api/core/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Create a new Mosaic Coordinator to manage all database communication for clients
* _logger_: The logger to use, defaults to `console`.
* _cache_: Boolean flag to enable/disable query caching (default `true`).
* _consolidate_ Boolean flag to enable/disable query consolidation (default `true`).
* _indexes_: Data cube indexer options object. The _enabled_ flag (default `true`) determines if data cube indexes should be used when possible. The _temp_ flag (default `true`) controls if temporary tables should be created for data cube indexes.
* _indexes_: Data cube indexer options object. The _enabled_ flag (default `true`) determines if data cube indexes should be used when possible. The _schema_ option (default `'mosaic'`) indicates the database schema in which data cube index tables should be created.

## databaseConnector

Expand Down
58 changes: 31 additions & 27 deletions packages/core/src/Coordinator.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { socketConnector } from './connectors/socket.js';
import { DataCubeIndexer } from './DataCubeIndexer.js';
import { MosaicClient } from './MosaicClient.js';
import { QueryManager, Priority } from './QueryManager.js';
import { queryFieldInfo } from './util/field-info.js';
import { QueryResult } from './util/query-result.js';
import { voidLogger } from './util/void-logger.js';

/**
Expand All @@ -24,6 +26,10 @@ export function coordinator(instance) {
return _instance;
}

/**
* @typedef {import('@uwdata/mosaic-sql').Query | string} QueryType
*/

/**
* A Mosaic Coordinator manages all database communication for clients and
* handles selection updates. The Coordinator also performs optimizations
Expand All @@ -34,7 +40,8 @@ export function coordinator(instance) {
* @param {*} [options.manager] The query manager to use.
* @param {boolean} [options.cache=true] Boolean flag to enable/disable query caching.
* @param {boolean} [options.consolidate=true] Boolean flag to enable/disable query consolidation.
* @param {object} [options.indexes] Data cube indexer options.
* @param {import('./DataCubeIndexer.js').DataCubeIndexerOptions} [options.indexes]
* Data cube indexer options.
*/
export class Coordinator {
constructor(db = socketConnector(), {
Expand All @@ -48,10 +55,10 @@ export class Coordinator {
this.manager = manager;
this.manager.cache(cache);
this.manager.consolidate(consolidate);
this.dataCubeIndexer = new DataCubeIndexer(this, indexes);
this.logger(logger);
this.databaseConnector(db);
this.logger(logger);
this.clear();
this.dataCubeIndexer = new DataCubeIndexer(this, indexes);
}

/**
Expand Down Expand Up @@ -98,7 +105,7 @@ export class Coordinator {
/**
* Cancel previosuly submitted query requests. These queries will be
* canceled if they are queued but have not yet been submitted.
* @param {import('./util/query-result.js').QueryResult[]} requests An array
* @param {QueryResult[]} requests An array
* of query result objects, such as those returned by the `query` method.
*/
cancel(requests) {
Expand All @@ -107,29 +114,30 @@ export class Coordinator {

/**
* Issue a query for which no result (return value) is needed.
* @param {import('@uwdata/mosaic-sql').Query | string} query The query.
* @param {QueryType | QueryType[]} query The query or an array of queries.
* Each query should be either a Query builder object or a SQL string.
* @param {object} [options] An options object.
* @param {number} [options.priority] The query priority, defaults to
* `Priority.Normal`.
* @returns {import('./util/query-result.js').QueryResult} A query result
* @returns {QueryResult} A query result
* promise.
*/
exec(query, { priority = Priority.Normal } = {}) {
query = Array.isArray(query) ? query.join(';\n') : query;
query = Array.isArray(query) ? query.filter(x => x).join(';\n') : query;
return this.manager.request({ type: 'exec', query }, priority);
}

/**
* Issue a query to the backing database. The submitted query may be
* consolidate with other queries and its results may be cached.
* @param {import('@uwdata/mosaic-sql').Query | string} query The query.
* @param {QueryType} query The query as either a Query builder object
* or a SQL string.
* @param {object} [options] An options object.
* @param {'arrow' | 'json'} [options.type] The query result format type.
* @param {boolean} [options.cache=true] If true, cache the query result.
* @param {number} [options.priority] The query priority, defaults to
* `Priority.Normal`.
* @returns {import('./util/query-result.js').QueryResult} A query result
* promise.
* @returns {QueryResult} A query result promise.
*/
query(query, {
type = 'arrow',
Expand All @@ -143,11 +151,11 @@ export class Coordinator {
/**
* Issue a query to prefetch data for later use. The query result is cached
* for efficient future access.
* @param {import('@uwdata/mosaic-sql').Query | string} query The query.
* @param {QueryType} query The query as either a Query builder object
* or a SQL string.
* @param {object} [options] An options object.
* @param {'arrow' | 'json'} [options.type] The query result format type.
* @returns {import('./util/query-result.js').QueryResult} A query result
* promise.
* @returns {QueryResult} A query result promise.
*/
prefetch(query, options = {}) {
return this.query(query, { ...options, cache: true, priority: Priority.Low });
Expand All @@ -159,7 +167,7 @@ export class Coordinator {
* @param {string} name The name of the bundle.
* @param {[string | {sql: string}, {alias: string}]} queries The queries to save into the bundle.
* @param {number} priority Request priority.
* @returns
* @returns {QueryResult} A query result promise.
*/
createBundle(name, queries, priority = Priority.Low) {
const options = { name, queries: queries.map(q => typeof q == 'string' ? {sql: q} : q) };
Expand All @@ -170,7 +178,7 @@ export class Coordinator {
* Load a bundle into the cache.
* @param {string} name The name of the bundle.
* @param {number} priority Request priority.
* @returns
* @returns {QueryResult} A query result promise.
*/
loadBundle(name, priority = Priority.High) {
const options = { name };
Expand All @@ -182,8 +190,8 @@ export class Coordinator {
/**
* Update client data by submitting the given query and returning the
* data (or error) to the client.
* @param {import('./MosaicClient.js').MosaicClient} client A Mosaic client.
* @param {import('@uwdata/mosaic-sql').Query | string} query The data query.
* @param {MosaicClient} client A Mosaic client.
* @param {QueryType} query The data query.
* @param {number} [priority] The query priority.
* @returns {Promise} A Promise that resolves upon completion of the update.
*/
Expand All @@ -201,10 +209,8 @@ export class Coordinator {
* Issue a query request for a client. If the query is null or undefined,
* the client is simply updated. Otherwise `updateClient` is called. As a
* side effect, this method clears the current data cube indexer state.
* @param {import('./MosaicClient.js').MosaicClient} client The client
* to update.
* @param {import('@uwdata/mosaic-sql').Query | string | null} [query]
* The query to issue.
* @param {MosaicClient} client The client to update.
* @param {QueryType | null} [query] The query to issue.
*/
requestQuery(client, query) {
this.dataCubeIndexer.clear();
Expand All @@ -215,8 +221,7 @@ export class Coordinator {

/**
* Connect a client to the coordinator.
* @param {import('./MosaicClient.js').MosaicClient} client The Mosaic
* client to connect.
* @param {MosaicClient} client The Mosaic client to connect.
*/
async connect(client) {
const { clients } = this;
Expand Down Expand Up @@ -247,8 +252,7 @@ export class Coordinator {

/**
* Disconnect a client from the coordinator.
* @param {import('./MosaicClient.js').MosaicClient} client The Mosaic
* client to disconnect.
* @param {MosaicClient} client The Mosaic client to disconnect.
*/
disconnect(client) {
const { clients, filterGroups } = this;
Expand All @@ -267,8 +271,8 @@ export class Coordinator {
* Connect a selection-client pair to the coordinator to process updates.
* @param {Coordinator} mc The Mosaic coordinator.
* @param {import('./Selection.js').Selection} selection A selection.
* @param {import('./MosaicClient.js').MosaicClient} client A Mosiac
* client that is filtered by the given selection.
* @param {MosaicClient} client A Mosiac client that is filtered by the
* given selection.
*/
function connectSelection(mc, selection, client) {
if (!selection) return;
Expand Down
102 changes: 81 additions & 21 deletions packages/core/src/DataCubeIndexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,112 @@ import { fnv_hash } from './util/hash.js';

const Skip = { skip: true, result: null };

/**
* @typedef {object} DataCubeIndexerOptions
* @property {string} [schema] Database schema (namespace) in which to write
* data cube index tables (default 'mosaic').
* @property {boolean} [options.enabled=true] Flag to enable or disable the
* indexer. This setting can later be updated via the `enabled` method.
*/

/**
* Build and query optimized indices ("data cubes") for fast computation of
* groupby aggregate queries over compatible client queries and selections.
* A data cube contains pre-aggregated data for a Mosaic client, subdivided
* by possible query values from an active selection clause. These cubes are
* realized as as database tables that can be queried for rapid updates.
*
* Compatible client queries must consist of only groupby dimensions and
* supported aggregate functions. Compatible selections must contain an active
* clause that exposes metadata for an interval or point value predicate.
*
* Data cube index tables are written to a dedicated schema (namespace) that
* can be set using the *schema* constructor option. This schema acts as a
* persistent cache, and index tables may be used across sessions. The
* `dropIndexTables` method issues a query to remove *all* tables within
* this schema. This may be needed if the original tables have updated data,
* but should be used with care.
*/
export class DataCubeIndexer {
/**
* Create a new data cube index table manager.
* @param {import('./Coordinator.js').Coordinator} coordinator A Mosaic coordinator.
* @param {object} [options] Indexer options.
* @param {boolean} [options.enabled=true] Flag to enable/disable indexer.
* @param {boolean} [options.temp=true] Flag to indicate if generated data
* cube index tables should be temporary tables.
* @param {DataCubeIndexerOptions} [options] Data cube indexer options.
*/
constructor(coordinator, {
enabled = true,
temp = true
schema = 'mosaic',
enabled = true
} = {}) {
/** @type {Map<import('./MosaicClient.js').MosaicClient, DataCubeInfo | Skip | null>} */
this.indexes = new Map();
this.active = null;
this.temp = temp;
this.mc = coordinator;
this._schema = schema;
this._enabled = enabled;
}

/**
* Set the enabled state of this indexer. If false, any cached state is
* Set the enabled state of this indexer. If false, any local state is
* cleared and subsequent index calls will return null until re-enabled.
* @param {boolean} state The enabled state.
* This method has no effect on any index tables already in the database.
* @param {boolean} [state] The enabled state to set.
*/
enabled(state) {
if (state === undefined) {
return this._enabled;
} else if (this._enabled !== state) {
set enabled(state) {
if (this._enabled !== state) {
if (!state) this.clear();
this._enabled = state;
}
}

/**
* Get the enabled state of this indexer.
* @returns {boolean} The current enabled state.
*/
get enabled() {
return this._enabled;
}

/**
* Set the database schema used by this indexer. Upon changes, any local
* state is cleared. This method does _not_ drop any existing data cube
* tables, use `dropIndexTables` before changing the schema to also remove
* existing index tables in the database.
* @param {string} [schema] The schema name to set.
*/
set schema(schema) {
if (this._schema !== schema) {
this.clear();
this._schema = schema;
}
}

/**
* Get the database schema used by this indexer.
* @returns {string} The current schema name.
*/
get schema() {
return this._schema;
}

/**
* Issues a query through the coordinator to drop the current index table
* schema. *All* tables in the schema will be removed and local state is
* cleared. Call this method if the underlying base tables have been updated,
* causing derived index tables to become stale and inaccurate. Use this
* method with care! Once dropped, the schema will be repopulated by future
* data cube indexer requests.
* @returns A query result promise.
*/
dropIndexTables() {
this.clear();
return this.mc.exec(`DROP SCHEMA IF EXISTS "${this.schema}" CASCADE`);
}

/**
* Clear the cache of data cube index table entries for the current active
* selection clause. This method does _not_ drop any existing data cube
* tables.
* tables. Use `dropIndexTables` to remove existing index tables from the
* database.
*/
clear() {
this.indexes.clear();
Expand All @@ -77,9 +134,9 @@ export class DataCubeIndexer {
*/
index(client, selection, activeClause) {
// if not enabled, do nothing
if (!this._enabled) return null;
if (!this.enabled) return null;

const { indexes, mc, temp } = this;
const { indexes, mc, schema } = this;
const { source } = activeClause;

// if there is no clause source to track, do nothing
Expand Down Expand Up @@ -125,8 +182,11 @@ export class DataCubeIndexer {
} else {
// generate data cube index table
const filter = selection.remove(source).predicate(client);
info = dataCubeInfo(client.query(filter), active, indexCols);
info.result = mc.exec(create(info.table, info.create, { temp }));
info = dataCubeInfo(client.query(filter), active, indexCols, schema);
info.result = mc.exec([
`CREATE SCHEMA IF NOT EXISTS ${schema}`,
create(info.table, info.create, { temp: false })
]);
info.result.catch(e => mc.logger().error(e));
}

Expand Down Expand Up @@ -223,7 +283,7 @@ function binInterval(scale, pixelSize, bin) {
* @param {*} indexCols Data cube index column definitions.
* @returns {DataCubeInfo}
*/
function dataCubeInfo(clientQuery, active, indexCols) {
function dataCubeInfo(clientQuery, active, indexCols, schema) {
const { dims, aggr, aux } = indexCols;
const { columns } = active;

Expand All @@ -246,7 +306,7 @@ function dataCubeInfo(clientQuery, active, indexCols) {
// generate creation query string and hash id
const create = query.toString();
const id = (fnv_hash(create) >>> 0).toString(16);
const table = `cube_index_${id}`;
const table = `${schema}.cube_${id}`;

// generate data cube select query
const select = Query
Expand All @@ -255,7 +315,7 @@ function dataCubeInfo(clientQuery, active, indexCols) {
.groupby(dims)
.orderby(order);

return new DataCubeInfo({ table, create, active, select });
return new DataCubeInfo({ id, table, create, active, select });
}

/**
Expand Down
Loading

0 comments on commit 09d292c

Please sign in to comment.