Skip to content

Commit

Permalink
feat(snowflake-driver): Improve support for NUMERIC in pre-aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed May 24, 2021
1 parent 44f09c3 commit 15e829f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
27 changes: 12 additions & 15 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ import { unlink } from 'fs-extra';
import tempy from 'tempy';
import csvWriter from 'csv-write-stream';
import {
BaseDriver,
DownloadTableCSVData,
DownloadTableData,
DownloadTableMemoryData,
StreamTableData,
BaseDriver, DownloadTableCSVData, DownloadTableMemoryData,
DriverInterface, IndexesSQL, StreamTableData, TableStructure,
} from '@cubejs-backend/query-orchestrator';
import { getEnv } from '@cubejs-backend/shared';
import { format as formatSql } from 'sqlstring';
Expand All @@ -24,12 +21,7 @@ const GenericTypeToCubeStore: Record<string, string> = {
text: 'varchar(255)'
};

type Column = {
type: string;
name: string;
};

export class CubeStoreDriver extends BaseDriver {
export class CubeStoreDriver extends BaseDriver implements DriverInterface {
protected readonly config: any;

protected readonly connection: WebSocketConnection;
Expand Down Expand Up @@ -89,7 +81,7 @@ export class CubeStoreDriver extends BaseDriver {
return super.toColumnValue(value, genericType);
}

public async uploadTableWithIndexes(table: string, columns: Column[], tableData: any, indexesSql: any) {
public async uploadTableWithIndexes(table: string, columns: TableStructure, tableData: any, indexesSql: IndexesSQL) {
const indexes =
indexesSql.map((s: any) => s.sql[0].replace(/^CREATE INDEX (.*?) ON (.*?) \((.*)$/, 'INDEX $1 ($3')).join(' ');

Expand All @@ -104,7 +96,12 @@ export class CubeStoreDriver extends BaseDriver {
}
}

private async importRows(table: string, columns: Column[], indexesSql: any, tableData: DownloadTableMemoryData) {
private async importRows(
table: string,
columns: TableStructure,
indexesSql: IndexesSQL,
tableData: DownloadTableMemoryData
) {
await this.createTable(table, columns);
try {
for (let i = 0; i < indexesSql.length; i++) {
Expand Down Expand Up @@ -134,7 +131,7 @@ export class CubeStoreDriver extends BaseDriver {
}
}

private async importCsvFile(tableData: DownloadTableCSVData, table: string, columns: Column[], indexes) {
private async importCsvFile(tableData: DownloadTableCSVData, table: string, columns: TableStructure, indexes) {
const files = Array.isArray(tableData.csvFile) ? tableData.csvFile : [tableData.csvFile];
const createTableSql = this.createTableSql(table, columns);

Expand All @@ -154,7 +151,7 @@ export class CubeStoreDriver extends BaseDriver {
});
}

private async importStream(columns: Column[], tableData: StreamTableData, table: string, indexes) {
private async importStream(columns: TableStructure, tableData: StreamTableData, table: string, indexes) {
const writer = csvWriter({ headers: columns.map(c => c.name) });
const gzipStream = createGzip();
const tempFile = tempy.file();
Expand Down
9 changes: 8 additions & 1 deletion packages/cubejs-query-orchestrator/src/driver/BaseDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ export class BaseDriver {
[name, schema]
);

return columns.map(c => ({ name: c.column_name, type: this.toGenericType(c.data_type) }));
return columns.map(c => ({ name: c.column_name, type: this.toGenericTypeFromColumn(c) }));
}

createTable(quotedTableName, columns) {
Expand All @@ -354,6 +354,13 @@ export class BaseDriver {
return `CREATE TABLE ${quotedTableName} (${columns.join(', ')})`;
}

/**
* @param {object} column
*/
toGenericTypeFromColumn(column) {
return this.toGenericType(column.data_type);
}

/**
* @param {string} columnType
* @return {string}
Expand Down
40 changes: 33 additions & 7 deletions packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,21 @@ const hydrators: HydrationConfiguration[] = [
}
];

type NumberColumn = {
NAME: string,
TYPE: 'NUMBER',
NUMERIC_PRECISION: string,
NUMERIC_SCALE: string,
};
type CommonColumn = {
NAME: string,
TYPE: string,
NUMERIC_PRECISION: null,
NUMERIC_SCALE: null,
};
type ColumnInfo = NumberColumn | CommonColumn;

const SnowflakeToGenericType: Record<string, GenericDataBaseType> = {
number: 'decimal',
timestamp_ntz: 'timestamp'
};

Expand Down Expand Up @@ -407,20 +420,33 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
return SnowflakeToGenericType[columnType.toLowerCase()] || super.toGenericType(columnType);
}

public toGenericTypeFromColumn(column: ColumnInfo) {
if (column.TYPE === 'NUMBER') {
// 2^64 = 18446744073709551616
if (column.NUMERIC_SCALE === '0' && parseInt(column.NUMERIC_PRECISION, 10) <= 19) {
return 'bigint';
}

return `DECIMAL(${column.NUMERIC_PRECISION}, ${column.NUMERIC_SCALE})`;
}

return SnowflakeToGenericType[column.TYPE.toLowerCase()] || super.toGenericType(column.TYPE);
}

public async tableColumnTypes(table: string) {
const [schema, name] = table.split('.');

const columns = await this.query<{ COLUMN_NAME: string, DATA_TYPE: string }[]>(
`SELECT columns.column_name,
columns.table_name,
columns.table_schema,
columns.data_type
const columns = await this.query<ColumnInfo[]>(
`SELECT columns.column_name as name,
columns.data_type as type,
columns.numeric_precision,
columns.numeric_scale
FROM information_schema.columns
WHERE table_name = ${this.param(0)} AND table_schema = ${this.param(1)}`,
[name.toUpperCase(), schema.toUpperCase()]
);

return columns.map(c => ({ name: c.COLUMN_NAME, type: this.toGenericType(c.DATA_TYPE) }));
return columns.map((c) => ({ name: c.NAME, type: this.toGenericTypeFromColumn(c) }));
}

public async getTablesQuery(schemaName: string) {
Expand Down

0 comments on commit 15e829f

Please sign in to comment.